summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-05 15:20:03 +0000
committerGerrit Code Review <review@openstack.org>2015-02-05 15:20:03 +0000
commit081a0174520c1c263f7abea2527cc72f6ed6b9b9 (patch)
tree6268fae4e388fda1c529b1bd3295eda2fd296aeb
parent0bf006f24f36824510913846ab148cc93bb2e951 (diff)
parent3c40cee36ca52bac7f872d7a4c64d1533da27082 (diff)
downloadoslo-messaging-081a0174520c1c263f7abea2527cc72f6ed6b9b9.tar.gz
Merge "kombu: fix driver loading with kombu+qpid scheme"
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py28
-rw-r--r--tests/drivers/test_impl_rabbit.py14
2 files changed, 29 insertions, 13 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b485503..86fed1f 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LI
+from oslo_messaging._i18n import _LW
from oslo_messaging import exceptions
@@ -230,8 +231,10 @@ class ConsumerBase(object):
if not callback:
raise ValueError("No callback defined")
- def _callback(raw_message):
- message = self.channel.message_to_python(raw_message)
+ def _callback(message):
+ m2p = getattr(self.channel, 'message_to_python', None)
+ if m2p:
+ message = m2p(message)
self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
@@ -489,9 +492,13 @@ class Connection(object):
"driver instead.")
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
+ if url.transport.startswith('kombu+'):
+ LOG.warn(_LW('Selecting the kombu transport through the '
+ 'transport url (%s) is a experimental feature '
+ 'and this is not yet supported.') % url.transport)
for host in url.hosts:
transport = url.transport.replace('kombu+', '')
- transport = url.transport.replace('rabbit', 'amqp')
+ transport = transport.replace('rabbit', 'amqp')
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
@@ -526,14 +533,12 @@ class Connection(object):
failover_strategy="shuffle")
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'),
- {'hostname': self.connection.hostname,
- 'port': self.connection.port})
+ self.connection.info())
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
- {'hostname': self.connection.hostname,
- 'port': self.connection.port})
+ self.connection.info())
# NOTE(sileht):
# value choosen according the best practice from kombu:
@@ -624,16 +629,15 @@ class Connection(object):
if self.driver_conf.kombu_reconnect_delay > 0
else interval)
- info = {'hostname': self.connection.hostname,
- 'port': self.connection.port,
- 'err_str': exc, 'sleep_time': interval}
+ info = {'err_str': exc, 'sleep_time': interval}
+ info.update(self.connection.info())
if 'Socket closed' in six.text_type(exc):
- LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
+ LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
- LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 1962ea6..8e9b29e 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -130,6 +130,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
expected=["amqp://user:password@host:10/virtual_host",
"amqp://user2:password2@host2:12/virtual_host"]
)),
+ ('qpid',
+ dict(url='kombu+qpid://user:password@host:10/virtual_host',
+ expected=['qpid://user:password@host:10/virtual_host'])),
+ ('rabbit',
+ dict(url='kombu+rabbit://user:password@host:10/virtual_host',
+ expected=['amqp://user:password@host:10/virtual_host'])),
]
def setUp(self):
@@ -143,7 +149,13 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
self.addCleanup(transport.cleanup)
driver = transport._driver
- urls = driver._get_connection()._url.split(";")
+ # NOTE(sileht): some kombu transport can depend on library that
+ # we don't want to depend yet, because selecting the transport
+ # is experimental, only amqp is supported
+ # for example kombu+qpid depends of qpid-tools
+ # so, mock the connection.info to skip call to qpid-tools
+ with mock.patch('kombu.connection.Connection.info'):
+ urls = driver._get_connection()._url.split(";")
self.assertEqual(sorted(self.expected), sorted(urls))