diff options
Diffstat (limited to 'oslo_messaging/_drivers/impl_kafka.py')
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 42 |
1 files changed, 27 insertions, 15 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 6729f87..09abfc5 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -265,18 +265,17 @@ class ProducerConnection(Connection): self.producer = None self.producer_lock = threading.Lock() - def _produce_message(self, topic, message): - while True: - try: - self.producer.produce(topic, message) - except KafkaException as e: - LOG.error("Produce message failed: %s" % str(e)) - except BufferError: - LOG.debug("Produce message queue full, waiting for deliveries") - self.producer.poll(0.5) - continue - break - + def _produce_message(self, topic, message, poll): + if poll: + self.producer.poll(poll) + try: + self.producer.produce(topic, message) + except KafkaException as e: + self.producer.poll(0) + raise e + except BufferError as e: + # We'll have to poll next time + raise e self.producer.poll(0) def notify_send(self, topic, ctxt, msg, retry): @@ -293,9 +292,22 @@ class ProducerConnection(Connection): try: self._ensure_producer() - if eventletutils.is_monkey_patched('thread'): - return tpool.execute(self._produce_message, topic, message) - return self._produce_message(topic, message) + poll = 0 + while True: + try: + if eventletutils.is_monkey_patched('thread'): + return tpool.execute(self._produce_message, topic, + message, poll) + return self._produce_message(topic, message, poll) + except KafkaException as e: + LOG.error("Produce message failed: %s" % str(e)) + break + except BufferError: + LOG.debug("Produce message queue full, " + "waiting for deliveries") + # We'll retry with .5s polling + poll = 0.5 + except Exception: # NOTE(sileht): if something goes wrong close the producer # connection |