diff options
author | flaneur <me.ssword@gmail.com> | 2018-11-11 04:45:51 +0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-11-10 12:45:51 -0800 |
commit | 1c0e8942dc75837a2e43b93e0ed6700fb7752a03 (patch) | |
tree | c526bfc738c0cd93709fb849a8d2346888bbe0e5 | |
parent | 0a2ccba3cb1b8636f615a30821123720773a8dfa (diff) | |
download | kafka-python-1c0e8942dc75837a2e43b93e0ed6700fb7752a03.tar.gz |
set socket timeout for the wake_w (#1577)
-rw-r--r-- | kafka/client_async.py | 5 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 1 |
2 files changed, 6 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 0cb575c..c3fcc79 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,7 @@ class KafkaClient(object): 'bootstrap_topics_filter': set(), 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, + 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -203,6 +204,7 @@ class KafkaClient(object): self._bootstrap_fails = 0 self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) + self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) self._wake_lock = threading.Lock() self._lock = threading.RLock() @@ -871,6 +873,9 @@ class KafkaClient(object): with self._wake_lock: try: self._wake_w.sendall(b'x') + except socket.timeout: + log.warning('Timeout to send to wakeup socket!') + raise Errors.KafkaTimeoutError() except socket.error: log.warning('Unable to send to wakeup socket!') diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 7878c0a..45bb058 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -368,6 +368,7 @@ class KafkaProducer(object): self._metrics = Metrics(metric_config, reporters) client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], **self.config) # Get auto-discovered version from client if necessary |