diff options
author | Gabriele <gsantomaggio@suse.com> | 2019-06-19 09:51:55 +0200 |
---|---|---|
committer | Gabriele <gsantomaggio@suse.com> | 2019-06-24 16:50:35 +0200 |
commit | e804874c50fd8cfbfca2d982a4932fdd8844c3f1 (patch) | |
tree | 55029d93361eb360376a106b2c8f0c21bdb4edf0 | |
parent | 03ec779cdfea76e5d23f997d1679e01ddaad9309 (diff) | |
download | oslo-messaging-e804874c50fd8cfbfca2d982a4932fdd8844c3f1.tar.gz |
Implement the transport options
With this feature, it is possible to specialize the parameters to send.
`options = oslo_messaging.TransportOptions(at_least_once=True)`
TransportOptions is used in every single driver,
for example in RabbitMQ driver is used to handle the mandatory flag.
Notes:
- The idea of creating a new class TransportOptions is because I'd like
to have an abstract class not related only to the RPCClient
- at_least_once is the first parameter, when needed we can add the
others.
Implements: blueprint transport-options (second point)
The blueprint link is [1]
To test it you can use [2]
1- https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
2- https://github.com/Gsantomaggio/rabbitmq-utils/
tree/master/openstack/mandatory_test
Change-Id: I1858e4a990507d3c2bac2ef7fbef75d8c2dbfce2
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 15 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 2 | ||||
-rwxr-xr-x | oslo_messaging/tests/rpc/test_client.py | 21 | ||||
-rw-r--r-- | oslo_messaging/transport.py | 11 |
4 files changed, 35 insertions, 14 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index aedb163..0edf45a 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -49,7 +49,6 @@ from oslo_messaging import exceptions # NOTE(sileht): don't exists in py2 socket module TCP_USER_TIMEOUT = 18 - rabbit_opts = [ cfg.BoolOpt('ssl', default=False, @@ -1150,15 +1149,17 @@ class Connection(object): 'transport_options': str(transport_options)} LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) - # NOTE(sileht): no need to wait more, caller expects # a answer before timeout is reached with self._transport_socket_timeout(timeout): - self._producer.publish(msg, - exchange=exchange, - routing_key=routing_key, - expiration=timeout, - compression=self.kombu_compression) + self._producer.publish( + msg, + mandatory=transport_options.at_least_once if + transport_options else False, + exchange=exchange, + routing_key=routing_key, + expiration=timeout, + compression=self.kombu_compression) def _publish_and_creates_default_queue(self, exchange, msg, routing_key=None, timeout=None, diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index a18da15..5f302db 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -199,6 +199,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): 'msg', expiration=1, exchange=exchange_mock, compression=self.conf.oslo_messaging_rabbit.kombu_compression, + mandatory=False, routing_key='routing_key') @mock.patch('kombu.messaging.Producer.publish') @@ -212,6 +213,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): conn._publish(exchange_mock, 'msg', routing_key='routing_key') fake_publish.assert_called_with( 'msg', expiration=None, + mandatory=False, compression=self.conf.oslo_messaging_rabbit.kombu_compression, exchange=exchange_mock, routing_key='routing_key') diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py index ec22d70..6c54625 100755 --- a/oslo_messaging/tests/rpc/test_client.py +++ b/oslo_messaging/tests/rpc/test_client.py @@ -42,14 +42,15 @@ class TestCastCall(test_utils.BaseTestCase): def test_cast_call(self): self.config(rpc_response_timeout=None) - + transport_options = oslo_messaging.TransportOptions() transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target()) + client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), + transport_options=transport_options) transport._send = mock.Mock() msg = dict(method='foo', args=self.args) - kwargs = {'retry': None, 'transport_options': None} + kwargs = {'retry': None, 'transport_options': transport_options} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -57,7 +58,7 @@ class TestCastCall(test_utils.BaseTestCase): method = client.call if self.call else client.cast method(self.ctxt, 'foo', **self.args) - + self.assertFalse(transport_options.at_least_once) transport._send.assert_called_once_with(oslo_messaging.Target(), self.ctxt, msg, @@ -67,13 +68,18 @@ class TestCastCall(test_utils.BaseTestCase): self.config(rpc_response_timeout=None) transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:') - client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(), - transport_options={'my_k': 'my_val'}) + + transport_options = oslo_messaging.TransportOptions(at_least_once=True) + client = oslo_messaging.RPCClient( + transport, + oslo_messaging.Target(), + transport_options=transport_options) transport._send = mock.Mock() msg = dict(method='foo', args=self.args) - kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}} + kwargs = {'retry': None, + 'transport_options': transport_options} if self.call: kwargs['wait_for_reply'] = True kwargs['timeout'] = None @@ -82,6 +88,7 @@ class TestCastCall(test_utils.BaseTestCase): method = client.call if self.call else client.cast method(self.ctxt, 'foo', **self.args) + self.assertTrue(transport_options.at_least_once) transport._send.assert_called_once_with(oslo_messaging.Target(), self.ctxt, msg, diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index c263e35..48979a6 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -33,6 +33,7 @@ __all__ = [ 'Transport', 'TransportHost', 'TransportURL', + 'TransportOptions', 'get_transport', 'set_transport_defaults', ] @@ -277,6 +278,16 @@ class TransportHost(object): return '<TransportHost ' + values + '>' +class TransportOptions(object): + + def __init__(self, at_least_once=False): + self._at_least_once = at_least_once + + @property + def at_least_once(self): + return self._at_least_once + + class TransportURL(object): """A parsed transport URL. |