summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/impl_kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/impl_kafka.py')
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py42
1 files changed, 27 insertions, 15 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 6729f87..09abfc5 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -265,18 +265,17 @@ class ProducerConnection(Connection):
self.producer = None
self.producer_lock = threading.Lock()
- def _produce_message(self, topic, message):
- while True:
- try:
- self.producer.produce(topic, message)
- except KafkaException as e:
- LOG.error("Produce message failed: %s" % str(e))
- except BufferError:
- LOG.debug("Produce message queue full, waiting for deliveries")
- self.producer.poll(0.5)
- continue
- break
-
+ def _produce_message(self, topic, message, poll):
+ if poll:
+ self.producer.poll(poll)
+ try:
+ self.producer.produce(topic, message)
+ except KafkaException as e:
+ self.producer.poll(0)
+ raise e
+ except BufferError as e:
+ # We'll have to poll next time
+ raise e
self.producer.poll(0)
def notify_send(self, topic, ctxt, msg, retry):
@@ -293,9 +292,22 @@ class ProducerConnection(Connection):
try:
self._ensure_producer()
- if eventletutils.is_monkey_patched('thread'):
- return tpool.execute(self._produce_message, topic, message)
- return self._produce_message(topic, message)
+ poll = 0
+ while True:
+ try:
+ if eventletutils.is_monkey_patched('thread'):
+ return tpool.execute(self._produce_message, topic,
+ message, poll)
+ return self._produce_message(topic, message, poll)
+ except KafkaException as e:
+ LOG.error("Produce message failed: %s" % str(e))
+ break
+ except BufferError:
+ LOG.debug("Produce message queue full, "
+ "waiting for deliveries")
+ # We'll retry with .5s polling
+ poll = 0.5
+
except Exception:
# NOTE(sileht): if something goes wrong close the producer
# connection