From 2a4f646d67b256f41391ad4f974a37020e4cbdf1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 28 Mar 2019 22:41:53 -0700 Subject: Use _need_wakeup flag to avoid multiple writes to wake socket --- kafka/client_async.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'kafka') diff --git a/kafka/client_async.py b/kafka/client_async.py index dbe23f5..15f75d6 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -211,6 +211,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._wake_r = None self._wake_w = None + self._need_wakeup = False self._wake_lock = threading.Lock() self._init_wakeup_socketpair() @@ -891,7 +892,9 @@ class KafkaClient(object): def wakeup(self): with self._wake_lock: try: - self._wake_w.send(b'x') + if not self._need_wakeup: + self._wake_w.send(b'x') + self._need_wakeup = True except socket.error as e: log.warning('Unable to send to wakeup socket! (%s)', e) @@ -902,6 +905,7 @@ class KafkaClient(object): self._wake_r.recv(4096) except socket.error: break + self._need_wakeup = False def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection() -- cgit v1.2.1