summaryrefslogtreecommitdiff
path: root/oslo_messaging/tests/drivers/test_impl_kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/tests/drivers/test_impl_kafka.py')
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py32
1 files changed, 32 insertions, 0 deletions
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index 77b2ed6..5e78369 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -15,6 +15,8 @@
import testscenarios
from unittest import mock
+from confluent_kafka import KafkaException
+
import oslo_messaging
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_messaging.tests import utils as test_utils
@@ -120,6 +122,36 @@ class TestKafkaDriver(test_utils.BaseTestCase):
'ssl.key.password': '',
})
+ def test_send_notification_retries_on_buffer_error(self):
+ target = oslo_messaging.Target(topic="topic_test")
+
+ with mock.patch("confluent_kafka.Producer") as producer:
+ fake_producer = mock.MagicMock()
+ fake_producer.produce = mock.Mock(
+ side_effect=[BufferError, BufferError, None])
+ producer.return_value = fake_producer
+
+ self.driver.send_notification(
+ target, {}, {"payload": ["test_1"]},
+ None, retry=3)
+
+ assert fake_producer.produce.call_count == 3
+
+ def test_send_notification_stops_on_kafka_error(self):
+ target = oslo_messaging.Target(topic="topic_test")
+
+ with mock.patch("confluent_kafka.Producer") as producer:
+ fake_producer = mock.MagicMock()
+ fake_producer.produce = mock.Mock(
+ side_effect=[KafkaException, None])
+ producer.return_value = fake_producer
+
+ self.driver.send_notification(
+ target, {}, {"payload": ["test_1"]},
+ None, retry=3)
+
+ assert fake_producer.produce.call_count == 1
+
def test_listen(self):
target = oslo_messaging.Target(topic="topic_test")
self.assertRaises(NotImplementedError, self.driver.listen, target,