summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2022-12-21 07:46:22 +0000
committerGerrit Code Review <review@openstack.org>2022-12-21 07:46:22 +0000
commit9f710ce6cd955eaf6d7f9b474efa21df3d531be0 (patch)
tree60b6996b831030f15b2c736a7bd2c1702d7eed58
parentbd73f14fd2b8fb7e5587888af126fc59867e4a36 (diff)
parent43f2224aacb668aa51de3d1274ff8939d8aa73ae (diff)
downloadoslo-messaging-9f710ce6cd955eaf6d7f9b474efa21df3d531be0.tar.gz
Merge "Remove logging from ProducerConnection._produce_message"14.1.0
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py42
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py32
-rw-r--r--releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml8
3 files changed, 67 insertions, 15 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 6729f87..09abfc5 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -265,18 +265,17 @@ class ProducerConnection(Connection):
self.producer = None
self.producer_lock = threading.Lock()
- def _produce_message(self, topic, message):
- while True:
- try:
- self.producer.produce(topic, message)
- except KafkaException as e:
- LOG.error("Produce message failed: %s" % str(e))
- except BufferError:
- LOG.debug("Produce message queue full, waiting for deliveries")
- self.producer.poll(0.5)
- continue
- break
-
+ def _produce_message(self, topic, message, poll):
+ if poll:
+ self.producer.poll(poll)
+ try:
+ self.producer.produce(topic, message)
+ except KafkaException as e:
+ self.producer.poll(0)
+ raise e
+ except BufferError as e:
+ # We'll have to poll next time
+ raise e
self.producer.poll(0)
def notify_send(self, topic, ctxt, msg, retry):
@@ -293,9 +292,22 @@ class ProducerConnection(Connection):
try:
self._ensure_producer()
- if eventletutils.is_monkey_patched('thread'):
- return tpool.execute(self._produce_message, topic, message)
- return self._produce_message(topic, message)
+ poll = 0
+ while True:
+ try:
+ if eventletutils.is_monkey_patched('thread'):
+ return tpool.execute(self._produce_message, topic,
+ message, poll)
+ return self._produce_message(topic, message, poll)
+ except KafkaException as e:
+ LOG.error("Produce message failed: %s" % str(e))
+ break
+ except BufferError:
+ LOG.debug("Produce message queue full, "
+ "waiting for deliveries")
+ # We'll retry with .5s polling
+ poll = 0.5
+
except Exception:
# NOTE(sileht): if something goes wrong close the producer
# connection
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index 77b2ed6..5e78369 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -15,6 +15,8 @@
import testscenarios
from unittest import mock
+from confluent_kafka import KafkaException
+
import oslo_messaging
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_messaging.tests import utils as test_utils
@@ -120,6 +122,36 @@ class TestKafkaDriver(test_utils.BaseTestCase):
'ssl.key.password': '',
})
+ def test_send_notification_retries_on_buffer_error(self):
+ target = oslo_messaging.Target(topic="topic_test")
+
+ with mock.patch("confluent_kafka.Producer") as producer:
+ fake_producer = mock.MagicMock()
+ fake_producer.produce = mock.Mock(
+ side_effect=[BufferError, BufferError, None])
+ producer.return_value = fake_producer
+
+ self.driver.send_notification(
+ target, {}, {"payload": ["test_1"]},
+ None, retry=3)
+
+ assert fake_producer.produce.call_count == 3
+
+ def test_send_notification_stops_on_kafka_error(self):
+ target = oslo_messaging.Target(topic="topic_test")
+
+ with mock.patch("confluent_kafka.Producer") as producer:
+ fake_producer = mock.MagicMock()
+ fake_producer.produce = mock.Mock(
+ side_effect=[KafkaException, None])
+ producer.return_value = fake_producer
+
+ self.driver.send_notification(
+ target, {}, {"payload": ["test_1"]},
+ None, retry=3)
+
+ assert fake_producer.produce.call_count == 1
+
def test_listen(self):
target = oslo_messaging.Target(topic="topic_test")
self.assertRaises(NotImplementedError, self.driver.listen, target,
diff --git a/releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml b/releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml
new file mode 100644
index 0000000..1103247
--- /dev/null
+++ b/releasenotes/notes/bug-1981093-kafka-dont-log-in-tpool-execute-fa50ceee2d55ebae.yaml
@@ -0,0 +1,8 @@
+---
+fixes:
+ - |
+ [`bug 1981093 <https://bugs.launchpad.net/oslo.messaging/+bug/1981093>`_]
+ Pulls calls to logging functions out of ``impl_kafka._produce_message``.
+ Since ``_produce_message`` is called through tpool.execute, calling logging
+ functions inside ``_produce_message`` could cause subsequent calls to
+ logging functions to deadlock.