summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorflaneur <me.ssword@gmail.com>2018-11-11 04:45:51 +0800
committerDana Powers <dana.powers@gmail.com>2018-11-10 12:45:51 -0800
commit1c0e8942dc75837a2e43b93e0ed6700fb7752a03 (patch)
treec526bfc738c0cd93709fb849a8d2346888bbe0e5
parent0a2ccba3cb1b8636f615a30821123720773a8dfa (diff)
downloadkafka-python-1c0e8942dc75837a2e43b93e0ed6700fb7752a03.tar.gz
set socket timeout for the wake_w (#1577)
-rw-r--r--kafka/client_async.py5
-rw-r--r--kafka/producer/kafka.py1
2 files changed, 6 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0cb575c..c3fcc79 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -154,6 +154,7 @@ class KafkaClient(object):
'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 30000,
+ 'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
@@ -203,6 +204,7 @@ class KafkaClient(object):
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
+ self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
self._wake_lock = threading.Lock()
self._lock = threading.RLock()
@@ -871,6 +873,9 @@ class KafkaClient(object):
with self._wake_lock:
try:
self._wake_w.sendall(b'x')
+ except socket.timeout:
+ log.warning('Timeout to send to wakeup socket!')
+ raise Errors.KafkaTimeoutError()
except socket.error:
log.warning('Unable to send to wakeup socket!')
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 7878c0a..45bb058 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -368,6 +368,7 @@ class KafkaProducer(object):
self._metrics = Metrics(metric_config, reporters)
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+ wakeup_timeout_ms=self.config['max_block_ms'],
**self.config)
# Get auto-discovered version from client if necessary