diff options
Diffstat (limited to 'oslo_messaging/_drivers/impl_rabbit.py')
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b485503..86fed1f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -36,6 +36,7 @@ from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ from oslo_messaging._i18n import _LI +from oslo_messaging._i18n import _LW from oslo_messaging import exceptions @@ -230,8 +231,10 @@ class ConsumerBase(object): if not callback: raise ValueError("No callback defined") - def _callback(raw_message): - message = self.channel.message_to_python(raw_message) + def _callback(message): + m2p = getattr(self.channel, 'message_to_python', None) + if m2p: + message = m2p(message) self._callback_handler(message, callback) self.queue.consume(*args, callback=_callback, **options) @@ -489,9 +492,13 @@ class Connection(object): "driver instead.") self._url = 'memory://%s/' % virtual_host elif url.hosts: + if url.transport.startswith('kombu+'): + LOG.warn(_LW('Selecting the kombu transport through the ' + 'transport url (%s) is a experimental feature ' + 'and this is not yet supported.') % url.transport) for host in url.hosts: transport = url.transport.replace('kombu+', '') - transport = url.transport.replace('rabbit', 'amqp') + transport = transport.replace('rabbit', 'amqp') self._url += '%s%s://%s:%s@%s:%s/%s' % ( ";" if self._url else '', transport, @@ -526,14 +533,12 @@ class Connection(object): failover_strategy="shuffle") LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'), - {'hostname': self.connection.hostname, - 'port': self.connection.port}) + self.connection.info()) # NOTE(sileht): just ensure the connection is setuped at startup self.ensure(error_callback=None, method=lambda: True) LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), - {'hostname': self.connection.hostname, - 'port': self.connection.port}) + self.connection.info()) # NOTE(sileht): # value choosen according the best practice from kombu: @@ -624,16 +629,15 @@ class Connection(object): if self.driver_conf.kombu_reconnect_delay > 0 else interval) - info = {'hostname': self.connection.hostname, - 'port': self.connection.port, - 'err_str': exc, 'sleep_time': interval} + info = {'err_str': exc, 'sleep_time': interval} + info.update(self.connection.info()) if 'Socket closed' in six.text_type(exc): - LOG.error(_('AMQP server %(hostname)s:%(port)s closed' + LOG.error(_('AMQP server %(hostname)s:%(port)d closed' ' the connection. Check login credentials:' ' %(err_str)s'), info) else: - LOG.error(_('AMQP server on %(hostname)s:%(port)s is ' + LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' 'unreachable: %(err_str)s. Trying again in ' '%(sleep_time)d seconds.'), info) |