diff options
32 files changed, 491 insertions, 76 deletions
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 08aef91..50d8dea 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: - id: flake8 name: flake8 additional_dependencies: - - hacking>=3.0.1,<3.1.0 + - hacking>=3.0.1,<=4.1.0 language: python entry: flake8 files: '^.*\.py$' @@ -108,7 +108,7 @@ - check-requirements - lib-forward-testing-python3 - openstack-cover-jobs - - openstack-python3-yoga-jobs + - openstack-python3-zed-jobs - periodic-stable-jobs - publish-openstack-docs-pti - release-notes-jobs-python3 diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst index 142bdf7..b9433a5 100644 --- a/doc/source/admin/rabbit.rst +++ b/doc/source/admin/rabbit.rst @@ -240,6 +240,10 @@ Consuming Options ^^^^^^^^^^^^^^^^^ - :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_delivery_limit` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_length` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_bytes` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl` Connection Options @@ -266,3 +270,4 @@ Security Options - :oslo.config:option:`oslo_messaging_rabbit.ssl_key_file` - :oslo.config:option:`oslo_messaging_rabbit.ssl_cert_file` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_login_method` +- :oslo.config:option:`oslo_messaging_rabbit.ssl_enforce_fips_mode` diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index d4db2c6..b0c9551 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -32,7 +32,9 @@ from oslo_messaging._drivers import common as rpc_common amqp_opts = [ cfg.BoolOpt('amqp_durable_queues', default=False, - help='Use durable queues in AMQP.'), + help='Use durable queues in AMQP. If rabbit_quorum_queue ' + 'is enabled, queues will be durable and this value will ' + 'be ignored.'), cfg.BoolOpt('amqp_auto_delete', default=False, deprecated_group='DEFAULT', diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index bba7228..44a6bb4 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -1245,6 +1245,7 @@ class Controller(pyngus.ConnectionEventHandler): # service. Try to re-establish the connection: if not self._reconnecting: self._reconnecting = True + self.processor.wakeup(lambda: self._hard_reset(reason)) LOG.info("Delaying reconnect attempt for %d seconds", self._delay) self.processor.defer(lambda: self._do_reconnect(reason), @@ -1261,7 +1262,6 @@ class Controller(pyngus.ConnectionEventHandler): """ self._reconnecting = False if not self._closing: - self._hard_reset(reason) host = self.hosts.next() LOG.info("Reconnecting to: %(hostname)s:%(port)s", {'hostname': host.hostname, 'port': host.port}) @@ -1331,4 +1331,5 @@ class Controller(pyngus.ConnectionEventHandler): def _active(self): # Is the connection up return (self._socket_connection and + self._socket_connection.pyngus_conn and self._socket_connection.pyngus_conn.active) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 24fdbc7..991bf46 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -601,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): + def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None): return rpc_common.ConnectionContext(self._connection_pool, - purpose=purpose) + purpose=purpose, + retry=retry) def _get_reply_q(self): with self._reply_q_lock: @@ -649,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver): log_msg = "CAST unique_id: %s " % unique_id try: - with self._get_connection(rpc_common.PURPOSE_SEND) as conn: + with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn: if notify: exchange = self._get_exchange(target) LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 54c6f7f..b6c3adb 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -392,7 +392,7 @@ class ConnectionContext(Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, purpose): + def __init__(self, connection_pool, purpose, retry): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool @@ -420,7 +420,7 @@ class ConnectionContext(Connection): pooled = purpose == PURPOSE_SEND if pooled: - self.connection = connection_pool.get() + self.connection = connection_pool.get(retry=retry) else: self.connection = connection_pool.create(purpose) self.pooled = pooled diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 0519ca1..c61393c 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import errno import functools @@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool from oslo_messaging import _utils from oslo_messaging import exceptions + +# The QuorumMemConfig will hold the quorum queue memory configurations +QuorumMemConfig = collections.namedtuple('QuorumMemConfig', + 'delivery_limit' + ' max_memory_length' + ' max_memory_bytes') + # NOTE(sileht): don't exist in py2 socket module TCP_USER_TIMEOUT = 18 @@ -74,8 +82,17 @@ rabbit_opts = [ deprecated_name='kombu_ssl_ca_certs', help='SSL certification authority file ' '(valid only if SSL enabled).'), + cfg.BoolOpt('ssl_enforce_fips_mode', + default=False, + help='Global toggle for enforcing the OpenSSL FIPS mode. ' + 'This feature requires Python support. ' + 'This is available in Python 3.9 in all ' + 'environments and may have been backported to older ' + 'Python versions on select environments. If the Python ' + 'executable used does not support OpenSSL FIPS mode, ' + 'an exception will be raised.'), cfg.BoolOpt('heartbeat_in_pthread', - default=True, + default=False, help="Run the health check heartbeat thread " "through a native python thread by default. If this " "option is equal to False then the health check " @@ -83,7 +100,9 @@ rabbit_opts = [ "from the parent process. For " "example if the parent process has monkey patched the " "stdlib by using eventlet/greenlet then the heartbeat " - "will be run through a green thread.", + "will be run through a green thread. " + "This option should be set to True only for the " + "wsgi services.", ), cfg.FloatOpt('kombu_reconnect_delay', default=1.0, @@ -108,7 +127,7 @@ rabbit_opts = [ 'unavailable. Takes effect only if more than one ' 'RabbitMQ node is provided in config.'), cfg.StrOpt('rabbit_login_method', - choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'), + choices=('PLAIN', 'AMQPLAIN', 'EXTERNAL', 'RABBIT-CR-DEMO'), default='AMQPLAIN', deprecated_group='DEFAULT', help='The RabbitMQ login method.'), @@ -136,6 +155,41 @@ rabbit_opts = [ 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """ """'{"ha-mode": "all"}' \""""), + cfg.BoolOpt('rabbit_quorum_queue', + default=False, + help='Use quorum queues in RabbitMQ (x-queue-type: quorum). ' + 'The quorum queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will conflict with ' + 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' + 'in other words the HA queues should be disabled, quorum ' + 'queues durable by default so the amqp_durable_queues ' + 'opion is ignored when this option enabled.'), + cfg.IntOpt('rabbit_quorum_delivery_limit', + default=0, + help='Each time a message is redelivered to a consumer, ' + 'a counter is incremented. Once the redelivery count ' + 'exceeds the delivery limit the message gets dropped ' + 'or dead-lettered (if a DLX exchange has been configured) ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_length', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of messages in the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_bytes', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of memory bytes used by the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -178,7 +232,9 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, + rabbit_quorum_queue, + rabbit_quorum_queue_config): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -201,12 +257,48 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): Setting a queue TTL causes the queue to be automatically deleted if it is unused for the TTL duration. This is a helpful safeguard to prevent queues with zero consumers from growing without bound. + + If the rabbit_quorum_queue option is set, we try to declare a mirrored + queue as described here: + + https://www.rabbitmq.com/quorum-queues.html + + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will conflict with + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues should be disabled. + + rabbit_quorum_queue_config: + Quorum queues provides three options to handle message poisoning + and limit the resources the qourum queue can use + x-delivery-limit number of times the queue will try to deliver + a message before it decide to discard it + x-max-in-memory-length, x-max-in-memory-bytes control the size + of memory used by quorum queue """ args = {} + if rabbit_quorum_queue and rabbit_ha_queues: + raise RuntimeError('Configuration Error: rabbit_quorum_queue ' + 'and rabbit_ha_queues both enabled, queue ' + 'type is quorum or HA (mirrored) not both') + if rabbit_ha_queues: args['x-ha-policy'] = 'all' + if rabbit_quorum_queue: + args['x-queue-type'] = 'quorum' + if rabbit_quorum_queue_config.delivery_limit: + args['x-delivery-limit'] = \ + rabbit_quorum_queue_config.delivery_limit + if rabbit_quorum_queue_config.max_memory_length: + args['x-max-in-memory-length'] = \ + rabbit_quorum_queue_config.max_memory_length + if rabbit_quorum_queue_config.max_memory_bytes: + args['x-max-in-memory-bytes'] = \ + rabbit_quorum_queue_config.max_memory_bytes + if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -235,7 +327,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False): + enable_cancel_on_failover=False, rabbit_quorum_queue=False, + rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -248,8 +341,10 @@ class Consumer(object): self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl) + rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} + self.queue_arguments = _get_queue_arguments( + rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, + rabbit_quorum_queue_config) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -496,16 +591,20 @@ class ConnectionLock(DummyConnectionLock): class Connection(object): """Connection object.""" - def __init__(self, conf, url, purpose): + def __init__(self, conf, url, purpose, retry=None): # NOTE(viktors): Parse config options driver_conf = conf.oslo_messaging_rabbit self.interval_start = driver_conf.rabbit_retry_interval self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_max = driver_conf.rabbit_interval_max + self.max_retries = retry self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue + self.rabbit_quorum_queue_config = self._get_quorum_configurations( + driver_conf) self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -521,6 +620,7 @@ class Connection(object): self.kombu_failover_strategy = driver_conf.kombu_failover_strategy self.kombu_compression = driver_conf.kombu_compression self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread + self.ssl_enforce_fips_mode = driver_conf.ssl_enforce_fips_mode self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover if self.heartbeat_in_pthread: @@ -558,6 +658,19 @@ class Connection(object): self.ssl_cert_file = driver_conf.ssl_cert_file self.ssl_ca_file = driver_conf.ssl_ca_file + if self.ssl_enforce_fips_mode: + if hasattr(ssl, 'FIPS_mode'): + LOG.info("Enforcing the use of the OpenSSL FIPS mode") + ssl.FIPS_mode_set(1) + else: + raise exceptions.ConfigurationError( + "OpenSSL FIPS mode is not supported by your Python " + "version. You must either change the Python " + "executable used to a version with FIPS mode " + "support or disable FIPS mode by setting the " + "'[oslo_messaging_rabbit] ssl_enforce_fips_mode' " + "configuration option to 'False'.") + self._url = '' if url.hosts: if url.transport.startswith('kombu+'): @@ -705,6 +818,12 @@ class Connection(object): except AttributeError: pass + @property + def durable(self): + # Quorum queues are durable by default, durable option should + # be enabled by default with quorum queues + return self.amqp_durable_queues or self.rabbit_quorum_queue + @classmethod def validate_ssl_version(cls, version): key = version.lower() @@ -713,6 +832,14 @@ class Connection(object): except KeyError: raise RuntimeError("Invalid SSL version : %s" % version) + def _get_quorum_configurations(self, driver_conf): + """Get the quorum queue configurations""" + delivery_limit = driver_conf.rabbit_quorum_delivery_limit + max_memory_length = driver_conf.rabbit_quroum_max_memory_length + max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes + return QuorumMemConfig(delivery_limit, max_memory_length, + max_memory_bytes) + # NOTE(moguimar): default_password in this function's context is just # a fallback option, not a hardcoded password. def _transform_transport_url(self, url, host, default_username='', # nosec @@ -772,7 +899,13 @@ class Connection(object): str(exc), interval) self._set_current_channel(None) - self.connection.ensure_connection(errback=on_error) + self.connection.ensure_connection( + errback=on_error, + max_retries=self.max_retries, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + interval_max=self.interval_max, + ) self._set_current_channel(self.connection.channel()) self.set_transport_socket_timeout() @@ -1194,12 +1327,14 @@ class Connection(object): queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, exchange_auto_delete=self.amqp_auto_delete, queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, - enable_cancel_on_failover=self.enable_cancel_on_failover) + enable_cancel_on_failover=self.enable_cancel_on_failover, + rabbit_quorum_queue=self.rabbit_quorum_queue, + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) self.declare_consumer(consumer) @@ -1324,7 +1459,11 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) + queue_arguments=_get_queue_arguments( + self.rabbit_ha_queues, + 0, + self.rabbit_quorum_queue, + self.rabbit_quorum_queue_config)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' @@ -1380,7 +1519,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, @@ -1402,7 +1541,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self.durable, auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 8090e8d..9e5288d 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta): self._items.append((ttl_watch, item)) self._cond.notify() - def get(self): + def get(self, retry=None): """Return an item from the pool, when one is available. This may cause the calling thread to block. @@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta): # We've grabbed a slot and dropped the lock, now do the creation try: - return self.create() + return self.create(retry=retry) except Exception: with self._cond: self._current_size -= 1 @@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta): return @abc.abstractmethod - def create(self): + def create(self, retry=None): """Construct a new item.""" @@ -130,9 +130,9 @@ class ConnectionPool(Pool): LOG.debug("Idle connection has expired and been closed." " Pool size: %d" % len(self._items)) - def create(self, purpose=common.PURPOSE_SEND): + def create(self, purpose=common.PURPOSE_SEND, retry=None): LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url, purpose) + return self.connection_cls(self.conf, self.url, purpose, retry=retry) def empty(self): for item in self.iter_free(): diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py index 48645b3..391fe46 100644 --- a/oslo_messaging/exceptions.py +++ b/oslo_messaging/exceptions.py @@ -47,3 +47,7 @@ class MessageUndeliverable(Exception): self.exchange = exchange self.routing_key = routing_key self.message = message + + +class ConfigurationError(Exception): + """Raised when messaging isn't configured correctly.""" diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py index 61c7357..da633d8 100644 --- a/oslo_messaging/notify/messaging.py +++ b/oslo_messaging/notify/messaging.py @@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging. The messaging drivers publish notification messages to notification listeners. -The driver will block the notifier's thread until the notification message has -been passed to the messaging transport. There is no guarantee that the -notification message will be consumed by a notification listener. +In case of the rabbit backend the driver will block the notifier's thread +until the notification message has been passed to the messaging transport. +There is no guarantee that the notification message will be consumed by a +notification listener. + +In case of the kafka backend the driver will not block the notifier's thread +but return immediately. The driver will try to deliver the message in the +background. Notification messages are sent 'at-most-once' - ensuring that they are not duplicated. If the connection to the messaging service is not active when a notification is -sent this driver will block waiting for the connection to complete. If the -connection fails to complete, the driver will try to re-establish that +sent the rabbit backend will block waiting for the connection to complete. +If the connection fails to complete, the driver will try to re-establish that connection. By default this will continue indefinitely until the connection completes. However, the retry parameter can be used to have the notification -send fail with a MessageDeliveryFailure after the given number of retries. +send fail. In this case an error is logged and the notifier's thread is resumed +without any error. + +If the connection to the messaging service is not active when a notification is +sent the kafka backend will return immediately and the backend tries to +establish the connection and deliver the messages in the background. + """ import logging diff --git a/oslo_messaging/rpc/__init__.py b/oslo_messaging/rpc/__init__.py index 9a320a8..135428e 100644 --- a/oslo_messaging/rpc/__init__.py +++ b/oslo_messaging/rpc/__init__.py @@ -30,6 +30,7 @@ __all__ = [ 'expected_exceptions', 'get_rpc_transport', 'get_rpc_server', + 'get_rpc_client', 'expose' ] diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index cbec525..8e997e9 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -32,6 +32,7 @@ __all__ = [ 'RPCClient', 'RPCVersionCapError', 'RemoteError', + 'get_rpc_client', ] LOG = logging.getLogger(__name__) @@ -263,6 +264,9 @@ class RPCClient(_BaseCallContext): The RPCClient class is responsible for sending method invocations to and receiving return values from remote RPC servers via a messaging transport. + The class should always be instantiated by using the get_rpc_client + function and not constructing the class directly. + Two RPC patterns are supported: RPC calls and RPC casts. An RPC cast is used when an RPC method does *not* return a value to @@ -295,7 +299,7 @@ class RPCClient(_BaseCallContext): def __init__(self, transport): target = messaging.Target(topic='test', version='2.0') - self._client = messaging.RPCClient(transport, target) + self._client = messaging.get_rpc_client(transport, target) def test(self, ctxt, arg): return self._client.call(ctxt, 'test', arg=arg) @@ -320,7 +324,7 @@ class RPCClient(_BaseCallContext): transport = messaging.get_rpc_transport(cfg.CONF) target = messaging.Target(topic='test', version='2.0') - client = messaging.RPCClient(transport, target) + client = messaging.get_rpc_client(transport, target) client.call(ctxt, 'test', arg=arg) but this is probably only useful in limited circumstances as a wrapper @@ -334,7 +338,7 @@ class RPCClient(_BaseCallContext): have the RPC request fail with a MessageDeliveryFailure after the given number of retries. For example:: - client = messaging.RPCClient(transport, target, retry=None) + client = messaging.get_rpc_client(transport, target, retry=None) client.call(ctxt, 'sync') try: client.prepare(retry=0).cast(ctxt, 'ping') @@ -346,9 +350,13 @@ class RPCClient(_BaseCallContext): def __init__(self, transport, target, timeout=None, version_cap=None, serializer=None, retry=None, - call_monitor_timeout=None, transport_options=None): + call_monitor_timeout=None, transport_options=None, + _manual_load=True): """Construct an RPC client. + This should not be called directly, use the get_rpc_client function + to instantiate this class. + :param transport: a messaging transport handle :type transport: Transport :param target: the default target for invocations @@ -371,7 +379,17 @@ class RPCClient(_BaseCallContext): (less than the overall timeout parameter). :type call_monitor_timeout: int + :param transport_options: Transport options passed to client. + :type transport_options: TransportOptions + :param _manual_load: Internal use only to check if class was + manually instantiated or not. + :type _manual_load: bool """ + if _manual_load: + LOG.warning("Using RPCClient manually to instantiate client. " + "Please use get_rpc_client to obtain an RPC client " + "instance.") + if serializer is None: serializer = msg_serializer.NoOpSerializer() @@ -530,3 +548,16 @@ class RPCClient(_BaseCallContext): def can_send_version(self, version=_marker): """Check to see if a version is compatible with the version cap.""" return self.prepare(version=version).can_send_version() + + +def get_rpc_client(transport, target, **kwargs): + """Construct an RPC client. + + :param transport: the messaging transport + :type transport: Transport + :param target: the exchange, topic and server to listen on + :type target: Target + :param **kwargs: The kwargs will be passed down to the + RPCClient constructor + """ + return RPCClient(transport, target, _manual_load=False, **kwargs) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 8955661..dbbf33c 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -31,6 +31,7 @@ import oslo_messaging from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver +from oslo_messaging.exceptions import ConfigurationError from oslo_messaging.exceptions import MessageDeliveryFailure from oslo_messaging.tests import utils as test_utils from oslo_messaging.transport import DriverLoadFailure @@ -205,6 +206,65 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): ) +class TestRabbitDriverLoadSSLWithFIPS(test_utils.BaseTestCase): + scenarios = [ + ('ssl_fips_mode', dict(options=dict(ssl=True, + ssl_enforce_fips_mode=True), + expected=True)), + ] + + @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection' + '.ensure_connection') + @mock.patch('kombu.connection.Connection') + def test_driver_load_with_fips_supported(self, + connection_klass, fake_ensure): + self.config(ssl=True, ssl_enforce_fips_mode=True, + group="oslo_messaging_rabbit") + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + + with mock.patch.object(ssl, 'FIPS_mode', + create=True, return_value=True): + with mock.patch.object(ssl, 'FIPS_mode_set', create=True): + + connection = transport._driver._get_connection() + connection_klass.assert_called_once_with( + 'memory:///', transport_options={ + 'client_properties': { + 'capabilities': { + 'connection.blocked': True, + 'consumer_cancel_notify': True, + 'authentication_failure_close': True, + }, + 'connection_name': connection.name}, + 'confirm_publish': True, + 'on_blocked': mock.ANY, + 'on_unblocked': mock.ANY}, + ssl=self.expected, login_method='AMQPLAIN', + heartbeat=60, failover_strategy='round-robin' + ) + + @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection' + '.ensure_connection') + @mock.patch('oslo_messaging._drivers.impl_rabbit.ssl') + @mock.patch('kombu.connection.Connection') + def test_fips_unsupported(self, connection_klass, fake_ssl, fake_ensure): + self.config(ssl=True, ssl_enforce_fips_mode=True, + group="oslo_messaging_rabbit") + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + + del fake_ssl.FIPS_mode + + # We do this test only if FIPS mode is not supported to + # ensure that we hard fail. + self.assertRaises( + ConfigurationError, + transport._driver._get_connection) + + class TestRabbitPublisher(test_utils.BaseTestCase): @mock.patch('kombu.messaging.Producer.publish') def test_send_with_timeout(self, fake_publish): @@ -1008,21 +1068,36 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): self.assertRaises(oslo_messaging.MessageDeliveryFailure, self.connection.ensure, mock_callback, retry=4) - self.assertEqual(6, mock_callback.call_count) + # TODO(stephenfin): Remove when we drop support for kombu < 5.2.4 + expected = 5 + if kombu.VERSION < (5, 2, 4): + expected = 6 + self.assertEqual(expected, mock_callback.call_count) def test_ensure_one_retry(self): mock_callback = mock.Mock(side_effect=IOError) self.assertRaises(oslo_messaging.MessageDeliveryFailure, self.connection.ensure, mock_callback, retry=1) - self.assertEqual(3, mock_callback.call_count) + # TODO(stephenfin): Remove when we drop support for kombu < 5.2.4 + expected = 2 + if kombu.VERSION < (5, 2, 4): + expected = 3 + self.assertEqual(expected, mock_callback.call_count) def test_ensure_no_retry(self): mock_callback = mock.Mock(side_effect=IOError) - self.assertRaises(oslo_messaging.MessageDeliveryFailure, - self.connection.ensure, mock_callback, - retry=0) - self.assertEqual(2, mock_callback.call_count) + self.assertRaises( + oslo_messaging.MessageDeliveryFailure, + self.connection.ensure, + mock_callback, + retry=0, + ) + # TODO(stephenfin): Remove when we drop support for kombu < 5.2.4 + expected = 1 + if kombu.VERSION < (5, 2, 4): + expected = 2 + self.assertEqual(expected, mock_callback.call_count) class ConnectionLockTestCase(test_utils.BaseTestCase): diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py index d5c6420..82a10e1 100644 --- a/oslo_messaging/tests/drivers/test_pool.py +++ b/oslo_messaging/tests/drivers/test_pool.py @@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase): class TestPool(pool.Pool): - def create(self): + def create(self, retry=None): return uuid.uuid4() class ThreadWaitWaiter(object): @@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase): p = self.TestPool(**kwargs) if self.create_error: - def create_error(): + def create_error(retry=None): raise RuntimeError orig_create = p.create self.useFixture(fixtures.MockPatchObject( diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 0d007b2..6a49f49 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -114,8 +114,8 @@ class RpcServerFixture(fixtures.Fixture): target=self.target, endpoints=endpoints, executor=self.executor) - self._ctrl = oslo_messaging.RPCClient(transport.transport, - self.ctrl_target) + self._ctrl = oslo_messaging.get_rpc_client(transport.transport, + self.ctrl_target) self._start() transport.wait() @@ -230,7 +230,7 @@ class ClientStub(object): transport_options=None, **kwargs): self.name = name or "functional-tests" self.cast = cast - self.client = oslo_messaging.RPCClient( + self.client = oslo_messaging.get_rpc_client( transport=transport, target=target, transport_options=transport_options, diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index c36a432..330bdab 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -18,6 +18,7 @@ import sys import uuid import fixtures +from kombu import connection from oslo_serialization import jsonutils from oslo_utils import strutils from oslo_utils import timeutils @@ -228,6 +229,80 @@ class TestMessagingNotifier(test_utils.BaseTestCase): TestMessagingNotifier.generate_scenarios() +class TestMessagingNotifierRetry(test_utils.BaseTestCase): + + class TestingException(BaseException): + pass + + def test_notifier_retry_connection_fails_rabbit(self): + """This test sets a small retry number for notification sending and + configures a non reachable message bus. The expectation that after the + configured number of retries the driver gives up the message sending. + """ + self.config( + driver=["messagingv2"], + topics=["test-retry"], + retry=2, + group="oslo_messaging_notifications") + self.config( + # just to speed up the test execution + rabbit_retry_backoff=0, + group="oslo_messaging_rabbit") + transport = oslo_messaging.get_notification_transport( + self.conf, url='rabbit://') + notifier = oslo_messaging.Notifier(transport) + + orig_establish_connection = connection.Connection._establish_connection + calls = [] + + def wrapped_establish_connection(*args, **kwargs): + if len(calls) > 2: + raise self.TestingException( + "Connection should only be retried twice due to " + "configuration") + else: + calls.append((args, kwargs)) + orig_establish_connection(*args, **kwargs) + + with mock.patch( + 'kombu.connection.Connection._establish_connection', + new=wrapped_establish_connection + ): + with mock.patch( + 'oslo_messaging.notify.messaging.LOG.exception' + ) as mock_log: + notifier.info({}, "test", {}) + + # one normal call plus two retries + self.assertEqual(3, len(calls)) + # the error was caught and logged + mock_log.assert_called_once() + + def test_notifier_retry_connection_fails_kafka(self): + """This test sets a small retry number for notification sending and + configures a non reachable message bus. The expectation that after the + configured number of retries the driver gives up the message sending. + """ + + self.config( + driver=["messagingv2"], + topics=["test-retry"], + retry=2, + group='oslo_messaging_notifications') + + transport = oslo_messaging.get_notification_transport( + self.conf, url='kafka://') + + notifier = oslo_messaging.Notifier(transport) + + # Kafka's message producer interface is async, and there is no way + # from the oslo interface to force sending a pending message. So this + # call simply returns without i) failing to deliver the message to + # the non existent kafka bus ii) retrying the message delivery twice + # as the configuration requested it. + notifier.info({}, "test", {}) + + class TestSerializer(test_utils.BaseTestCase): def setUp(self): diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index af1282a..1358c98 100644 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -44,8 +44,9 @@ class TestCastCall(test_utils.BaseTestCase): self.config(rpc_response_timeout=None) transport_options = oslo_messaging.TransportOptions() transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - transport_options=transport_options) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), + transport_options=transport_options) transport._send = mock.Mock() @@ -70,7 +71,7 @@ class TestCastCall(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') transport_options = oslo_messaging.TransportOptions(at_least_once=True) - client = oslo_messaging.RPCClient( + client = oslo_messaging.get_rpc_client( transport, oslo_messaging.Target(), transport_options=transport_options) @@ -215,7 +216,7 @@ class TestCastToTarget(test_utils.BaseTestCase): expect_target = oslo_messaging.Target(**self.expect) transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, target) + client = oslo_messaging.get_rpc_client(transport, target) transport._send = mock.Mock() @@ -269,9 +270,9 @@ class TestCallTimeout(test_utils.BaseTestCase): self.config(rpc_response_timeout=self.confval) transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - timeout=self.ctor, - call_monitor_timeout=self.cm) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), timeout=self.ctor, + call_monitor_timeout=self.cm) transport._send = mock.Mock() @@ -302,8 +303,9 @@ class TestCallRetry(test_utils.BaseTestCase): def test_call_retry(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - retry=self.ctor) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), + retry=self.ctor) transport._send = mock.Mock() @@ -332,8 +334,8 @@ class TestCallFanout(test_utils.BaseTestCase): def test_call_fanout(self): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, - oslo_messaging.Target(**self.target)) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(**self.target)) if self.prepare is not _notset: client = client.prepare(**self.prepare) @@ -363,8 +365,8 @@ class TestSerializer(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') serializer = msg_serializer.NoOpSerializer() - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - serializer=serializer) + client = oslo_messaging.get_rpc_client( + transport, oslo_messaging.Target(), serializer=serializer) transport._send = mock.Mock() kwargs = dict(wait_for_reply=True, @@ -465,8 +467,8 @@ class TestVersionCap(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(version=self.version) - client = oslo_messaging.RPCClient(transport, target, - version_cap=self.cap) + client = oslo_messaging.get_rpc_client(transport, target, + version_cap=self.cap) if self.success: transport._send = mock.Mock() @@ -574,8 +576,8 @@ class TestCanSendVersion(test_utils.BaseTestCase): transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') target = oslo_messaging.Target(version=self.version) - client = oslo_messaging.RPCClient(transport, target, - version_cap=self.cap) + client = oslo_messaging.get_rpc_client(transport, target, + version_cap=self.cap) prep_kwargs = {} if self.prepare_cap is not _notset: @@ -598,7 +600,7 @@ class TestCanSendVersion(test_utils.BaseTestCase): def test_invalid_version_type(self): target = oslo_messaging.Target(topic='sometopic') transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, target) + client = oslo_messaging.get_rpc_client(transport, target) self.assertRaises(exceptions.MessagingException, client.prepare, version='5') self.assertRaises(exceptions.MessagingException, @@ -612,7 +614,7 @@ class TestTransportWarning(test_utils.BaseTestCase): @mock.patch('oslo_messaging.rpc.client.LOG') def test_warning_when_notifier_transport(self, log): transport = oslo_messaging.get_notification_transport(self.conf) - oslo_messaging.RPCClient(transport, oslo_messaging.Target()) + oslo_messaging.get_rpc_client(transport, oslo_messaging.Target()) log.warning.assert_called_once_with( "Using notification transport for RPC. Please use " "get_rpc_transport to obtain an RPC transport " diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 1fc6be8..06cf1c7 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -102,8 +102,8 @@ class ServerSetupMixin(object): def _setup_client(self, transport, topic='testtopic', exchange=None): target = oslo_messaging.Target(topic=topic, exchange=exchange) - return oslo_messaging.RPCClient(transport, target=target, - serializer=self.serializer) + return oslo_messaging.get_rpc_client(transport, target=target, + serializer=self.serializer) class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index 31fec16..cb12c16 100644 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -115,7 +115,7 @@ class GetTransportTestCase(test_utils.BaseTestCase): self.assertIsNotNone(transport_) self.assertIs(transport_.conf, self.conf) self.assertIs(transport_._driver, drvr) - self.assertTrue(isinstance(transport_, transport.RPCTransport)) + self.assertIsInstance(transport_, transport.RPCTransport) driver.DriverManager.assert_called_once_with('oslo.messaging.drivers', self.expect['backend'], diff --git a/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml new file mode 100644 index 0000000..42fdfbf --- /dev/null +++ b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Add quorum configuration x-max-in-memory-length, + x-max-in-memory-bytes, x-delivery-limit which control the quorum + queue memory usage and handle the message poisoning problem diff --git a/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml new file mode 100644 index 0000000..a88c5d8 --- /dev/null +++ b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Adding support for quorum queues. Quorum queues are enabled if the + ``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``). + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. The quorum queues are durable by default + (``amqp_durable_queues``) will be ignored. + when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues + should be disabled since the queue can't be both types at the same time diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml new file mode 100644 index 0000000..d3d62cb --- /dev/null +++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit + backend is changed to use the ``[oslo_messaging_notifications]retry`` + parameter when driver tries to connect to the message bus during + notification sending. Before this fix the rabbit backend retried the + connection forever blocking the caller thread. diff --git a/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml new file mode 100644 index 0000000..265e709 --- /dev/null +++ b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml @@ -0,0 +1,9 @@ +--- +upgrade: + - | + The ``[oslo_messaging_rabbit] heartbeat_in_pthread`` config option + defaults to ``False`` again. + For wsgi applications it is recommended to set this value to ``True`` + but enabling it for non-wsgi services may break such service. + Please check https://bugs.launchpad.net/oslo.messaging/+bug/1934937 + for more details. diff --git a/releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml b/releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml new file mode 100644 index 0000000..ba6ac7f --- /dev/null +++ b/releasenotes/notes/enforce_fips_mode-07dd259eb8a73c2b.yaml @@ -0,0 +1,10 @@ +--- +features: + - | + Adding a new option, ``[oslo_messaging_rabbit] ssl_enforce_fips_mode``, to + the rabbitmq driver to enforce the OpenSSL FIPS mode if supported by the + version of Python. +security: + - | + We are now able to enforce the OpenSSL FIPS mode by using + ``[oslo_messaging_rabbit] ssl_enforce_fips_mode``. diff --git a/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml b/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml new file mode 100644 index 0000000..3375cfc --- /dev/null +++ b/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Added new ``get_rpc_client`` function to instantiate the RPCClient + class +deprecations: + - | + Instantiating the RPCClient class directly is deprecated in favor + of using the new ``get_rpc_client`` function to expose a more + common API similar to existing functions such as ``get_rpc_server`` + and ``get_rpc_transport`` diff --git a/releasenotes/source/index.rst b/releasenotes/source/index.rst index b69bcf2..0654d71 100644 --- a/releasenotes/source/index.rst +++ b/releasenotes/source/index.rst @@ -6,6 +6,8 @@ :maxdepth: 1 unreleased + zed + yoga xena wallaby victoria diff --git a/releasenotes/source/yoga.rst b/releasenotes/source/yoga.rst new file mode 100644 index 0000000..7cd5e90 --- /dev/null +++ b/releasenotes/source/yoga.rst @@ -0,0 +1,6 @@ +========================= +Yoga Series Release Notes +========================= + +.. release-notes:: + :branch: stable/yoga diff --git a/releasenotes/source/zed.rst b/releasenotes/source/zed.rst new file mode 100644 index 0000000..9608c05 --- /dev/null +++ b/releasenotes/source/zed.rst @@ -0,0 +1,6 @@ +======================== +Zed Series Release Notes +======================== + +.. release-notes:: + :branch: stable/zed @@ -6,7 +6,7 @@ summary = Oslo Messaging API description_file = README.rst home_page = https://docs.openstack.org/oslo.messaging/latest/ -python_requires = >=3.6 +python_requires = >=3.8 classifier = Environment :: OpenStack Intended Audience :: Developers @@ -15,9 +15,8 @@ classifier = Operating System :: OS Independent Programming Language :: Python Programming Language :: Python :: 3 - Programming Language :: Python :: 3.6 - Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 Programming Language :: Python :: 3 :: Only Programming Language :: Python :: Implementation :: CPython diff --git a/test-requirements.txt b/test-requirements.txt index 983c1c9..3a7c44f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,7 +3,7 @@ # process, which may cause wedges in the gate later. # Hacking already pins down pep8, pyflakes and flake8 -hacking>=3.0.1,<3.1.0 # Apache-2.0 +hacking>=3.0.1,<=4.1.0 # Apache-2.0 fixtures>=3.0.0 # Apache-2.0/BSD stestr>=2.0.0 # Apache-2.0 diff --git a/tools/simulator.py b/tools/simulator.py index 8b37f50..da9d05e 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -410,7 +410,7 @@ class RPCClient(Client): def __init__(self, client_id, transport, target, timeout, is_cast, wait_after_msg, sync_mode=False): - client = rpc.RPCClient(transport, target) + client = rpc.get_rpc_client(transport, target) method = _rpc_cast if is_cast else _rpc_call super(RPCClient, self).__init__(client_id, |