From 1fd461647f7f727dad9d4603abf0defe339d320f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Herv=C3=A9=20Beraud?= Date: Tue, 14 Dec 2021 15:58:34 +0100 Subject: Force creating non durable control exchange when a precondition failed A precondition failed exception related to the durable exchange config may be triggered when a control exchange is shared between services and when services try to create it with configs that differ from each other. RabbitMQ will reject the services that try to create it with a configuration that differs from the one used first. This kind of exception is not managed for now and services can fail without handling this kind of issue. These changes catch this kind of exception to analyze whether it is related to the durable config. In this case we try to re-declare the failing exchange/queue as non durable. This problem can be easily reproduced by running a local RabbitMQ server. By setting the config below (sample.conf): ``` [DEFAULT] transport_url = rabbit://localhost/ [OSLO_MESSAGING_RABBIT] amqp_durable_queues = true ``` And by running our simulator twice: ``` $ tox -e venv -- python tools/simulator.py -d rpc-server -w 40 $ tox -e venv -- python tools/simulator.py --config-file ./sample.conf -d rpc-server -w 40 ``` The first one will create a default non durable control exchange. The second one will try to create the same default control exchange but as durable. 
Closes-Bug: #1953351 Change-Id: I27625b468c428cde6609730c8ab429c2c112d010 --- oslo_messaging/_drivers/impl_rabbit.py | 62 ++++++++++++++++++++-- ...fallback_durable_exchange-0db677de4fdf1e78.yaml | 5 ++ 2 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 20e30ad..a1f5ce2 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -254,11 +254,44 @@ class Consumer(object): self._declared_on = None self.exchange = kombu.entity.Exchange( name=exchange_name, - type=type, + type=self.type, durable=self.durable, auto_delete=self.exchange_auto_delete) self.enable_cancel_on_failover = enable_cancel_on_failover + def _declare_fallback(self, err, conn, consumer_arguments): + """Fallback by declaring a non durable queue. + + When a control exchange is shared between services it is possible + that some service created first a non durable control exchange and + then after that another service can try to create the same control + exchange but as a durable control exchange. In this case RabbitMQ + will raise an exception (PreconditionFailed), and then it will stop + our execution and our service will fail entirely. In this case we want + to fallback by creating a non durable queue to match the default + config. 
+ """ + if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err): + LOG.info( + "[%s] Retrying to declare the exchange (%s) as " + "non durable", conn.connection_id, self.exchange_name) + self.exchange = kombu.entity.Exchange( + name=self.exchange_name, + type=self.type, + durable=False, + auto_delete=self.queue_auto_delete) + self.queue = kombu.entity.Queue( + name=self.queue_name, + channel=conn.channel, + exchange=self.exchange, + durable=False, + auto_delete=self.queue_auto_delete, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments, + consumer_arguments=consumer_arguments + ) + self.queue.declare() + def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" @@ -281,7 +314,18 @@ class Consumer(object): try: LOG.debug('[%s] Queue.declare: %s', conn.connection_id, self.queue_name) - self.queue.declare() + try: + self.queue.declare() + except amqp_exec.PreconditionFailed as err: + # NOTE(hberaud): This kind of exception may be triggered + # when a control exchange is shared between services and + # when services try to create it with configs that differ + # from each others. RabbitMQ will reject the services + # that try to create it with a configuration that differ + # from the one used first. + LOG.warning(err) + self._declare_fallback(err, conn, consumer_arguments) + except conn.connection.channel_errors as exc: # NOTE(jrosenboom): This exception may be triggered by a race # condition. Simply retrying will solve the error most of the time @@ -1219,7 +1263,19 @@ class Connection(object): """Publish a message.""" if not (exchange.passive or exchange.name in self._declared_exchanges): - exchange(self.channel).declare() + try: + exchange(self.channel).declare() + except amqp_exec.PreconditionFailed as err: + # NOTE(hberaud): This kind of exception may be triggered + # when a control exchange is shared between services and + # when services try to create it with configs that differ + # from each others. 
RabbitMQ will reject the services + # that try to create it with a configuration that differs + # from the one used first. + if "PRECONDITION_FAILED - inequivalent arg 'durable'" \ + in str(err): + exchange.durable = False + exchange(self.channel).declare() self._declared_exchanges.add(exchange.name) log_info = {'msg': msg, diff --git a/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml new file mode 100644 index 0000000..985fc64 --- /dev/null +++ b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Force creating a non durable control exchange when a precondition failure + related to differing durable config occurs. -- cgit v1.2.1