summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-11-16 09:27:05 +0000
committerGerrit Code Review <review@openstack.org>2022-11-16 09:27:05 +0000
commitb3c666ff34be908c85bdd3e6cd1a5fdf199333f0 (patch)
tree01a8dc5f171eeafcaeefe348f481ab50e7324f44
parente5e70a5d894b576b3de705c5f55f9704bb8e593e (diff)
parent1fd461647f7f727dad9d4603abf0defe339d320f (diff)
downloadoslo-messaging-b3c666ff34be908c85bdd3e6cd1a5fdf199333f0.tar.gz
Merge "Force creating non durable control exchange when a precondition failed"
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py62
-rw-r--r--releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml5
2 files changed, 64 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 4660762..12bf82c 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -349,11 +349,44 @@ class Consumer(object):
self._declared_on = None
self.exchange = kombu.entity.Exchange(
name=exchange_name,
- type=type,
+ type=self.type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover
+ def _declare_fallback(self, err, conn, consumer_arguments):
+ """Fallback by declaring a non durable queue.
+
+ When a control exchange is shared between services it is possible
+ that some service created first a non durable control exchange and
+ then after that an other service can try to create the same control
+ exchange but as a durable control exchange. In this case RabbitMQ
+ will raise an exception (PreconditionFailed), and then it will stop
+ our execution and our service will fail entirly. In this case we want
+ to fallback by creating a non durable queue to match the default
+ config.
+ """
+ if "PRECONDITION_FAILED - inequivalent arg 'durable'" in str(err):
+ LOG.info(
+ "[%s] Retrying to declare the exchange (%s) as "
+ "non durable", conn.connection_id, self.exchange_name)
+ self.exchange = kombu.entity.Exchange(
+ name=self.exchange_name,
+ type=self.type,
+ durable=False,
+ auto_delete=self.queue_auto_delete)
+ self.queue = kombu.entity.Queue(
+ name=self.queue_name,
+ channel=conn.channel,
+ exchange=self.exchange,
+ durable=False,
+ auto_delete=self.queue_auto_delete,
+ routing_key=self.routing_key,
+ queue_arguments=self.queue_arguments,
+ consumer_arguments=consumer_arguments
+ )
+ self.queue.declare()
+
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
@@ -376,7 +409,18 @@ class Consumer(object):
try:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
- self.queue.declare()
+ try:
+ self.queue.declare()
+ except amqp_exec.PreconditionFailed as err:
+ # NOTE(hberaud): This kind of exception may be triggered
+ # when a control exchange is shared between services and
+ # when services try to create it with configs that differ
+ # from each others. RabbitMQ will reject the services
+ # that try to create it with a configuration that differ
+ # from the one used first.
+ LOG.warning(err)
+ self._declare_fallback(err, conn, consumer_arguments)
+
except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race
# condition. Simply retrying will solve the error most of the time
@@ -1354,7 +1398,19 @@ class Connection(object):
"""Publish a message."""
if not (exchange.passive or exchange.name in self._declared_exchanges):
- exchange(self.channel).declare()
+ try:
+ exchange(self.channel).declare()
+ except amqp_exec.PreconditionFailed as err:
+ # NOTE(hberaud): This kind of exception may be triggered
+ # when a control exchange is shared between services and
+ # when services try to create it with configs that differ
+ # from each others. RabbitMQ will reject the services
+ # that try to create it with a configuration that differ
+ # from the one used first.
+ if "PRECONDITION_FAILED - inequivalent arg 'durable'" \
+ in str(err):
+ exchange.durable = False
+ exchange(self.channel).declare()
self._declared_exchanges.add(exchange.name)
log_info = {'msg': msg,
diff --git a/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml
new file mode 100644
index 0000000..985fc64
--- /dev/null
+++ b/releasenotes/notes/declare_fallback_durable_exchange-0db677de4fdf1e78.yaml
@@ -0,0 +1,5 @@
+---
+fixes:
+ - |
+ Force creating non durable control exchange when a precondition failed
+ related to config that differ occuring.