diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-02-05 15:20:03 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-02-05 15:20:03 +0000 |
commit | 081a0174520c1c263f7abea2527cc72f6ed6b9b9 (patch) | |
tree | 6268fae4e388fda1c529b1bd3295eda2fd296aeb | |
parent | 0bf006f24f36824510913846ab148cc93bb2e951 (diff) | |
parent | 3c40cee36ca52bac7f872d7a4c64d1533da27082 (diff) | |
download | oslo-messaging-081a0174520c1c263f7abea2527cc72f6ed6b9b9.tar.gz |
Merge "kombu: fix driver loading with kombu+qpid scheme"
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 28 | ||||
-rw-r--r-- | tests/drivers/test_impl_rabbit.py | 14 |
2 files changed, 29 insertions, 13 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b485503..86fed1f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ from oslo_messaging._i18n import _LI +from oslo_messaging._i18n import _LW from oslo_messaging import exceptions @@ -230,8 +231,10 @@ class ConsumerBase(object): if not callback: raise ValueError("No callback defined") - def _callback(raw_message): - message = self.channel.message_to_python(raw_message) + def _callback(message): + m2p = getattr(self.channel, 'message_to_python', None) + if m2p: + message = m2p(message) self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -489,9 +492,13 @@ class Connection(object): "driver instead.") self._url = 'memory://%s/' % virtual_host elif url.hosts: + if url.transport.startswith('kombu+'): + LOG.warn(_LW('Selecting the kombu transport through the ' + 'transport url (%s) is a experimental feature ' + 'and this is not yet supported.') % url.transport) for host in url.hosts: transport = url.transport.replace('kombu+', '') - transport = url.transport.replace('rabbit', 'amqp') + transport = transport.replace('rabbit', 'amqp') self._url += '%s%s://%s:%s@%s:%s/%s' % ( ";" if self._url else '', transport, @@ -526,14 +533,12 @@ class Connection(object): failover_strategy="shuffle") LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'), - {'hostname': self.connection.hostname, - 'port': self.connection.port}) + self.connection.info()) # NOTE(sileht): just ensure the connection is setuped at startup self.ensure(error_callback=None, method=lambda: True) LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), - {'hostname': self.connection.hostname, - 'port': self.connection.port}) + self.connection.info()) # NOTE(sileht): # value choosen according the best practice from kombu: @@ -624,16 +629,15 @@ class Connection(object): if self.driver_conf.kombu_reconnect_delay > 0 else interval) - info = {'hostname': self.connection.hostname, - 'port': self.connection.port, - 'err_str': exc, 'sleep_time': interval} + info = {'err_str': exc, 'sleep_time': interval} + info.update(self.connection.info()) if 'Socket closed' in six.text_type(exc): - LOG.error(_('AMQP server %(hostname)s:%(port)s closed' + LOG.error(_('AMQP server %(hostname)s:%(port)d closed' ' the connection. Check login credentials:' ' %(err_str)s'), info) else: - LOG.error(_('AMQP server on %(hostname)s:%(port)s is ' + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' 'unreachable: %(err_str)s. Trying again in ' '%(sleep_time)d seconds.'), info) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 1962ea6..8e9b29e 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -130,6 +130,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): expected=["amqp://user:password@host:10/virtual_host", "amqp://user2:password2@host2:12/virtual_host"] )), + ('qpid', + dict(url='kombu+qpid://user:password@host:10/virtual_host', + expected=['qpid://user:password@host:10/virtual_host'])), + ('rabbit', + dict(url='kombu+rabbit://user:password@host:10/virtual_host', + expected=['amqp://user:password@host:10/virtual_host'])), ] def setUp(self): @@ -143,7 +149,13 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): self.addCleanup(transport.cleanup) driver = transport._driver - urls = driver._get_connection()._url.split(";") + # NOTE(sileht): some kombu transport can depend on library that + # we don't want to depend yet, because selecting the transport + # is experimental, only amqp is supported + # for example kombu+qpid depends of qpid-tools + # so, mock the connection.info to skip call to qpid-tools + with mock.patch('kombu.connection.Connection.info'): + urls = driver._get_connection()._url.split(";") self.assertEqual(sorted(self.expected), sorted(urls)) |