From 1c0e8942dc75837a2e43b93e0ed6700fb7752a03 Mon Sep 17 00:00:00 2001 From: flaneur Date: Sun, 11 Nov 2018 04:45:51 +0800 Subject: set socket timeout for the wake_w (#1577) --- kafka/client_async.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 0cb575c..c3fcc79 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,7 @@ class KafkaClient(object): 'bootstrap_topics_filter': set(), 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, + 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -203,6 +204,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) + self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) self._wake_lock = threading.Lock() self._lock = threading.RLock() @@ -871,6 +873,9 @@ class KafkaClient(object): with self._wake_lock: try: self._wake_w.sendall(b'x') + except socket.timeout: + log.warning('Timeout to send to wakeup socket!') + raise Errors.KafkaTimeoutError() except socket.error: log.warning('Unable to send to wakeup socket!') -- cgit v1.2.1