From 545cdb10cf86ec24ac4f01b46b8ffaf0ab8c016e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 23 Mar 2019 09:00:45 -0700 Subject: Avoid call to wakeup from sender thread for maybe_connect --- kafka/client_async.py | 5 +++-- kafka/producer/sender.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index cff172f..ce1c795 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -321,14 +321,15 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() - def maybe_connect(self, node_id): + def maybe_connect(self, node_id, wakeup=True): """Queues a node for asynchronous connection during the next .poll()""" if self._can_connect(node_id): self._connecting.add(node_id) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding # the client lock in poll(). - self.wakeup() + if wakeup: + self.wakeup() return True return False diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 17d6255..064fee4 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -105,8 +105,9 @@ class Sender(threading.Thread): # remove any nodes we aren't ready to send to not_ready_timeout = float('inf') for node in list(ready_nodes): - if not self._client.ready(node): + if not self._client.is_ready(node): log.debug('Node %s not ready; delaying produce of accumulated batch', node) + self._client.maybe_connect(node, wakeup=False) ready_nodes.remove(node) not_ready_timeout = min(not_ready_timeout, self._client.connection_delay(node)) -- cgit v1.2.1