summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHervé Beraud <hberaud@redhat.com>2021-01-08 15:54:54 +0100
committerHervé Beraud <hberaud@redhat.com>2021-02-18 11:46:49 +0100
commit391ce7fc69adc8d713d8ca64e76d901eb3e65df1 (patch)
tree9dfb6b807682a867d13c86d9ac3e45aa278b0ef6
parent06ad070cc6dbe72d1f96c529f7bd193dbae383f5 (diff)
downloadoslo-messaging-stable/victoria.tar.gz
Correctly handle missing RabbitMQ queuesvictoria-em12.5.2stable/victoria
Currently, setting the '[oslo_messaging] direct_mandatory_flag' config option to 'True' (the default) will result in a 'MessageUndeliverable' exception being raised when sending a reply if a RabbitMQ queue is missing [1]. It was the responsibility of the application to handle this exception, however, many applications are not doing so. This has resulted in a number of bug reports. Start handling this error condition, using a retry loop to attempt to resend the message and work around any temporary glitches. Since attempting to send a reply will will no longer raise an exception, there is little benefit in retaining the '[oslo_messaging] direct_mandatory_flag' config option: users setting this to False will simply not benefit from the retry logic and improved logging added here. This option is already deprecated though and will be fully removed in a future release. [1] https://www.rabbitmq.com/channels.html Change-Id: Id5cddbefbe24ef100f1cc522f44430df77d217cb Closes-Bug: #1905965 (cherry picked from commit 4937949dffecdf8863a7876e5a6b0b18e811c3ac)
-rw-r--r--doc/source/admin/rabbit.rst3
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py68
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py3
-rw-r--r--releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml5
4 files changed, 58 insertions, 21 deletions
diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst
index 72c438c..687bc42 100644
--- a/doc/source/admin/rabbit.rst
+++ b/doc/source/admin/rabbit.rst
@@ -66,7 +66,8 @@ flag is used`_.
through the *Connection* class.
With mandatory flag RabbitMQ raises a callback if the message is not routed to
-any queue.
+any queue. This callback will be used to loop for a timeout and let's a chance
+to sender to recover.
.. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
.. _queues: https://www.rabbitmq.com/queues.html
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 011222c..cdc21c5 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -145,39 +145,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
while True:
try:
with self.listener.driver._get_connection(
- rpc_common.PURPOSE_SEND) as conn:
+ rpc_common.PURPOSE_SEND,
+ ) as conn:
self._send_reply(conn, reply, failure)
+
return
- except rpc_amqp.AMQPDestinationNotFound:
- if timer.check_return() > 0:
- LOG.debug(("The reply %(msg_id)s cannot be sent "
- "%(reply_q)s reply queue doesn't exist, "
- "retrying..."), {
- 'msg_id': self.msg_id,
- 'reply_q': self.reply_q})
- time.sleep(0.25)
- else:
+ except oslo_messaging.MessageUndeliverable:
+ # queue not found
+ if timer.check_return() <= 0:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
- infos = {
+ LOG.error(
+ 'The reply %(msg_id)s failed to send after '
+ '%(duration)d seconds due to a missing queue '
+ '(%(reply_q)s). Abandoning...', {
+ 'msg_id': self.msg_id,
+ 'duration': duration,
+ 'reply_q': self.reply_q})
+ return
+
+ LOG.debug(
+ 'The reply %(msg_id)s could not be sent due to a missing '
+ 'queue (%(reply_q)s). Retrying...', {
'msg_id': self.msg_id,
- 'reply_q': self.reply_q,
- 'duration': duration
- }
- LOG.info("The reply %(msg_id)s cannot be sent "
- "%(reply_q)s reply queue don't exist after "
- "%(duration)s sec abandoning...", infos)
+ 'reply_q': self.reply_q})
+ time.sleep(0.25)
+ except rpc_amqp.AMQPDestinationNotFound as exc:
+ # exchange not found/down
+ if timer.check_return() <= 0:
+ self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
+ LOG.error(
+ 'The reply %(msg_id)s failed to send after '
+ '%(duration)d seconds due to a broker issue '
+ '(%(exc)s). Abandoning...', {
+ 'msg_id': self.msg_id,
+ 'duration': duration,
+ 'exc': exc})
return
+ LOG.debug(
+ 'The reply %(msg_id)s could not be sent due to a broker '
+ 'issue (%(exc)s). Retrying...', {
+ 'msg_id': self.msg_id,
+ 'exc': exc})
+ time.sleep(0.25)
+
def heartbeat(self):
# generate a keep alive for RPC call monitoring
with self.listener.driver._get_connection(
- rpc_common.PURPOSE_SEND) as conn:
+ rpc_common.PURPOSE_SEND,
+ ) as conn:
try:
self._send_reply(conn, None, None, ending=False)
+ except oslo_messaging.MessageUndeliverable:
+ # internal exception that indicates queue gone -
+ # broker unreachable.
+ raise MessageDeliveryFailure(
+ "Heartbeat send failed. Missing queue")
except rpc_amqp.AMQPDestinationNotFound:
- # internal exception that indicates queue/exchange gone -
+ # internal exception that indicates exchange gone -
# broker unreachable.
- raise MessageDeliveryFailure("Heartbeat send failed")
+ raise MessageDeliveryFailure(
+ "Heartbeat send failed. Missing exchange")
# NOTE(sileht): Those have already be ack in RpcListener IO thread
# We keep them as noop until all drivers do the same
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index af0b550..45a4964 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -175,6 +175,8 @@ rabbit_opts = [
'flag for direct send. The direct send is used as reply, '
'so the MessageUndeliverable exception is raised '
'in case the client queue does not exist.'
+ 'MessageUndeliverable exception will be used to loop for a '
+ 'timeout to lets a chance to sender to recover.'
'This flag is deprecated and it will not be possible to '
'deactivate this functionality anymore'),
cfg.BoolOpt('enable_cancel_on_failover',
@@ -514,6 +516,7 @@ class Connection(object):
# if it was already monkey patched by eventlet/greenlet.
global threading
threading = stdlib_threading
+
self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
if self.ssl:
diff --git a/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml
new file mode 100644
index 0000000..0407e62
--- /dev/null
+++ b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml
@@ -0,0 +1,5 @@
+---
+features:
+ - |
+ Adding retry strategy based on the mandatory flag. Missing exchanges and
+ queues are now identified separately for logging purposes.