summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-03-25 00:28:19 +0000
committerGerrit Code Review <review@openstack.org>2015-03-25 00:28:19 +0000
commitc21f8d99aa489e25cc3df379dec7a74f016c20c7 (patch)
tree70ffd0780b4c1b17bb30a1aa7f3ef9090dc0a47e
parent4a9bfe72fd98407c7dfff33b3868e1fc9854b18f (diff)
parentee18dc5eed7b9dfa4739fed760bd4c4a0478ef51 (diff)
downloadoslo-messaging-c21f8d99aa489e25cc3df379dec7a74f016c20c7.tar.gz
Merge "rabbit: Improves logging" into stable/kilo
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py76
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py6
-rw-r--r--tests/drivers/test_impl_rabbit.py6
3 files changed, 59 insertions, 29 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 1d93f2f..15273a6 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -201,12 +201,12 @@ class ConsumerBase(object):
# Simply retrying will solve the error most of the time and
# should work well enough as a workaround until the race condition
# itself can be fixed.
- # TODO(jrosenboom): In order to be able to match the Execption
+ # TODO(jrosenboom): In order to be able to match the Exception
# more specifically, we have to refactor ConsumerBase to use
# 'channel_errors' of the kombu connection object that
# has created the channel.
# See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
- LOG.exception(_("Declaring queue failed with (%s), retrying"), e)
+ LOG.error(_("Declaring queue failed with (%s), retrying"), e)
self.queue.declare()
def _callback_handler(self, message, callback):
@@ -750,10 +750,10 @@ class Connection(object):
return False
def ensure_connection(self):
- self.ensure(error_callback=None,
- method=lambda: True)
+ self.ensure(method=lambda: True)
- def ensure(self, error_callback, method, retry=None,
+ def ensure(self, method, retry=None,
+ recoverable_error_callback=None, error_callback=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
retry = None means use the value of rabbit_max_retries
@@ -778,7 +778,10 @@ class Connection(object):
retry = None
def on_error(exc, interval):
- error_callback and error_callback(exc)
+ LOG.debug(_("Received recoverable error from kombu:"),
+ exc_info=True)
+
+ recoverable_error_callback and recoverable_error_callback(exc)
interval = (self.driver_conf.kombu_reconnect_delay + interval
if self.driver_conf.kombu_reconnect_delay > 0
@@ -788,13 +791,13 @@ class Connection(object):
info.update(self.connection.info())
if 'Socket closed' in six.text_type(exc):
- LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
- ' the connection. Check login credentials:'
- ' %(err_str)s'), info)
+ LOG.error(_LE('AMQP server %(hostname)s:%(port)d closed'
+ ' the connection. Check login credentials:'
+ ' %(err_str)s'), info)
else:
- LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
- 'unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.'), info)
+ LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
+ 'unreachable: %(err_str)s. Trying again in '
+ '%(sleep_time)d seconds.'), info)
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
@@ -843,6 +846,9 @@ class Connection(object):
self._set_current_channel(channel)
return ret
except recoverable_errors as exc:
+ LOG.debug(_("Received recoverable error from kombu:"),
+ exc_info=True)
+ error_callback and error_callback(exc)
self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
@@ -855,6 +861,9 @@ class Connection(object):
'retry': retry}
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
+ except Exception as exc:
+ error_callback and error_callback(exc)
+ raise
def _set_current_channel(self, new_channel):
"""Change the channel to use.
@@ -960,7 +969,8 @@ class Connection(object):
return consumer
with self._connection_lock:
- return self.ensure(_connect_error, _declare_consumer)
+ return self.ensure(_declare_consumer,
+ error_callback=_connect_error)
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers.
@@ -975,9 +985,12 @@ class Connection(object):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
- def _error_callback(exc):
+ def _recoverable_error_callback(exc):
self.do_consume = True
timer.check_return(_raise_timeout, exc)
+
+ def _error_callback(exc):
+ _recoverable_error_callback(exc)
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
@@ -1009,16 +1022,28 @@ class Connection(object):
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
- yield self.ensure(_error_callback, _consume)
+ yield self.ensure(
+ _consume,
+ recoverable_error_callback=_recoverable_error_callback,
+ error_callback=_error_callback)
+
+ @staticmethod
+ def _log_publisher_send_error(topic, exc):
+ log_info = {'topic': topic, 'err_str': exc}
+ LOG.exception(_("Failed to publish message to topic "
+ "'%(topic)s': %(err_str)s"), log_info)
+
+ default_marker = object()
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
- **kwargs):
+ error_callback=default_marker, **kwargs):
"""Send to a publisher based on the publisher class."""
- def _error_callback(exc):
- log_info = {'topic': topic, 'err_str': exc}
- LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s"), log_info)
+ def _default_error_callback(exc):
+ self._log_publisher_send_error(topic, exc)
+
+ if error_callback is self.default_marker:
+ error_callback = _default_error_callback
def _publish():
publisher = cls(self.driver_conf, self.channel, topic=topic,
@@ -1026,7 +1051,7 @@ class Connection(object):
publisher.send(msg, timeout)
with self._connection_lock:
- self.ensure(_error_callback, _publish, retry=retry)
+ self.ensure(_publish, retry=retry, error_callback=error_callback)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@@ -1058,7 +1083,9 @@ class Connection(object):
while True:
try:
- self.publisher_send(DirectPublisher, msg_id, msg)
+ self.publisher_send(DirectPublisher, msg_id, msg,
+ error_callback=None)
+ return
except self.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
@@ -1073,8 +1100,11 @@ class Connection(object):
"exist yet, retrying...") % msg_id)
time.sleep(1)
continue
+ self._log_publisher_send_error(msg_id, exc)
+ raise
+ except Exception as exc:
+ self._log_publisher_send_error(msg_id, exc)
raise
- return
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 5b68ea9..8830125 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -781,7 +781,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
+ self.connection.ensure, mock_callback,
retry=4)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
@@ -789,7 +789,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
+ self.connection.ensure, mock_callback,
retry=1)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
@@ -797,7 +797,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
+ self.connection.ensure, mock_callback,
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 783afd8..d383bab 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -726,7 +726,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
+ self.connection.ensure, mock_callback,
retry=4)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
@@ -734,7 +734,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
+ self.connection.ensure, mock_callback,
retry=1)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
@@ -742,7 +742,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
- self.connection.ensure, None, mock_callback,
+ self.connection.ensure, mock_callback,
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)