summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBalazs Gibizer <balazs.gibizer@est.tech>2021-11-23 16:58:05 +0100
committerBalazs Gibizer <balazs.gibizer@est.tech>2022-02-11 13:20:12 +0100
commit5d6fd1a176a47ffdc55223b990c466917ded9449 (patch)
tree76371196865940cccca3a7a315bd1c91317af63d
parentd63173a31f500254277641a76bb721a8bf07ad9c (diff)
downloadoslo-messaging-stable/wallaby.tar.gz
[rabbit] use retry parameters during notification sendingwallaby-em12.7.3stable/wallaby
The rabbit backend now applies the [oslo_messaging_notifications]retry, [oslo_messaging_rabbit]rabbit_retry_interval, rabbit_retry_backoff and rabbit_interval_max configuration parameters when tries to establish the connection to the message bus during notification sending. This patch also clarifies the differences between the behavior of the kafka and the rabbit drivers in this regard. Closes-Bug: #1917645 Change-Id: Id4ccafc95314c86ae918336e42cca64a6acd4d94 (cherry picked from commit 7b3968d9b012e873a9b393fcefa578c46fca18c6) (cherry picked from commit 3b5a0543e97619ca8f8cf98193f6b6375d77cbf2)
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py7
-rw-r--r--oslo_messaging/_drivers/common.py4
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py11
-rw-r--r--oslo_messaging/_drivers/pool.py10
-rw-r--r--oslo_messaging/notify/messaging.py23
-rw-r--r--oslo_messaging/tests/drivers/test_pool.py4
-rw-r--r--oslo_messaging/tests/notify/test_notifier.py19
-rw-r--r--releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml8
8 files changed, 60 insertions, 26 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index cdc21c5..6dc4f0f 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -593,9 +593,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
+ def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool,
- purpose=purpose)
+ purpose=purpose,
+ retry=retry)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -641,7 +642,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
- with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
+ with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify:
exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 54c6f7f..b6c3adb 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, purpose):
+ def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
@@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND
if pooled:
- self.connection = connection_pool.get()
+ self.connection = connection_pool.get(retry=retry)
else:
self.connection = connection_pool.create(purpose)
self.pooled = pooled
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 9d99822..d603f89 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
+ self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
@@ -741,7 +742,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
- self.connection.ensure_connection(errback=on_error)
+ self.connection.ensure_connection(
+ errback=on_error,
+ max_retries=self.max_retries,
+ interval_start=self.interval_start or 1,
+ interval_step=self.interval_stepping,
+ interval_max=self.interval_max,
+ )
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index 7d29b78..2353a7a 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -80,7 +80,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item))
self._cond.notify()
- def get(self):
+ def get(self, retry=None):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
@@ -106,7 +106,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation
try:
- return self.create()
+ return self.create(retry=retry)
except Exception:
with self._cond:
self._current_size -= 1
@@ -122,7 +122,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return
@abc.abstractmethod
- def create(self):
+ def create(self, retry=None):
"""Construct a new item."""
@@ -141,9 +141,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
- def create(self, purpose=common.PURPOSE_SEND):
+ def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url, purpose)
+ return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self):
for item in self.iter_free():
diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py
index 61c7357..da633d8 100644
--- a/oslo_messaging/notify/messaging.py
+++ b/oslo_messaging/notify/messaging.py
@@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
The messaging drivers publish notification messages to notification
listeners.
-The driver will block the notifier's thread until the notification message has
-been passed to the messaging transport. There is no guarantee that the
-notification message will be consumed by a notification listener.
+In case of the rabbit backend the driver will block the notifier's thread
+until the notification message has been passed to the messaging transport.
+There is no guarantee that the notification message will be consumed by a
+notification listener.
+
+In case of the kafka backend the driver will not block the notifier's thread
+but return immediately. The driver will try to deliver the message in the
+background.
Notification messages are sent 'at-most-once' - ensuring that they are not
duplicated.
If the connection to the messaging service is not active when a notification is
-sent this driver will block waiting for the connection to complete. If the
-connection fails to complete, the driver will try to re-establish that
+sent the rabbit backend will block waiting for the connection to complete.
+If the connection fails to complete, the driver will try to re-establish that
connection. By default this will continue indefinitely until the connection
completes. However, the retry parameter can be used to have the notification
-send fail with a MessageDeliveryFailure after the given number of retries.
+send fail. In this case an error is logged and the notifier's thread is resumed
+without any error.
+
+If the connection to the messaging service is not active when a notification is
+sent the kafka backend will return immediately and the backend tries to
+establish the connection and deliver the messages in the background.
+
"""
import logging
diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py
index d5c6420..82a10e1 100644
--- a/oslo_messaging/tests/drivers/test_pool.py
+++ b/oslo_messaging/tests/drivers/test_pool.py
@@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
class TestPool(pool.Pool):
- def create(self):
+ def create(self, retry=None):
return uuid.uuid4()
class ThreadWaitWaiter(object):
@@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
p = self.TestPool(**kwargs)
if self.create_error:
- def create_error():
+ def create_error(retry=None):
raise RuntimeError
orig_create = p.create
self.useFixture(fixtures.MockPatchObject(
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index d0a8eca..330bdab 100644
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -244,6 +244,10 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
topics=["test-retry"],
retry=2,
group="oslo_messaging_notifications")
+ self.config(
+ # just to speed up the test execution
+ rabbit_retry_backoff=0,
+ group="oslo_messaging_rabbit")
transport = oslo_messaging.get_notification_transport(
self.conf, url='rabbit://')
notifier = oslo_messaging.Notifier(transport)
@@ -264,12 +268,15 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
'kombu.connection.Connection._establish_connection',
new=wrapped_establish_connection
):
- # FIXME(gibi) This is bug 1917645 as the driver does not stop
- # retrying the connection after two retries only our test fixture
- # stops the retry by raising TestingException
- self.assertRaises(
- self.TestingException,
- notifier.info, {}, "test", {})
+ with mock.patch(
+ 'oslo_messaging.notify.messaging.LOG.exception'
+ ) as mock_log:
+ notifier.info({}, "test", {})
+
+ # one normal call plus two retries
+ self.assertEqual(3, len(calls))
+ # the error was caught and logged
+ mock_log.assert_called_once()
def test_notifier_retry_connection_fails_kafka(self):
"""This test sets a small retry number for notification sending and
diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
new file mode 100644
index 0000000..d3d62cb
--- /dev/null
+++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
@@ -0,0 +1,8 @@
+---
+fixes:
+ - |
+ As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
+ backend is changed to use the ``[oslo_messaging_notifications]retry``
+ parameter when driver tries to connect to the message bus during
+ notification sending. Before this fix the rabbit backend retried the
+ connection forever blocking the caller thread.