diff options
author | Mehdi Abaakouk <mehdi.abaakouk@enovance.com> | 2014-12-08 16:48:36 +0100 |
---|---|---|
committer | Mehdi Abaakouk <mehdi.abaakouk@enovance.com> | 2015-01-21 17:15:05 +0100 |
commit | 3c40cee36ca52bac7f872d7a4c64d1533da27082 (patch) | |
tree | 27f49fcbd112f608d7009b9a2ff82162f5761c29 | |
parent | 2a35b290b1dc175b8813e103f1cb2db5df0f15bb (diff) | |
download | oslo-messaging-3c40cee36ca52bac7f872d7a4c64d1533da27082.tar.gz |
kombu: fix driver loading with kombu+qpid scheme
When a url looks like: kombu+qpid:///host:port/ the driver fail
to load.
This change fixes that, adds a warning message that this kind of
URL is experimental and not yet supported.
Also our Consumer code use internal kombu API that can be optionnal
implemented by the kombu transport (message_to_python),
so check that this one exists before using it.
In the future, we should use kombu.messaging.Consumer/Publisher
instead of Consumer/Publisher implementation to avoid such hack...
Change-Id: I066d57c23bff922c5734ab036b6ca8e1608e5c6a
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 28 | ||||
-rw-r--r-- | tests/drivers/test_impl_rabbit.py | 14 |
2 files changed, 29 insertions, 13 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index f7b932e..0347f3d 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ from oslo_messaging._i18n import _LI +from oslo_messaging._i18n import _LW from oslo_messaging import exceptions @@ -213,8 +214,10 @@ class ConsumerBase(object): if not callback: raise ValueError("No callback defined") - def _callback(raw_message): - message = self.channel.message_to_python(raw_message) + def _callback(message): + m2p = getattr(self.channel, 'message_to_python', None) + if m2p: + message = m2p(message) self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -469,9 +472,13 @@ class Connection(object): "driver instead.") self._url = 'memory://%s/' % virtual_host elif url.hosts: + if url.transport.startswith('kombu+'): + LOG.warn(_LW('Selecting the kombu transport through the ' + 'transport url (%s) is a experimental feature ' + 'and this is not yet supported.') % url.transport) for host in url.hosts: transport = url.transport.replace('kombu+', '') - transport = url.transport.replace('rabbit', 'amqp') + transport = transport.replace('rabbit', 'amqp') self._url += '%s%s://%s:%s@%s:%s/%s' % ( ";" if self._url else '', transport, @@ -506,14 +513,12 @@ class Connection(object): failover_strategy="shuffle") LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'), - {'hostname': self.connection.hostname, - 'port': self.connection.port}) + self.connection.info()) # NOTE(sileht): just ensure the connection is setuped at startup self.ensure(error_callback=None, method=lambda channel: True) LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), - {'hostname': self.connection.hostname, - 'port': self.connection.port}) + self.connection.info()) if self._url.startswith('memory://'): # Kludge to speed up tests. @@ -599,16 +604,15 @@ class Connection(object): interval = (self.conf.kombu_reconnect_delay + interval if self.conf.kombu_reconnect_delay > 0 else interval) - info = {'hostname': self.connection.hostname, - 'port': self.connection.port, - 'err_str': exc, 'sleep_time': interval} + info = {'err_str': exc, 'sleep_time': interval} + info.update(self.connection.info()) if 'Socket closed' in six.text_type(exc): - LOG.error(_('AMQP server %(hostname)s:%(port)s closed' + LOG.error(_('AMQP server %(hostname)s:%(port)d closed' ' the connection. Check login credentials:' ' %(err_str)s'), info) else: - LOG.error(_('AMQP server on %(hostname)s:%(port)s is ' + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' 'unreachable: %(err_str)s. Trying again in ' '%(sleep_time)d seconds.'), info) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 1fe0057..3dfbf13 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -130,6 +130,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): expected=["amqp://user:password@host:10/virtual_host", "amqp://user2:password2@host2:12/virtual_host"] )), + ('qpid', + dict(url='kombu+qpid://user:password@host:10/virtual_host', + expected=['qpid://user:password@host:10/virtual_host'])), + ('rabbit', + dict(url='kombu+rabbit://user:password@host:10/virtual_host', + expected=['amqp://user:password@host:10/virtual_host'])), ] def setUp(self): @@ -143,7 +149,13 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): self.addCleanup(transport.cleanup) driver = transport._driver - urls = driver._get_connection()._url.split(";") + # NOTE(sileht): some kombu transport can depend on library that + # we don't want to depend yet, because selecting the transport + # is experimental, only amqp is supported + # for example kombu+qpid depends of qpid-tools + # so, mock the connection.info to skip call to qpid-tools + with mock.patch('kombu.connection.Connection.info'): + urls = driver._get_connection()._url.split(";") self.assertEqual(sorted(self.expected), sorted(urls)) |