summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <mehdi.abaakouk@enovance.com>2014-12-08 16:48:36 +0100
committerMehdi Abaakouk <mehdi.abaakouk@enovance.com>2015-01-21 17:15:05 +0100
commit3c40cee36ca52bac7f872d7a4c64d1533da27082 (patch)
tree27f49fcbd112f608d7009b9a2ff82162f5761c29
parent2a35b290b1dc175b8813e103f1cb2db5df0f15bb (diff)
downloadoslo-messaging-3c40cee36ca52bac7f872d7a4c64d1533da27082.tar.gz
kombu: fix driver loading with kombu+qpid scheme
When a url looks like: kombu+qpid:///host:port/ the driver fail to load. This change fixes that, adds a warning message that this kind of URL is experimental and not yet supported. Also our Consumer code use internal kombu API that can be optionnal implemented by the kombu transport (message_to_python), so check that this one exists before using it. In the future, we should use kombu.messaging.Consumer/Publisher instead of Consumer/Publisher implementation to avoid such hack... Change-Id: I066d57c23bff922c5734ab036b6ca8e1608e5c6a
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py28
-rw-r--r--tests/drivers/test_impl_rabbit.py14
2 files changed, 29 insertions, 13 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index f7b932e..0347f3d 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LI
+from oslo_messaging._i18n import _LW
from oslo_messaging import exceptions
@@ -213,8 +214,10 @@ class ConsumerBase(object):
if not callback:
raise ValueError("No callback defined")
- def _callback(raw_message):
- message = self.channel.message_to_python(raw_message)
+ def _callback(message):
+ m2p = getattr(self.channel, 'message_to_python', None)
+ if m2p:
+ message = m2p(message)
self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
@@ -469,9 +472,13 @@ class Connection(object):
"driver instead.")
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
+ if url.transport.startswith('kombu+'):
+ LOG.warn(_LW('Selecting the kombu transport through the '
+ 'transport url (%s) is a experimental feature '
+ 'and this is not yet supported.') % url.transport)
for host in url.hosts:
transport = url.transport.replace('kombu+', '')
- transport = url.transport.replace('rabbit', 'amqp')
+ transport = transport.replace('rabbit', 'amqp')
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
@@ -506,14 +513,12 @@ class Connection(object):
failover_strategy="shuffle")
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'),
- {'hostname': self.connection.hostname,
- 'port': self.connection.port})
+ self.connection.info())
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda channel: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
- {'hostname': self.connection.hostname,
- 'port': self.connection.port})
+ self.connection.info())
if self._url.startswith('memory://'):
# Kludge to speed up tests.
@@ -599,16 +604,15 @@ class Connection(object):
interval = (self.conf.kombu_reconnect_delay + interval
if self.conf.kombu_reconnect_delay > 0 else interval)
- info = {'hostname': self.connection.hostname,
- 'port': self.connection.port,
- 'err_str': exc, 'sleep_time': interval}
+ info = {'err_str': exc, 'sleep_time': interval}
+ info.update(self.connection.info())
if 'Socket closed' in six.text_type(exc):
- LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
+ LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
- LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 1fe0057..3dfbf13 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -130,6 +130,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
expected=["amqp://user:password@host:10/virtual_host",
"amqp://user2:password2@host2:12/virtual_host"]
)),
+ ('qpid',
+ dict(url='kombu+qpid://user:password@host:10/virtual_host',
+ expected=['qpid://user:password@host:10/virtual_host'])),
+ ('rabbit',
+ dict(url='kombu+rabbit://user:password@host:10/virtual_host',
+ expected=['amqp://user:password@host:10/virtual_host'])),
]
def setUp(self):
@@ -143,7 +149,13 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
self.addCleanup(transport.cleanup)
driver = transport._driver
- urls = driver._get_connection()._url.split(";")
+ # NOTE(sileht): some kombu transport can depend on library that
+ # we don't want to depend yet, because selecting the transport
+ # is experimental, only amqp is supported
+ # for example kombu+qpid depends of qpid-tools
+ # so, mock the connection.info to skip call to qpid-tools
+ with mock.patch('kombu.connection.Connection.info'):
+ urls = driver._get_connection()._url.split(";")
self.assertEqual(sorted(self.expected), sorted(urls))