summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py21
-rw-r--r--oslo_messaging/_drivers/common.py4
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py11
-rw-r--r--oslo_messaging/_drivers/pool.py10
-rw-r--r--oslo_messaging/notify/messaging.py23
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py79
-rw-r--r--oslo_messaging/tests/drivers/test_pool.py4
-rw-r--r--oslo_messaging/tests/notify/test_notifier.py75
-rw-r--r--releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml8
9 files changed, 212 insertions, 23 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index cdc21c5..991bf46 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object):
class AMQPListener(base.PollStyleListener):
+ use_cache = False
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
@@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener):
def __call__(self, message):
ctxt = rpc_amqp.unpack_context(message)
- unique_id = self.msg_id_cache.check_duplicate_message(message)
+ try:
+ unique_id = self.msg_id_cache.check_duplicate_message(message)
+ except rpc_common.DuplicateMessageError:
+ LOG.exception("ignoring duplicate message %s", ctxt.msg_id)
+ return
+ if self.use_cache:
+ self.msg_id_cache.add(unique_id)
if ctxt.msg_id:
LOG.debug("received message msg_id: %(msg_id)s reply to "
"%(queue)s", {'queue': ctxt.reply_q,
@@ -351,7 +358,7 @@ class AMQPListener(base.PollStyleListener):
self.conn.consume(timeout=min(self._current_timeout, left))
except rpc_common.Timeout:
LOG.debug("AMQPListener connection timeout")
- self._current_timeout = max(self._current_timeout * 2,
+ self._current_timeout = min(self._current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
@@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener):
class RpcAMQPListener(AMQPListener):
message_cls = AMQPIncomingMessage
+ use_cache = True
def __call__(self, message):
# NOTE(kgiusti): In the original RPC implementation the RPC server
@@ -490,7 +498,7 @@ class ReplyWaiter(object):
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
except rpc_common.Timeout:
- current_timeout = max(current_timeout * 2,
+ current_timeout = min(current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
except Exception:
LOG.exception("Failed to process incoming message, retrying..")
@@ -593,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
+ def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool,
- purpose=purpose)
+ purpose=purpose,
+ retry=retry)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -641,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
- with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
+ with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify:
exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 54c6f7f..b6c3adb 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, purpose):
+ def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
@@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND
if pooled:
- self.connection = connection_pool.get()
+ self.connection = connection_pool.get(retry=retry)
else:
self.connection = connection_pool.create(purpose)
self.pooled = pooled
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 9d99822..d603f89 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
+ self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
@@ -741,7 +742,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
- self.connection.ensure_connection(errback=on_error)
+ self.connection.ensure_connection(
+ errback=on_error,
+ max_retries=self.max_retries,
+ interval_start=self.interval_start or 1,
+ interval_step=self.interval_stepping,
+ interval_max=self.interval_max,
+ )
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index 8090e8d..9e5288d 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item))
self._cond.notify()
- def get(self):
+ def get(self, retry=None):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
@@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation
try:
- return self.create()
+ return self.create(retry=retry)
except Exception:
with self._cond:
self._current_size -= 1
@@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return
@abc.abstractmethod
- def create(self):
+ def create(self, retry=None):
"""Construct a new item."""
@@ -130,9 +130,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
- def create(self, purpose=common.PURPOSE_SEND):
+ def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url, purpose)
+ return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self):
for item in self.iter_free():
diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py
index 61c7357..da633d8 100644
--- a/oslo_messaging/notify/messaging.py
+++ b/oslo_messaging/notify/messaging.py
@@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging.
The messaging drivers publish notification messages to notification
listeners.
-The driver will block the notifier's thread until the notification message has
-been passed to the messaging transport. There is no guarantee that the
-notification message will be consumed by a notification listener.
+In case of the rabbit backend the driver will block the notifier's thread
+until the notification message has been passed to the messaging transport.
+There is no guarantee that the notification message will be consumed by a
+notification listener.
+
+In case of the kafka backend the driver will not block the notifier's thread
+but return immediately. The driver will try to deliver the message in the
+background.
Notification messages are sent 'at-most-once' - ensuring that they are not
duplicated.
If the connection to the messaging service is not active when a notification is
-sent this driver will block waiting for the connection to complete. If the
-connection fails to complete, the driver will try to re-establish that
+sent, the rabbit backend will block waiting for the connection to complete.
+If the connection fails to complete, the driver will try to re-establish that
connection. By default this will continue indefinitely until the connection
completes. However, the retry parameter can be used to have the notification
-send fail with a MessageDeliveryFailure after the given number of retries.
+send fail. In this case an error is logged and the notifier's thread is resumed
+without any error.
+
+If the connection to the messaging service is not active when a notification is
+sent, the kafka backend will return immediately and the backend tries to
+establish the connection and deliver the messages in the background.
+
"""
import logging
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index e035150..8955661 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -1080,3 +1080,82 @@ class ConnectionLockTestCase(test_utils.BaseTestCase):
t2 = self._thread(lock, 1)
self.assertAlmostEqual(1, t1(), places=0)
self.assertAlmostEqual(2, t2(), places=0)
+
+
+class TestPollTimeoutLimit(test_utils.BaseTestCase):
+ def test_poll_timeout_limit(self):
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+ driver = transport._driver
+ target = oslo_messaging.Target(topic='testtopic')
+ listener = driver.listen(target, None, None)._poll_style_listener
+
+ thread = threading.Thread(target=listener.poll)
+ thread.daemon = True
+ thread.start()
+ time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2)
+
+ try:
+ # timeout should not grow past the maximum
+ self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX,
+ listener._current_timeout)
+
+ finally:
+ # gracefully stop waiting
+ driver.send(target,
+ {},
+ {'tx_id': 'test'})
+ thread.join()
+
+
+class TestMsgIdCache(test_utils.BaseTestCase):
+ @mock.patch('kombu.message.Message.reject')
+ def test_reply_wire_format(self, reject_mock):
+ self.conf.oslo_messaging_rabbit.kombu_compression = None
+
+ transport = oslo_messaging.get_transport(self.conf,
+ 'kombu+memory:////')
+ self.addCleanup(transport.cleanup)
+
+ driver = transport._driver
+
+ target = oslo_messaging.Target(topic='testtopic',
+ server=None,
+ fanout=False)
+
+ listener = driver.listen(target, None, None)._poll_style_listener
+
+ connection, producer = _create_producer(target)
+ self.addCleanup(connection.release)
+
+ msg = {
+ 'oslo.version': '2.0',
+ 'oslo.message': {}
+ }
+
+ msg['oslo.message'].update({
+ '_msg_id': uuid.uuid4().hex,
+ '_unique_id': uuid.uuid4().hex,
+ '_reply_q': 'reply_' + uuid.uuid4().hex,
+ '_timeout': None,
+ })
+
+ msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
+
+ producer.publish(msg)
+
+ received = listener.poll()[0]
+ self.assertIsNotNone(received)
+ self.assertEqual({}, received.message)
+
+ # publish the same message a second time
+ producer.publish(msg)
+
+ received = listener.poll(timeout=1)
+
+ # duplicate message is ignored
+ self.assertEqual(len(received), 0)
+
+ # we should not reject duplicate message
+ reject_mock.assert_not_called()
diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py
index d5c6420..82a10e1 100644
--- a/oslo_messaging/tests/drivers/test_pool.py
+++ b/oslo_messaging/tests/drivers/test_pool.py
@@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase):
class TestPool(pool.Pool):
- def create(self):
+ def create(self, retry=None):
return uuid.uuid4()
class ThreadWaitWaiter(object):
@@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase):
p = self.TestPool(**kwargs)
if self.create_error:
- def create_error():
+ def create_error(retry=None):
raise RuntimeError
orig_create = p.create
self.useFixture(fixtures.MockPatchObject(
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index c36a432..330bdab 100644
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -18,6 +18,7 @@ import sys
import uuid
import fixtures
+from kombu import connection
from oslo_serialization import jsonutils
from oslo_utils import strutils
from oslo_utils import timeutils
@@ -228,6 +229,80 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
TestMessagingNotifier.generate_scenarios()
+class TestMessagingNotifierRetry(test_utils.BaseTestCase):
+
+ class TestingException(BaseException):
+ pass
+
+ def test_notifier_retry_connection_fails_rabbit(self):
+ """This test sets a small retry number for notification sending and
+ configures a non-reachable message bus. The expectation is that after
+ the configured number of retries the driver gives up on the message.
+ """
+ self.config(
+ driver=["messagingv2"],
+ topics=["test-retry"],
+ retry=2,
+ group="oslo_messaging_notifications")
+ self.config(
+ # just to speed up the test execution
+ rabbit_retry_backoff=0,
+ group="oslo_messaging_rabbit")
+ transport = oslo_messaging.get_notification_transport(
+ self.conf, url='rabbit://')
+ notifier = oslo_messaging.Notifier(transport)
+
+ orig_establish_connection = connection.Connection._establish_connection
+ calls = []
+
+ def wrapped_establish_connection(*args, **kwargs):
+ if len(calls) > 2:
+ raise self.TestingException(
+ "Connection should only be retried twice due to "
+ "configuration")
+ else:
+ calls.append((args, kwargs))
+ orig_establish_connection(*args, **kwargs)
+
+ with mock.patch(
+ 'kombu.connection.Connection._establish_connection',
+ new=wrapped_establish_connection
+ ):
+ with mock.patch(
+ 'oslo_messaging.notify.messaging.LOG.exception'
+ ) as mock_log:
+ notifier.info({}, "test", {})
+
+ # one normal call plus two retries
+ self.assertEqual(3, len(calls))
+ # the error was caught and logged
+ mock_log.assert_called_once()
+
+ def test_notifier_retry_connection_fails_kafka(self):
+ """This test sets a small retry number for notification sending and
+ configures a non-reachable message bus. The expectation is that after
+ the configured number of retries the driver gives up on the message.
+ """
+
+ self.config(
+ driver=["messagingv2"],
+ topics=["test-retry"],
+ retry=2,
+ group='oslo_messaging_notifications')
+
+ transport = oslo_messaging.get_notification_transport(
+ self.conf, url='kafka://')
+
+ notifier = oslo_messaging.Notifier(transport)
+
+ # Kafka's message producer interface is async, and there is no way
+ # from the oslo interface to force sending a pending message. So this
+ # call simply returns without i) failing to deliver the message to
+ # the non existent kafka bus ii) retrying the message delivery twice
+ # as the configuration requested it.
+ notifier.info({}, "test", {})
+
+
class TestSerializer(test_utils.BaseTestCase):
def setUp(self):
diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
new file mode 100644
index 0000000..d3d62cb
--- /dev/null
+++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml
@@ -0,0 +1,8 @@
+---
+fixes:
+ - |
+ As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit
+ backend is changed to use the ``[oslo_messaging_notifications]retry``
+ parameter when the driver tries to connect to the message bus during
+ notification sending. Before this fix the rabbit backend retried the
+ connection forever, blocking the caller thread.