summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.pre-commit-config.yaml2
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py69
-rw-r--r--oslo_messaging/rpc/__init__.py1
-rw-r--r--oslo_messaging/rpc/client.py39
-rw-r--r--oslo_messaging/tests/functional/utils.py6
-rw-r--r--oslo_messaging/tests/rpc/test_client.py40
-rw-r--r--oslo_messaging/tests/rpc/test_server.py4
-rw-r--r--oslo_messaging/tests/test_transport.py2
-rw-r--r--releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml5
-rw-r--r--releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml9
-rw-r--r--releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml11
-rw-r--r--releasenotes/source/index.rst1
-rw-r--r--releasenotes/source/zed.rst6
-rw-r--r--test-requirements.txt2
-rwxr-xr-xtools/simulator.py2
15 files changed, 162 insertions, 37 deletions
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 08aef91..50d8dea 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -32,7 +32,7 @@ repos:
- id: flake8
name: flake8
additional_dependencies:
- - hacking>=3.0.1,<3.1.0
+ - hacking>=3.0.1,<=4.1.0
language: python
entry: flake8
files: '^.*\.py$'
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index ed2642c..c61393c 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -92,7 +92,7 @@ rabbit_opts = [
'executable used does not support OpenSSL FIPS mode, '
'an exception will be raised.'),
cfg.BoolOpt('heartbeat_in_pthread',
- default=True,
+ default=False,
help="Run the health check heartbeat thread "
"through a native python thread by default. If this "
"option is equal to False then the health check "
@@ -100,7 +100,9 @@ rabbit_opts = [
"from the parent process. For "
"example if the parent process has monkey patched the "
"stdlib by using eventlet/greenlet then the heartbeat "
- "will be run through a green thread.",
+ "will be run through a green thread. "
+ "This option should be set to True only for the "
+ "wsgi services.",
),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
@@ -347,11 +349,44 @@ class Consumer(object):
self._declared_on = None
self.exchange = kombu.entity.Exchange(
name=exchange_name,
- type=type,
+ type=self.type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover
+ def _declare_fallback(self, err, conn, consumer_arguments):
+ """Fallback by declaring a non durable queue.
+
+ When a control exchange is shared between services it is possible
+ that some service created first a non durable control exchange and
+ then after that an other service can try to create the same control
+ exchange but as a durable control exchange. In this case RabbitMQ
+ will raise an exception (PreconditionFailed), and then it will stop
+ our execution and our service will fail entirly. In this case we want
+ to fallback by creating a non durable queue to match the default
+ config.
+ """
+ if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err):
+ LOG.info(
+ "[%s] Retrying to declare the exchange (%s) as "
+ "non durable", conn.connection_id, self.exchange_name)
+ self.exchange = kombu.entity.Exchange(
+ name=self.exchange_name,
+ type=self.type,
+ durable=False,
+ auto_delete=self.queue_auto_delete)
+ self.queue = kombu.entity.Queue(
+ name=self.queue_name,
+ channel=conn.channel,
+ exchange=self.exchange,
+ durable=False,
+ auto_delete=self.queue_auto_delete,
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments,
+ consumer_arguments=consumer_arguments
+ )
+ self.queue.declare()
+
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
@@ -374,7 +409,18 @@ class Consumer(object):
try:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
- self.queue.declare()
+ try:
+ self.queue.declare()
+ except amqp_exec.PreconditionFailed as err:
+ # NOTE(hberaud): This kind of exception may be triggered
+ # when a control exchange is shared between services and
+ # when services try to create it with configs that differ
+ # from each others. RabbitMQ will reject the services
+ # that try to create it with a configuration that differ
+ # from the one used first.
+ LOG.warning(err)
+ self._declare_fallback(err, conn, consumer_arguments)
+
except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race
# condition. Simply retrying will solve the error most of the time
@@ -1352,7 +1398,20 @@ class Connection(object):
"""Publish a message."""
if not (exchange.passive or exchange.name in self._declared_exchanges):
- exchange(self.channel).declare()
+ try:
+ exchange(self.channel).declare()
+ except amqp_exec.PreconditionFailed as err:
+ # NOTE(hberaud): This kind of exception may be triggered
+ # when a control exchange is shared between services and
+ # when services try to create it with configs that differ
+ # from each others. RabbitMQ will reject the services
+ # that try to create it with a configuration that differ
+ # from the one used first.
+ if "PRECONDITION_FAILED - inequivalent arg 'durable'" \
+ in str(err):
+ LOG.warning("Force creating a non durable exchange.")
+ exchange.durable = False
+ exchange(self.channel).declare()
self._declared_exchanges.add(exchange.name)
log_info = {'msg': msg,
diff --git a/oslo_messaging/rpc/__init__.py b/oslo_messaging/rpc/__init__.py
index 9a320a8..135428e 100644
--- a/oslo_messaging/rpc/__init__.py
+++ b/oslo_messaging/rpc/__init__.py
@@ -30,6 +30,7 @@ __all__ = [
'expected_exceptions',
'get_rpc_transport',
'get_rpc_server',
+ 'get_rpc_client',
'expose'
]
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index cbec525..8e997e9 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -32,6 +32,7 @@ __all__ = [
'RPCClient',
'RPCVersionCapError',
'RemoteError',
+ 'get_rpc_client',
]
LOG = logging.getLogger(__name__)
@@ -263,6 +264,9 @@ class RPCClient(_BaseCallContext):
The RPCClient class is responsible for sending method invocations to and
receiving return values from remote RPC servers via a messaging transport.
+ The class should always be instantiated by using the get_rpc_client
+ function and not constructing the class directly.
+
Two RPC patterns are supported: RPC calls and RPC casts.
An RPC cast is used when an RPC method does *not* return a value to
@@ -295,7 +299,7 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport):
target = messaging.Target(topic='test', version='2.0')
- self._client = messaging.RPCClient(transport, target)
+ self._client = messaging.get_rpc_client(transport, target)
def test(self, ctxt, arg):
return self._client.call(ctxt, 'test', arg=arg)
@@ -320,7 +324,7 @@ class RPCClient(_BaseCallContext):
transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0')
- client = messaging.RPCClient(transport, target)
+ client = messaging.get_rpc_client(transport, target)
client.call(ctxt, 'test', arg=arg)
but this is probably only useful in limited circumstances as a wrapper
@@ -334,7 +338,7 @@ class RPCClient(_BaseCallContext):
have the RPC request fail with a MessageDeliveryFailure after the given
number of retries. For example::
- client = messaging.RPCClient(transport, target, retry=None)
+ client = messaging.get_rpc_client(transport, target, retry=None)
client.call(ctxt, 'sync')
try:
client.prepare(retry=0).cast(ctxt, 'ping')
@@ -346,9 +350,13 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None,
- call_monitor_timeout=None, transport_options=None):
+ call_monitor_timeout=None, transport_options=None,
+ _manual_load=True):
"""Construct an RPC client.
+ This should not be called directly, use the get_rpc_client function
+ to instantiate this class.
+
:param transport: a messaging transport handle
:type transport: Transport
:param target: the default target for invocations
@@ -371,7 +379,17 @@ class RPCClient(_BaseCallContext):
(less than the overall timeout
parameter).
:type call_monitor_timeout: int
+ :param transport_options: Transport options passed to client.
+ :type transport_options: TransportOptions
+ :param _manual_load: Internal use only to check if class was
+ manually instantiated or not.
+ :type _manual_load: bool
"""
+ if _manual_load:
+ LOG.warning("Using RPCClient manually to instantiate client. "
+ "Please use get_rpc_client to obtain an RPC client "
+ "instance.")
+
if serializer is None:
serializer = msg_serializer.NoOpSerializer()
@@ -530,3 +548,16 @@ class RPCClient(_BaseCallContext):
def can_send_version(self, version=_marker):
"""Check to see if a version is compatible with the version cap."""
return self.prepare(version=version).can_send_version()
+
+
+def get_rpc_client(transport, target, **kwargs):
+ """Construct an RPC client.
+
+ :param transport: the messaging transport
+ :type transport: Transport
+ :param target: the exchange, topic and server to listen on
+ :type target: Target
+ :param **kwargs: The kwargs will be passed down to the
+ RPCClient constructor
+ """
+ return RPCClient(transport, target, _manual_load=False, **kwargs)
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index 0d007b2..6a49f49 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -114,8 +114,8 @@ class RpcServerFixture(fixtures.Fixture):
target=self.target,
endpoints=endpoints,
executor=self.executor)
- self._ctrl = oslo_messaging.RPCClient(transport.transport,
- self.ctrl_target)
+ self._ctrl = oslo_messaging.get_rpc_client(transport.transport,
+ self.ctrl_target)
self._start()
transport.wait()
@@ -230,7 +230,7 @@ class ClientStub(object):
transport_options=None, **kwargs):
self.name = name or "functional-tests"
self.cast = cast
- self.client = oslo_messaging.RPCClient(
+ self.client = oslo_messaging.get_rpc_client(
transport=transport,
target=target,
transport_options=transport_options,
diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py
index af1282a..1358c98 100644
--- a/oslo_messaging/tests/rpc/test_client.py
+++ b/oslo_messaging/tests/rpc/test_client.py
@@ -44,8 +44,9 @@ class TestCastCall(test_utils.BaseTestCase):
self.config(rpc_response_timeout=None)
transport_options = oslo_messaging.TransportOptions()
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
- transport_options=transport_options)
+ client = oslo_messaging.get_rpc_client(
+ transport, oslo_messaging.Target(),
+ transport_options=transport_options)
transport._send = mock.Mock()
@@ -70,7 +71,7 @@ class TestCastCall(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
transport_options = oslo_messaging.TransportOptions(at_least_once=True)
- client = oslo_messaging.RPCClient(
+ client = oslo_messaging.get_rpc_client(
transport,
oslo_messaging.Target(),
transport_options=transport_options)
@@ -215,7 +216,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
expect_target = oslo_messaging.Target(**self.expect)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, target)
+ client = oslo_messaging.get_rpc_client(transport, target)
transport._send = mock.Mock()
@@ -269,9 +270,9 @@ class TestCallTimeout(test_utils.BaseTestCase):
self.config(rpc_response_timeout=self.confval)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
- timeout=self.ctor,
- call_monitor_timeout=self.cm)
+ client = oslo_messaging.get_rpc_client(
+ transport, oslo_messaging.Target(), timeout=self.ctor,
+ call_monitor_timeout=self.cm)
transport._send = mock.Mock()
@@ -302,8 +303,9 @@ class TestCallRetry(test_utils.BaseTestCase):
def test_call_retry(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
- retry=self.ctor)
+ client = oslo_messaging.get_rpc_client(
+ transport, oslo_messaging.Target(),
+ retry=self.ctor)
transport._send = mock.Mock()
@@ -332,8 +334,8 @@ class TestCallFanout(test_utils.BaseTestCase):
def test_call_fanout(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport,
- oslo_messaging.Target(**self.target))
+ client = oslo_messaging.get_rpc_client(
+ transport, oslo_messaging.Target(**self.target))
if self.prepare is not _notset:
client = client.prepare(**self.prepare)
@@ -363,8 +365,8 @@ class TestSerializer(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
serializer = msg_serializer.NoOpSerializer()
- client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
- serializer=serializer)
+ client = oslo_messaging.get_rpc_client(
+ transport, oslo_messaging.Target(), serializer=serializer)
transport._send = mock.Mock()
kwargs = dict(wait_for_reply=True,
@@ -465,8 +467,8 @@ class TestVersionCap(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version)
- client = oslo_messaging.RPCClient(transport, target,
- version_cap=self.cap)
+ client = oslo_messaging.get_rpc_client(transport, target,
+ version_cap=self.cap)
if self.success:
transport._send = mock.Mock()
@@ -574,8 +576,8 @@ class TestCanSendVersion(test_utils.BaseTestCase):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version)
- client = oslo_messaging.RPCClient(transport, target,
- version_cap=self.cap)
+ client = oslo_messaging.get_rpc_client(transport, target,
+ version_cap=self.cap)
prep_kwargs = {}
if self.prepare_cap is not _notset:
@@ -598,7 +600,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_invalid_version_type(self):
target = oslo_messaging.Target(topic='sometopic')
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, target)
+ client = oslo_messaging.get_rpc_client(transport, target)
self.assertRaises(exceptions.MessagingException,
client.prepare, version='5')
self.assertRaises(exceptions.MessagingException,
@@ -612,7 +614,7 @@ class TestTransportWarning(test_utils.BaseTestCase):
@mock.patch('oslo_messaging.rpc.client.LOG')
def test_warning_when_notifier_transport(self, log):
transport = oslo_messaging.get_notification_transport(self.conf)
- oslo_messaging.RPCClient(transport, oslo_messaging.Target())
+ oslo_messaging.get_rpc_client(transport, oslo_messaging.Target())
log.warning.assert_called_once_with(
"Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport "
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 1fc6be8..06cf1c7 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -102,8 +102,8 @@ class ServerSetupMixin(object):
def _setup_client(self, transport, topic='testtopic', exchange=None):
target = oslo_messaging.Target(topic=topic, exchange=exchange)
- return oslo_messaging.RPCClient(transport, target=target,
- serializer=self.serializer)
+ return oslo_messaging.get_rpc_client(transport, target=target,
+ serializer=self.serializer)
class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py
index 31fec16..cb12c16 100644
--- a/oslo_messaging/tests/test_transport.py
+++ b/oslo_messaging/tests/test_transport.py
@@ -115,7 +115,7 @@ class GetTransportTestCase(test_utils.BaseTestCase):
self.assertIsNotNone(transport_)
self.assertIs(transport_.conf, self.conf)
self.assertIs(transport_._driver, drvr)
- self.assertTrue(isinstance(transport_, transport.RPCTransport))
+ self.assertIsInstance(transport_, transport.RPCTransport)
driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
self.expect['backend'],
diff --git a/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml
new file mode 100644
index 0000000..985fc64
--- /dev/null
+++ b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml
@@ -0,0 +1,5 @@
+---
+fixes:
+ - |
+ Force creating non durable control exchange when a precondition failed
+ related to config that differ occuring.
diff --git a/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml
new file mode 100644
index 0000000..265e709
--- /dev/null
+++ b/releasenotes/notes/do-not-run-heartbeat-in-pthread-by-default-42e1299f59b841f8.yaml
@@ -0,0 +1,9 @@
+---
+upgrade:
+ - |
+ The ``[oslo_messaging_rabbit] heartbeat_in_pthread`` config option
+ defaults to ``False`` again.
+ For wsgi applications it is recommended to set this value to ``True``
+ but enabling it for non-wsgi services may break such service.
+ Please check https://bugs.launchpad.net/oslo.messaging/+bug/1934937
+ for more details.
diff --git a/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml b/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml
new file mode 100644
index 0000000..3375cfc
--- /dev/null
+++ b/releasenotes/notes/get-rpc-client-0b4aa62160864b29.yaml
@@ -0,0 +1,11 @@
+---
+features:
+ - |
+ Added new ``get_rpc_client`` function to instantiate the RPCClient
+ class
+deprecations:
+ - |
+ Instantiating the RPCClient class directly is deprecated in favor
+ of using the new ``get_rpc_client`` function to expose a more
+ common API similar to existing functions such as ``get_rpc_server``
+ and ``get_rpc_transport``
diff --git a/releasenotes/source/index.rst b/releasenotes/source/index.rst
index 57b9270..0654d71 100644
--- a/releasenotes/source/index.rst
+++ b/releasenotes/source/index.rst
@@ -6,6 +6,7 @@
:maxdepth: 1
unreleased
+ zed
yoga
xena
wallaby
diff --git a/releasenotes/source/zed.rst b/releasenotes/source/zed.rst
new file mode 100644
index 0000000..9608c05
--- /dev/null
+++ b/releasenotes/source/zed.rst
@@ -0,0 +1,6 @@
+========================
+Zed Series Release Notes
+========================
+
+.. release-notes::
+ :branch: stable/zed
diff --git a/test-requirements.txt b/test-requirements.txt
index 983c1c9..3a7c44f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -3,7 +3,7 @@
# process, which may cause wedges in the gate later.
# Hacking already pins down pep8, pyflakes and flake8
-hacking>=3.0.1,<3.1.0 # Apache-2.0
+hacking>=3.0.1,<=4.1.0 # Apache-2.0
fixtures>=3.0.0 # Apache-2.0/BSD
stestr>=2.0.0 # Apache-2.0
diff --git a/tools/simulator.py b/tools/simulator.py
index 8b37f50..da9d05e 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -410,7 +410,7 @@ class RPCClient(Client):
def __init__(self, client_id, transport, target, timeout, is_cast,
wait_after_msg, sync_mode=False):
- client = rpc.RPCClient(transport, target)
+ client = rpc.get_rpc_client(transport, target)
method = _rpc_cast if is_cast else _rpc_call
super(RPCClient, self).__init__(client_id,