summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshenjiatong <yshxxsjt715@gmail.com>2020-07-03 15:51:21 +0800
committerHervé Beraud <hberaud@redhat.com>2021-03-15 13:16:36 +0100
commit1966f86be58c5267ce887dbc178283992a9eab3d (patch)
tree7bbb3c2d46afdf8f901a56f9a506d53a31e22c3a
parent56fc332608028ba4c671255e8bd9223a0bae762d (diff)
downloadoslo-messaging-1966f86be58c5267ce887dbc178283992a9eab3d.tar.gz
Cancel consumer if queue down
Previously, we switched to using default exchanges to avoid excessive amounts of "exchange not found" messages. But that does not actually solve the problem, because the reply_* queue is already gone and the agent will not receive callbacks. After some debugging, I found that under some circumstances the rabbitmq consumer does not seem to receive the basic cancel signal when the queue is already gone. This might be due to rabbitmq trying to restart the consumer when the queue is down (for example during a split brain). In such cases, it might be better to fail early. From reading the code, it seems that x-cancel-on-ha-failover is not dedicated to mirrored queues only, see https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1894 and https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1926. By failing early, in my own test setup, I could solve a certain case of the "exchange not found" problem. Skipping the functional failover tests on stable/train. Conflicts: - oslo_messaging/_drivers/impl_rabbit.py (NOTE) hberaud: 56fc332608028ba4c671255e8bd9223a0bae762d is newer than these changes but was merged on train before them. Conflicts are due to the option type switch on `direct_mandatory_flag`. Changes: - oslo_messaging/tests/functional/test_rabbitmq.py (NOTE) hberaud: Skipping functional tests to avoid a SIGTERM issue with pifpaf [1] until the pifpaf version is updated in requirements [2]. This was decided during one of our previous meetings [3]. [1] https://github.com/jd/pifpaf/pull/124 [2] https://review.opendev.org/c/openstack/requirements/+/753239 [3] http://eavesdrop.openstack.org/meetings/oslo/2020/oslo.2020-11-16-15.00.log.html#l-28 Change-Id: I2ae53340783e4044dab58035bc0992dc08145b53 Related-bug: #1789177 (cherry picked from commit 196fa877a90d7eb0f82ec9e1c194eef3f98fc0b1) (cherry picked from commit 0a432c7fb107d04f7a41199fe9a8c4fbd344d009)
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py83
-rw-r--r--oslo_messaging/tests/functional/test_rabbitmq.py8
-rw-r--r--releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml6
3 files changed, 66 insertions, 31 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 3a50f99..bc98056 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -174,6 +174,11 @@ rabbit_opts = [
'for direct send. The direct send is used as reply, '
'so the MessageUndeliverable exception is raised '
'in case the client queue does not exist.'),
+ cfg.BoolOpt('enable_cancel_on_failover',
+ default=False,
+ help="Enable x-cancel-on-ha-failover flag so that "
+ "rabbitmq server will cancel and notify consumers"
+ "when queue is down")
]
LOG = logging.getLogger(__name__)
@@ -235,7 +240,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
- nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
+ nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
+ enable_cancel_on_failover=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@@ -257,10 +263,16 @@ class Consumer(object):
type=type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
+ self.enable_cancel_on_failover = enable_cancel_on_failover
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
+ consumer_arguments = None
+ if self.enable_cancel_on_failover:
+ consumer_arguments = {
+ "x-cancel-on-ha-failover": True}
+
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@@ -268,7 +280,9 @@ class Consumer(object):
durable=self.durable,
auto_delete=self.queue_auto_delete,
routing_key=self.routing_key,
- queue_arguments=self.queue_arguments)
+ queue_arguments=self.queue_arguments,
+ consumer_arguments=consumer_arguments
+ )
try:
LOG.debug('[%s] Queue.declare: %s',
@@ -471,6 +485,7 @@ class Connection(object):
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
+ self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run
@@ -1120,31 +1135,35 @@ class Connection(object):
responses for call/multicall
"""
- consumer = Consumer(exchange_name='', # using default exchange
- queue_name=topic,
- routing_key='',
- type='direct',
- durable=False,
- exchange_auto_delete=False,
- queue_auto_delete=False,
- callback=callback,
- rabbit_ha_queues=self.rabbit_ha_queues,
- rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
+ consumer = Consumer(
+ exchange_name='', # using default exchange
+ queue_name=topic,
+ routing_key='',
+ type='direct',
+ durable=False,
+ exchange_auto_delete=False,
+ queue_auto_delete=False,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues,
+ rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
+ enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
"""Create a 'topic' consumer."""
- consumer = Consumer(exchange_name=exchange_name,
- queue_name=queue_name or topic,
- routing_key=topic,
- type='topic',
- durable=self.amqp_durable_queues,
- exchange_auto_delete=self.amqp_auto_delete,
- queue_auto_delete=self.amqp_auto_delete,
- callback=callback,
- rabbit_ha_queues=self.rabbit_ha_queues)
+ consumer = Consumer(
+ exchange_name=exchange_name,
+ queue_name=queue_name or topic,
+ routing_key=topic,
+ type='topic',
+ durable=self.amqp_durable_queues,
+ exchange_auto_delete=self.amqp_auto_delete,
+ queue_auto_delete=self.amqp_auto_delete,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues,
+ enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
@@ -1155,16 +1174,18 @@ class Connection(object):
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
- consumer = Consumer(exchange_name=exchange_name,
- queue_name=queue_name,
- routing_key=topic,
- type='fanout',
- durable=False,
- exchange_auto_delete=True,
- queue_auto_delete=False,
- callback=callback,
- rabbit_ha_queues=self.rabbit_ha_queues,
- rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
+ consumer = Consumer(
+ exchange_name=exchange_name,
+ queue_name=queue_name,
+ routing_key=topic,
+ type='fanout',
+ durable=False,
+ exchange_auto_delete=True,
+ queue_auto_delete=False,
+ callback=callback,
+ rabbit_ha_queues=self.rabbit_ha_queues,
+ rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
+ enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py
index ce84eda..3e5122a 100644
--- a/oslo_messaging/tests/functional/test_rabbitmq.py
+++ b/oslo_messaging/tests/functional/test_rabbitmq.py
@@ -39,6 +39,13 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
]
def test_failover_scenario(self):
+ self._test_failover_scenario()
+
+ def test_failover_scenario_enable_cancel_on_failover(self):
+ self.skipTest("Skipping failover tests on stable/train")
+ self._test_failover_scenario(enable_cancel_on_failover=True)
+
+ def _test_failover_scenario(self, enable_cancel_on_failover=False):
# NOTE(sileht): run this test only if functional suite run of a driver
# that use rabbitmq as backend
self.driver = os.environ.get('TRANSPORT_DRIVER')
@@ -54,6 +61,7 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
kombu_reconnect_delay=0,
rabbit_retry_interval=0,
rabbit_retry_backoff=0,
+ enable_cancel_on_failover=enable_cancel_on_failover,
group='oslo_messaging_rabbit')
self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,
diff --git a/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml
new file mode 100644
index 0000000..affab65
--- /dev/null
+++ b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml
@@ -0,0 +1,6 @@
+---
+fixes:
+ - |
+ Add a new option `enable_cancel_on_failover` for rabbitmq driver
+ which when enabled, will cancel consumers when queue appears
+ to be down.