diff options
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 21 | ||||
-rw-r--r-- | oslo_messaging/_drivers/common.py | 4 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 11 | ||||
-rw-r--r-- | oslo_messaging/_drivers/pool.py | 10 | ||||
-rw-r--r-- | oslo_messaging/notify/messaging.py | 23 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 79 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_pool.py | 4 | ||||
-rw-r--r-- | oslo_messaging/tests/notify/test_notifier.py | 75 | ||||
-rw-r--r-- | releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml | 8 |
9 files changed, 212 insertions, 23 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index cdc21c5..991bf46 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -351,7 +358,7 @@ class AMQPListener(base.PollStyleListener): self.conn.consume(timeout=min(self._current_timeout, left)) except rpc_common.Timeout: LOG.debug("AMQPListener connection timeout") - self._current_timeout = max(self._current_timeout * 2, + self._current_timeout = min(self._current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server @@ -490,7 +498,7 @@ class ReplyWaiter(object): # ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds self.conn.consume(timeout=current_timeout) except rpc_common.Timeout: - current_timeout = max(current_timeout * 2, + current_timeout = min(current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) except Exception: LOG.exception("Failed to process incoming message, retrying..") @@ -593,9 +601,10 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): + def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None): return rpc_common.ConnectionContext(self._connection_pool, - purpose=purpose) + purpose=purpose, + retry=retry) def _get_reply_q(self): with self._reply_q_lock: @@ -641,7 +650,7 @@ class AMQPDriverBase(base.BaseDriver): log_msg = "CAST unique_id: %s " % unique_id try: - with self._get_connection(rpc_common.PURPOSE_SEND) as conn: + with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn: if notify: exchange = self._get_exchange(target) LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 54c6f7f..b6c3adb 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -392,7 +392,7 @@ class ConnectionContext(Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, purpose): + def __init__(self, connection_pool, purpose, retry): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool @@ -420,7 +420,7 @@ class ConnectionContext(Connection): pooled = purpose == PURPOSE_SEND if pooled: - self.connection = connection_pool.get() + self.connection = connection_pool.get(retry=retry) else: self.connection = connection_pool.create(purpose) self.pooled = pooled diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d99822..d603f89 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock): class Connection(object): """Connection object.""" - def __init__(self, conf, url, purpose): + def __init__(self, conf, url, purpose, retry=None): # NOTE(viktors): Parse config options driver_conf = conf.oslo_messaging_rabbit self.interval_start = driver_conf.rabbit_retry_interval self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_max = driver_conf.rabbit_interval_max + self.max_retries = retry self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues @@ -741,7 +742,13 @@ class Connection(object): str(exc), interval) self._set_current_channel(None) - self.connection.ensure_connection(errback=on_error) + self.connection.ensure_connection( + errback=on_error, + max_retries=self.max_retries, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + interval_max=self.interval_max, + ) self._set_current_channel(self.connection.channel()) self.set_transport_socket_timeout() diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 8090e8d..9e5288d 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -69,7 +69,7 @@ class Pool(object, metaclass=abc.ABCMeta): self._items.append((ttl_watch, item)) self._cond.notify() - def get(self): + def get(self, retry=None): """Return an item from the pool, when one is available. This may cause the calling thread to block. @@ -95,7 +95,7 @@ class Pool(object, metaclass=abc.ABCMeta): # We've grabbed a slot and dropped the lock, now do the creation try: - return self.create() + return self.create(retry=retry) except Exception: with self._cond: self._current_size -= 1 @@ -111,7 +111,7 @@ class Pool(object, metaclass=abc.ABCMeta): return @abc.abstractmethod - def create(self): + def create(self, retry=None): """Construct a new item.""" @@ -130,9 +130,9 @@ class ConnectionPool(Pool): LOG.debug("Idle connection has expired and been closed." " Pool size: %d" % len(self._items)) - def create(self, purpose=common.PURPOSE_SEND): + def create(self, purpose=common.PURPOSE_SEND, retry=None): LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url, purpose) + return self.connection_cls(self.conf, self.url, purpose, retry=retry) def empty(self): for item in self.iter_free(): diff --git a/oslo_messaging/notify/messaging.py b/oslo_messaging/notify/messaging.py index 61c7357..da633d8 100644 --- a/oslo_messaging/notify/messaging.py +++ b/oslo_messaging/notify/messaging.py @@ -21,19 +21,30 @@ Notification drivers for sending notifications via messaging. The messaging drivers publish notification messages to notification listeners. -The driver will block the notifier's thread until the notification message has -been passed to the messaging transport. There is no guarantee that the -notification message will be consumed by a notification listener. +In case of the rabbit backend the driver will block the notifier's thread +until the notification message has been passed to the messaging transport. +There is no guarantee that the notification message will be consumed by a +notification listener. + +In case of the kafka backend the driver will not block the notifier's thread +but return immediately. The driver will try to deliver the message in the +background. Notification messages are sent 'at-most-once' - ensuring that they are not duplicated. If the connection to the messaging service is not active when a notification is -sent this driver will block waiting for the connection to complete. If the -connection fails to complete, the driver will try to re-establish that +sent the rabbit backend will block waiting for the connection to complete. +If the connection fails to complete, the driver will try to re-establish that connection. By default this will continue indefinitely until the connection completes. However, the retry parameter can be used to have the notification -send fail with a MessageDeliveryFailure after the given number of retries. +send fail. In this case an error is logged and the notifier's thread is resumed +without any error. + +If the connection to the messaging service is not active when a notification is +sent the kafka backend will return immediately and the backend tries to +establish the connection and deliver the messages in the background. + """ import logging diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index e035150..8955661 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -1080,3 +1080,82 @@ class ConnectionLockTestCase(test_utils.BaseTestCase): t2 = self._thread(lock, 1) self.assertAlmostEqual(1, t1(), places=0) self.assertAlmostEqual(2, t2(), places=0) + + +class TestPollTimeoutLimit(test_utils.BaseTestCase): + def test_poll_timeout_limit(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + driver = transport._driver + target = oslo_messaging.Target(topic='testtopic') + listener = driver.listen(target, None, None)._poll_style_listener + + thread = threading.Thread(target=listener.poll) + thread.daemon = True + thread.start() + time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2) + + try: + # timeout should not grow past the maximum + self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX, + listener._current_timeout) + + finally: + # gracefully stop waiting + driver.send(target, + {}, + {'tx_id': 'test'}) + thread.join() + + +class TestMsgIdCache(test_utils.BaseTestCase): + @mock.patch('kombu.message.Message.reject') + def test_reply_wire_format(self, reject_mock): + self.conf.oslo_messaging_rabbit.kombu_compression = None + + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + + driver = transport._driver + + target = oslo_messaging.Target(topic='testtopic', + server=None, + fanout=False) + + listener = driver.listen(target, None, None)._poll_style_listener + + connection, producer = _create_producer(target) + self.addCleanup(connection.release) + + msg = { + 'oslo.version': '2.0', + 'oslo.message': {} + } + + msg['oslo.message'].update({ + '_msg_id': uuid.uuid4().hex, + '_unique_id': uuid.uuid4().hex, + '_reply_q': 'reply_' + uuid.uuid4().hex, + '_timeout': None, + }) + + msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) + + producer.publish(msg) + + received = listener.poll()[0] + self.assertIsNotNone(received) + self.assertEqual({}, received.message) + + # publish the same message a second time + producer.publish(msg) + + received = listener.poll(timeout=1) + + # duplicate message is ignored + self.assertEqual(len(received), 0) + + # we should not reject duplicate message + reject_mock.assert_not_called() diff --git a/oslo_messaging/tests/drivers/test_pool.py b/oslo_messaging/tests/drivers/test_pool.py index d5c6420..82a10e1 100644 --- a/oslo_messaging/tests/drivers/test_pool.py +++ b/oslo_messaging/tests/drivers/test_pool.py @@ -44,7 +44,7 @@ class PoolTestCase(test_utils.BaseTestCase): class TestPool(pool.Pool): - def create(self): + def create(self, retry=None): return uuid.uuid4() class ThreadWaitWaiter(object): @@ -82,7 +82,7 @@ class PoolTestCase(test_utils.BaseTestCase): p = self.TestPool(**kwargs) if self.create_error: - def create_error(): + def create_error(retry=None): raise RuntimeError orig_create = p.create self.useFixture(fixtures.MockPatchObject( diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py index c36a432..330bdab 100644 --- a/oslo_messaging/tests/notify/test_notifier.py +++ b/oslo_messaging/tests/notify/test_notifier.py @@ -18,6 +18,7 @@ import sys import uuid import fixtures +from kombu import connection from oslo_serialization import jsonutils from oslo_utils import strutils from oslo_utils import timeutils @@ -228,6 +229,80 @@ class TestMessagingNotifier(test_utils.BaseTestCase): TestMessagingNotifier.generate_scenarios() +class TestMessagingNotifierRetry(test_utils.BaseTestCase): + + class TestingException(BaseException): + pass + + def test_notifier_retry_connection_fails_rabbit(self): + """This test sets a small retry number for notification sending and + configures a non reachable message bus. The expectation that after the + configured number of retries the driver gives up the message sending. + """ + self.config( + driver=["messagingv2"], + topics=["test-retry"], + retry=2, + group="oslo_messaging_notifications") + self.config( + # just to speed up the test execution + rabbit_retry_backoff=0, + group="oslo_messaging_rabbit") + transport = oslo_messaging.get_notification_transport( + self.conf, url='rabbit://') + notifier = oslo_messaging.Notifier(transport) + + orig_establish_connection = connection.Connection._establish_connection + calls = [] + + def wrapped_establish_connection(*args, **kwargs): + if len(calls) > 2: + raise self.TestingException( + "Connection should only be retried twice due to " + "configuration") + else: + calls.append((args, kwargs)) + orig_establish_connection(*args, **kwargs) + + with mock.patch( + 'kombu.connection.Connection._establish_connection', + new=wrapped_establish_connection + ): + with mock.patch( + 'oslo_messaging.notify.messaging.LOG.exception' + ) as mock_log: + notifier.info({}, "test", {}) + + # one normal call plus two retries + self.assertEqual(3, len(calls)) + # the error was caught and logged + mock_log.assert_called_once() + + def test_notifier_retry_connection_fails_kafka(self): + """This test sets a small retry number for notification sending and + configures a non reachable message bus. The expectation that after the + configured number of retries the driver gives up the message sending. + """ + + self.config( + driver=["messagingv2"], + topics=["test-retry"], + retry=2, + group='oslo_messaging_notifications') + + transport = oslo_messaging.get_notification_transport( + self.conf, url='kafka://') + + notifier = oslo_messaging.Notifier(transport) + + # Kafka's message producer interface is async, and there is no way + # from the oslo interface to force sending a pending message. So this + # call simply returns without i) failing to deliver the message to + # the non existent kafka bus ii) retrying the message delivery twice + # as the configuration requested it. + notifier.info({}, "test", {}) + + class TestSerializer(test_utils.BaseTestCase): def setUp(self): diff --git a/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml new file mode 100644 index 0000000..d3d62cb --- /dev/null +++ b/releasenotes/notes/bug-1917645-rabbit-use-retry-parameter-for-notifications-3f7c508ab4437579.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + As a fix for `bug 1917645 <https://launchpad.net/bugs/1917645>`_ the rabbit + backend is changed to use the ``[oslo_messaging_notifications]retry`` + parameter when driver tries to connect to the message bus during + notification sending. Before this fix the rabbit backend retried the + connection forever blocking the caller thread. |