summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-07-11 16:45:55 +0000
committerGerrit Code Review <review@openstack.org>2019-07-11 16:45:55 +0000
commitf7eb82a1e478029ca89c95854142b35789c771c6 (patch)
tree58126dfb7baff2e41084f0110d8ffe9a3bafd6d1
parent1c31abc7bc7b1b7dacd8c482857ab8829ed05eaf (diff)
parentc50076b4efb79cef46d618d6d80eecbcebb72898 (diff)
downloadoslo-messaging-f7eb82a1e478029ca89c95854142b35789c771c6.tar.gz
Merge "Implement mandatory flag for RabbitMQ driver"9.8.0
-rw-r--r--lower-constraints.txt2
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py12
-rw-r--r--oslo_messaging/exceptions.py13
-rw-r--r--oslo_messaging/tests/functional/test_functional.py34
-rw-r--r--oslo_messaging/tests/functional/utils.py9
-rw-r--r--requirements.txt2
6 files changed, 66 insertions, 6 deletions
diff --git a/lower-constraints.txt b/lower-constraints.txt
index 3df8f1a..338c12a 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -28,7 +28,7 @@ imagesize==0.7.1
iso8601==0.1.11
Jinja2==2.10
keystoneauth1==3.4.0
-kombu==4.0.0
+kombu==4.6.1
linecache2==1.0.0
MarkupSafe==1.0
mccabe==0.2.1
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 1726fec..124e7ef 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -756,6 +756,10 @@ class Connection(object):
# NOTE(sileht): we must reraise this without
# trigger error_callback
raise
+ except exceptions.MessageUndeliverable:
+ # NOTE(gsantomaggio): we must reraise this without
+ # trigger error_callback
+ raise
except Exception as exc:
error_callback and error_callback(exc)
self._set_current_channel(None)
@@ -769,6 +773,11 @@ class Connection(object):
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
+ @staticmethod
+ def on_return(exception, exchange, routing_key, message):
+ raise exceptions.MessageUndeliverable(exception, exchange, routing_key,
+ message)
+
def _set_current_channel(self, new_channel):
"""Change the channel to use.
@@ -787,7 +796,8 @@ class Connection(object):
if new_channel is not None:
if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel)
- self._producer = kombu.messaging.Producer(new_channel)
+ self._producer = kombu.messaging.Producer(new_channel,
+ on_return=self.on_return)
for consumer in self._consumers:
consumer.declare(self)
diff --git a/oslo_messaging/exceptions.py b/oslo_messaging/exceptions.py
index cfe6a7e..f6ba20a 100644
--- a/oslo_messaging/exceptions.py
+++ b/oslo_messaging/exceptions.py
@@ -16,7 +16,7 @@
import six
__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
- 'InvalidTarget']
+ 'InvalidTarget', 'MessageUndeliverable']
class MessagingException(Exception):
@@ -38,3 +38,14 @@ class InvalidTarget(MessagingException, ValueError):
msg = msg + ":" + six.text_type(target)
super(InvalidTarget, self).__init__(msg)
self.target = target
+
+
+class MessageUndeliverable(Exception):
+ """Raised if message is not routed with mandatory flag"""
+
+ def __init__(self, exception, exchange, routing_key, message):
+ super(MessageUndeliverable, self).__init__()
+ self.exception = exception
+ self.exchange = exchange
+ self.routing_key = routing_key
+ self.message = message
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index 4fa8b48..6800f59 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -152,6 +152,40 @@ class CallTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(10, server.endpoint.ival)
+ def test_mandatory_call(self):
+ if not self.url.startswith("rabbit://"):
+ self.skipTest("backend does not support call monitoring")
+
+ transport = self.useFixture(utils.RPCTransportFixture(self.conf,
+ self.url))
+ target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
+ server='server_' + str(uuid.uuid4()))
+
+ # test for mandatory flag using transport-options, see:
+ # https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
+ # first test with `at_least_once=False` raises a "MessagingTimeout"
+ # error since there is no control if the queue actually exists.
+ # (Default behavior)
+ options = oslo_messaging.TransportOptions(at_least_once=False)
+ client1 = utils.ClientStub(transport.transport, target,
+ cast=False, timeout=1,
+ transport_options=options)
+
+ self.assertRaises(oslo_messaging.MessagingTimeout,
+ client1.delay)
+
+ # second test with `at_least_once=True` raises a "MessageUndeliverable"
+ # caused by mandatory flag.
+ # the MessageUndeliverable error is raised immediately without waiting
+ # any timeout
+ options2 = oslo_messaging.TransportOptions(at_least_once=True)
+ client2 = utils.ClientStub(transport.transport, target,
+ cast=False, timeout=60,
+ transport_options=options2)
+
+ self.assertRaises(oslo_messaging.MessageUndeliverable,
+ client2.delay)
+
def test_monitor_long_call(self):
if not (self.url.startswith("rabbit://") or
self.url.startswith("amqp://")):
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index 4d403a0..700c162 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -226,10 +226,15 @@ class RpcCast(RpcCall):
class ClientStub(object):
- def __init__(self, transport, target, cast=False, name=None, **kwargs):
+ def __init__(self, transport, target, cast=False, name=None,
+ transport_options=None, **kwargs):
self.name = name or "functional-tests"
self.cast = cast
- self.client = oslo_messaging.RPCClient(transport, target, **kwargs)
+ self.client = oslo_messaging.RPCClient(
+ transport=transport,
+ target=target,
+ transport_options=transport_options,
+ **kwargs)
def __getattr__(self, name):
context = {"application": self.name}
diff --git a/requirements.txt b/requirements.txt
index 2e05118..7ed7394 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,7 +26,7 @@ PyYAML>=3.12 # MIT
# rabbit driver is the default
# we set the amqp version to ensure heartbeat works
amqp>=2.4.1 # BSD
-kombu!=4.0.2,>=4.0.0 # BSD
+kombu!=4.0.2,>=4.6.1 # BSD
# middleware
oslo.middleware>=3.31.0 # Apache-2.0