diff options
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 011222c..cdc21c5 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -145,39 +145,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): while True: try: with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: self._send_reply(conn, reply, failure) + return - except rpc_amqp.AMQPDestinationNotFound: - if timer.check_return() > 0: - LOG.debug(("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue doesn't exist, " - "retrying..."), { - 'msg_id': self.msg_id, - 'reply_q': self.reply_q}) - time.sleep(0.25) - else: + except oslo_messaging.MessageUndeliverable: + # queue not found + if timer.check_return() <= 0: self._obsolete_reply_queues.add(self.reply_q, self.msg_id) - infos = { + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a missing queue ' + '(%(reply_q)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'reply_q': self.reply_q}) + return + + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a missing ' + 'queue (%(reply_q)s). Retrying...', { 'msg_id': self.msg_id, - 'reply_q': self.reply_q, - 'duration': duration - } - LOG.info("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue don't exist after " - "%(duration)s sec abandoning...", infos) + 'reply_q': self.reply_q}) + time.sleep(0.25) + except rpc_amqp.AMQPDestinationNotFound as exc: + # exchange not found/down + if timer.check_return() <= 0: + self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a broker issue ' + '(%(exc)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'exc': exc}) return + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a broker ' + 'issue (%(exc)s). Retrying...', { + 'msg_id': self.msg_id, + 'exc': exc}) + time.sleep(0.25) + def heartbeat(self): # generate a keep alive for RPC call monitoring with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: try: self._send_reply(conn, None, None, ending=False) + except oslo_messaging.MessageUndeliverable: + # internal exception that indicates queue gone - + # broker unreachable. + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing queue") except rpc_amqp.AMQPDestinationNotFound: - # internal exception that indicates queue/exchange gone - + # internal exception that indicates exchange gone - # broker unreachable. - raise MessageDeliveryFailure("Heartbeat send failed") + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing exchange") # NOTE(sileht): Those have already be ack in RpcListener IO thread # We keep them as noop until all drivers do the same |