summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-07-08 13:49:10 +0000
committerGerrit Code Review <review@openstack.org>2022-07-08 13:49:10 +0000
commit1546eaadaa34cf57fd9f23fc85cfd0274a765533 (patch)
treed1d0efed76883fc5386a503762fcc748d5e563d3
parente05bb37ae2b91df0a8391fa1f3c6d91de24aa9f1 (diff)
parentb2acc6663f6c3f60e07cdeb1eae97fd1210a4d81 (diff)
downloadoslo-messaging-stable/stein.tar.gz
Merge "Cancel consumer if queue down" into stable/steinstable/stein
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py83
-rw-r--r--oslo_messaging/tests/functional/test_rabbitmq.py7
-rw-r--r--releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml6
3 files changed, 65 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 122b166..8a3640a 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -154,6 +154,11 @@ rabbit_opts = [
default=2,
help='How often times during the heartbeat_timeout_threshold '
'we check the heartbeat.'),
+ cfg.BoolOpt('enable_cancel_on_failover',
+ default=False,
+ help="Enable x-cancel-on-ha-failover flag so that "
+ "rabbitmq server will cancel and notify consumers"
+ "when queue is down")
]
LOG = logging.getLogger(__name__)
@@ -215,7 +220,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
- nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
+ nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
+ enable_cancel_on_failover=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -237,10 +243,16 @@ class Consumer(object):
type=type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
+ self.enable_cancel_on_failover = enable_cancel_on_failover
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
+ consumer_arguments = None
+ if self.enable_cancel_on_failover:
+ consumer_arguments = {
+ "x-cancel-on-ha-failover": True}
+
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@@ -248,7 +260,9 @@ class Consumer(object):
durable=self.durable,
auto_delete=self.queue_auto_delete,
routing_key=self.routing_key,
- queue_arguments=self.queue_arguments)
+ queue_arguments=self.queue_arguments,
+ consumer_arguments=consumer_arguments
+ )
try:
LOG.debug('[%s] Queue.declare: %s',
@@ -451,6 +465,7 @@ class Connection(object):
driver_conf.kombu_missing_consumer_retry_timeout
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
+ self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
if self.ssl:
self.ssl_version = driver_conf.ssl_version
@@ -1065,31 +1080,35 @@ class Connection(object):
"""
# TODO(obondarev): use default exchange since T release
- consumer = Consumer(exchange_name=topic,
- queue_name=topic,
- routing_key=topic,
- type='direct',
- durable=False,
- exchange_auto_delete=True,
- queue_auto_delete=False,
- callback=callback,
- rabbit_ha_queues=self.rabbit_ha_queues,
- rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
+ consumer = Consumer(
+ exchange_name=topic,
+ queue_name=topic,
+ routing_key=topic,
+ type='direct',
+ durable=False,
+ exchange_auto_delete=True,
+ queue_auto_delete=False,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues,
+ rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
+ enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
"""Create a 'topic' consumer."""
- consumer = Consumer(exchange_name=exchange_name,
- queue_name=queue_name or topic,
- routing_key=topic,
- type='topic',
- durable=self.amqp_durable_queues,
- exchange_auto_delete=self.amqp_auto_delete,
- queue_auto_delete=self.amqp_auto_delete,
- callback=callback,
- rabbit_ha_queues=self.rabbit_ha_queues)
+ consumer = Consumer(
+ exchange_name=exchange_name,
+ queue_name=queue_name or topic,
+ routing_key=topic,
+ type='topic',
+ durable=self.amqp_durable_queues,
+ exchange_auto_delete=self.amqp_auto_delete,
+ queue_auto_delete=self.amqp_auto_delete,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues,
+ enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
@@ -1100,16 +1119,18 @@ class Connection(object):
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
- consumer = Consumer(exchange_name=exchange_name,
- queue_name=queue_name,
- routing_key=topic,
- type='fanout',
- durable=False,
- exchange_auto_delete=True,
- queue_auto_delete=False,
- callback=callback,
- rabbit_ha_queues=self.rabbit_ha_queues,
- rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
+ consumer = Consumer(
+ exchange_name=exchange_name,
+ queue_name=queue_name,
+ routing_key=topic,
+ type='fanout',
+ durable=False,
+ exchange_auto_delete=True,
+ queue_auto_delete=False,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues,
+ rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
+ enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py
index db06d01..84f84e8 100644
--- a/oslo_messaging/tests/functional/test_rabbitmq.py
+++ b/oslo_messaging/tests/functional/test_rabbitmq.py
@@ -39,6 +39,12 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
]
def test_failover_scenario(self):
+ self._test_failover_scenario()
+
+ def test_failover_scenario_enable_cancel_on_failover(self):
+ self._test_failover_scenario(enable_cancel_on_failover=True)
+
+ def _test_failover_scenario(self, enable_cancel_on_failover=False):
# NOTE(sileht): run this test only if functional suite run of a driver
# that use rabbitmq as backend
self.driver = os.environ.get('TRANSPORT_DRIVER')
@@ -53,6 +59,7 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
kombu_reconnect_delay=0,
rabbit_retry_interval=0,
rabbit_retry_backoff=0,
+ enable_cancel_on_failover=enable_cancel_on_failover,
group='oslo_messaging_rabbit')
self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,
diff --git a/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml
new file mode 100644
index 0000000..affab65
--- /dev/null
+++ b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml
@@ -0,0 +1,6 @@
+---
+fixes:
+ - |
+ Add a new option `enable_cancel_on_failover` for rabbitmq driver
+ which when enabled, will cancel consumers when queue appears
+ to be down.