diff options
author | Zuul <zuul@review.opendev.org> | 2022-11-16 09:27:05 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2022-11-16 09:27:05 +0000 |
commit | b3c666ff34be908c85bdd3e6cd1a5fdf199333f0 (patch) | |
tree | 01a8dc5f171eeafcaeefe348f481ab50e7324f44 | |
parent | e5e70a5d894b576b3de705c5f55f9704bb8e593e (diff) | |
parent | 1fd461647f7f727dad9d4603abf0defe339d320f (diff) | |
download | oslo-messaging-b3c666ff34be908c85bdd3e6cd1a5fdf199333f0.tar.gz |
Merge "Force creating non durable control exchange when a precondition failed"
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 62 | ||||
-rw-r--r-- | releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml | 5 |
2 files changed, 64 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 4660762..12bf82c 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -349,11 +349,44 @@ class Consumer(object): self._declared_on = None self.exchange = kombu.entity.Exchange( name=exchange_name, - type=type, + type=self.type, durable=self.durable, auto_delete=self.exchange_auto_delete) self.enable_cancel_on_failover = enable_cancel_on_failover + def _declare_fallback(self, err, conn, consumer_arguments): + """Fallback by declaring a non durable queue. + + When a control exchange is shared between services it is possible + that some service created first a non durable control exchange and + then after that an other service can try to create the same control + exchange but as a durable control exchange. In this case RabbitMQ + will raise an exception (PreconditionFailed), and then it will stop + our execution and our service will fail entirly. In this case we want + to fallback by creating a non durable queue to match the default + config. + """ + if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err): + LOG.info( + "[%s] Retrying to declare the exchange (%s) as " + "non durable", conn.connection_id, self.exchange_name) + self.exchange = kombu.entity.Exchange( + name=self.exchange_name, + type=self.type, + durable=False, + auto_delete=self.queue_auto_delete) + self.queue = kombu.entity.Queue( + name=self.queue_name, + channel=conn.channel, + exchange=self.exchange, + durable=False, + auto_delete=self.queue_auto_delete, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments, + consumer_arguments=consumer_arguments + ) + self.queue.declare() + def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" @@ -376,7 +409,18 @@ class Consumer(object): try: LOG.debug('[%s] Queue.declare: %s', conn.connection_id, self.queue_name) - self.queue.declare() + try: + self.queue.declare() + except amqp_exec.PreconditionFailed as err: + # NOTE(hberaud): This kind of exception may be triggered + # when a control exchange is shared between services and + # when services try to create it with configs that differ + # from each others. RabbitMQ will reject the services + # that try to create it with a configuration that differ + # from the one used first. + LOG.warning(err) + self._declare_fallback(err, conn, consumer_arguments) + except conn.connection.channel_errors as exc: # NOTE(jrosenboom): This exception may be triggered by a race # condition. Simply retrying will solve the error most of the time @@ -1354,7 +1398,19 @@ class Connection(object): """Publish a message.""" if not (exchange.passive or exchange.name in self._declared_exchanges): - exchange(self.channel).declare() + try: + exchange(self.channel).declare() + except amqp_exec.PreconditionFailed as err: + # NOTE(hberaud): This kind of exception may be triggered + # when a control exchange is shared between services and + # when services try to create it with configs that differ + # from each others. RabbitMQ will reject the services + # that try to create it with a configuration that differ + # from the one used first. + if "PRECONDITION_FAILED - inequivalent arg 'durable'" \ + in str(err): + exchange.durable = False + exchange(self.channel).declare() self._declared_exchanges.add(exchange.name) log_info = {'msg': msg, diff --git a/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml new file mode 100644 index 0000000..985fc64 --- /dev/null +++ b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Force creating non durable control exchange when a precondition failed + related to config that differ occuring. |