summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/impl_rabbit.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py28
1 files changed, 16 insertions, 12 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b485503..86fed1f 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LI
+from oslo_messaging._i18n import _LW
from oslo_messaging import exceptions
@@ -230,8 +231,10 @@ class ConsumerBase(object):
if not callback:
raise ValueError("No callback defined")
- def _callback(raw_message):
- message = self.channel.message_to_python(raw_message)
+ def _callback(message):
+ m2p = getattr(self.channel, 'message_to_python', None)
+ if m2p:
+ message = m2p(message)
self._callback_handler(message, callback)
self.queue.consume(*args, callback=_callback, **options)
@@ -489,9 +492,13 @@ class Connection(object):
"driver instead.")
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
+ if url.transport.startswith('kombu+'):
+ LOG.warn(_LW('Selecting the kombu transport through the '
+ 'transport url (%s) is a experimental feature '
+ 'and this is not yet supported.') % url.transport)
for host in url.hosts:
transport = url.transport.replace('kombu+', '')
- transport = url.transport.replace('rabbit', 'amqp')
+ transport = transport.replace('rabbit', 'amqp')
self._url += '%s%s://%s:%s@%s:%s/%s' % (
";" if self._url else '',
transport,
@@ -526,14 +533,12 @@ class Connection(object):
failover_strategy="shuffle")
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'),
- {'hostname': self.connection.hostname,
- 'port': self.connection.port})
+ self.connection.info())
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
- {'hostname': self.connection.hostname,
- 'port': self.connection.port})
+ self.connection.info())
# NOTE(sileht):
# value choosen according the best practice from kombu:
@@ -624,16 +629,15 @@ class Connection(object):
if self.driver_conf.kombu_reconnect_delay > 0
else interval)
- info = {'hostname': self.connection.hostname,
- 'port': self.connection.port,
- 'err_str': exc, 'sleep_time': interval}
+ info = {'err_str': exc, 'sleep_time': interval}
+ info.update(self.connection.info())
if 'Socket closed' in six.text_type(exc):
- LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
+ LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
- LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
+ LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)