diff options
Diffstat (limited to 'oslo_messaging/tests/drivers/test_impl_kafka.py')
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_kafka.py | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 77b2ed6..5e78369 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -15,6 +15,8 @@ import testscenarios from unittest import mock +from confluent_kafka import KafkaException + import oslo_messaging from oslo_messaging._drivers import impl_kafka as kafka_driver from oslo_messaging.tests import utils as test_utils @@ -120,6 +122,36 @@ class TestKafkaDriver(test_utils.BaseTestCase): 'ssl.key.password': '', }) + def test_send_notification_retries_on_buffer_error(self): + target = oslo_messaging.Target(topic="topic_test") + + with mock.patch("confluent_kafka.Producer") as producer: + fake_producer = mock.MagicMock() + fake_producer.produce = mock.Mock( + side_effect=[BufferError, BufferError, None]) + producer.return_value = fake_producer + + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + + assert fake_producer.produce.call_count == 3 + + def test_send_notification_stops_on_kafka_error(self): + target = oslo_messaging.Target(topic="topic_test") + + with mock.patch("confluent_kafka.Producer") as producer: + fake_producer = mock.MagicMock() + fake_producer.produce = mock.Mock( + side_effect=[KafkaException, None]) + producer.return_value = fake_producer + + self.driver.send_notification( + target, {}, {"payload": ["test_1"]}, + None, retry=3) + + assert fake_producer.produce.call_count == 1 + def test_listen(self): target = oslo_messaging.Target(topic="topic_test") self.assertRaises(NotImplementedError, self.driver.listen, target, |