summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriele <gsantomaggio@suse.com>2019-06-19 09:51:55 +0200
committerGabriele <gsantomaggio@suse.com>2019-06-24 16:50:35 +0200
commite804874c50fd8cfbfca2d982a4932fdd8844c3f1 (patch)
tree55029d93361eb360376a106b2c8f0c21bdb4edf0
parent03ec779cdfea76e5d23f997d1679e01ddaad9309 (diff)
downloadoslo-messaging-e804874c50fd8cfbfca2d982a4932fdd8844c3f1.tar.gz
Implement the transport options
With this feature, it is possible to specialize the parameters to send. `options = oslo_messaging.TransportOptions(at_least_once=True)` TransportOptions is used in every single driver, for example in RabbitMQ driver is used to handle the mandatory flag. Notes: - The idea of creating a new class TransportOptions is because I'd like to have an abstract class not related only to the RPCClient - at_least_once is the first parameter, when needed we can add the others. Implements: blueprint transport-options (second point) The blueprint link is [1] To test it you can use [2] 1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options 2- https://github.com/Gsantomaggio/rabbitmq-utils/ tree/master/openstack/mandatory_test Change-Id: I1858e4a990507d3c2bac2ef7fbef75d8c2dbfce2
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py15
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py2
-rwxr-xr-xoslo_messaging/tests/rpc/test_client.py21
-rw-r--r--oslo_messaging/transport.py11
4 files changed, 35 insertions, 14 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index aedb163..0edf45a 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -49,7 +49,6 @@ from oslo_messaging import exceptions
# NOTE(sileht): don't exists in py2 socket module
TCP_USER_TIMEOUT = 18
-
rabbit_opts = [
cfg.BoolOpt('ssl',
default=False,
@@ -1150,15 +1149,17 @@ class Connection(object):
'transport_options': str(transport_options)}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
-
# NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached
with self._transport_socket_timeout(timeout):
- self._producer.publish(msg,
- exchange=exchange,
- routing_key=routing_key,
- expiration=timeout,
- compression=self.kombu_compression)
+ self._producer.publish(
+ msg,
+ mandatory=transport_options.at_least_once if
+ transport_options else False,
+ exchange=exchange,
+ routing_key=routing_key,
+ expiration=timeout,
+ compression=self.kombu_compression)
def _publish_and_creates_default_queue(self, exchange, msg,
routing_key=None, timeout=None,
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index a18da15..5f302db 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -199,6 +199,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
'msg', expiration=1,
exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
+ mandatory=False,
routing_key='routing_key')
@mock.patch('kombu.messaging.Producer.publish')
@@ -212,6 +213,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
conn._publish(exchange_mock, 'msg', routing_key='routing_key')
fake_publish.assert_called_with(
'msg', expiration=None,
+ mandatory=False,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
exchange=exchange_mock,
routing_key='routing_key')
diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py
index ec22d70..6c54625 100755
--- a/oslo_messaging/tests/rpc/test_client.py
+++ b/oslo_messaging/tests/rpc/test_client.py
@@ -42,14 +42,15 @@ class TestCastCall(test_utils.BaseTestCase):
def test_cast_call(self):
self.config(rpc_response_timeout=None)
-
+ transport_options = oslo_messaging.TransportOptions()
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
+ client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
+ transport_options=transport_options)
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
- kwargs = {'retry': None, 'transport_options': None}
+ kwargs = {'retry': None, 'transport_options': transport_options}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@@ -57,7 +58,7 @@ class TestCastCall(test_utils.BaseTestCase):
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
-
+ self.assertFalse(transport_options.at_least_once)
transport._send.assert_called_once_with(oslo_messaging.Target(),
self.ctxt,
msg,
@@ -67,13 +68,18 @@ class TestCastCall(test_utils.BaseTestCase):
self.config(rpc_response_timeout=None)
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
- client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
- transport_options={'my_k': 'my_val'})
+
+ transport_options = oslo_messaging.TransportOptions(at_least_once=True)
+ client = oslo_messaging.RPCClient(
+ transport,
+ oslo_messaging.Target(),
+ transport_options=transport_options)
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
- kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
+ kwargs = {'retry': None,
+ 'transport_options': transport_options}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@@ -82,6 +88,7 @@ class TestCastCall(test_utils.BaseTestCase):
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
+ self.assertTrue(transport_options.at_least_once)
transport._send.assert_called_once_with(oslo_messaging.Target(),
self.ctxt,
msg,
diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py
index c263e35..48979a6 100644
--- a/oslo_messaging/transport.py
+++ b/oslo_messaging/transport.py
@@ -33,6 +33,7 @@ __all__ = [
'Transport',
'TransportHost',
'TransportURL',
+ 'TransportOptions',
'get_transport',
'set_transport_defaults',
]
@@ -277,6 +278,16 @@ class TransportHost(object):
return '<TransportHost ' + values + '>'
+class TransportOptions(object):
+
+ def __init__(self, at_least_once=False):
+ self._at_least_once = at_least_once
+
+ @property
+ def at_least_once(self):
+ return self._at_least_once
+
+
class TransportURL(object):
"""A parsed transport URL.