summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-01-12 22:00:38 -0800
committerJeff Widman <jeff@jeffwidman.com>2019-01-13 08:52:32 -0800
commitac5a935d0c8295fd66d7d3b86e266f05b09b4091 (patch)
treed6c25d15b7caef2a3489600c4d1b405d93959d19
parent1a31be52ec012dfa0ef5079ff9982e01408a8fe1 (diff)
downloadkafka-python-ac5a935d0c8295fd66d7d3b86e266f05b09b4091.tar.gz
Timeout all unconnected conns (incl SSL) after request_timeout_ms
-rw-r--r--kafka/conn.py14
1 files changed, 8 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 4d56964..7dfc8bd 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -351,7 +351,6 @@ class BrokerConnection(object):
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
- request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
ret = self._sock.connect_ex(self._sock_addr)
@@ -389,11 +388,6 @@ class BrokerConnection(object):
errstr = errno.errorcode.get(ret, 'UNKNOWN')
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
- # Connection timed out
- elif time.time() > request_timeout + self.last_attempt:
- log.error('Connection attempt to %s timed out', self)
- self.close(Errors.KafkaConnectionError('timeout'))
-
# Needs retry
else:
pass
@@ -419,6 +413,14 @@ class BrokerConnection(object):
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
+ if self.state not in (ConnectionStates.CONNECTED,
+ ConnectionStates.DISCONNECTED):
+ # Connection timed out
+ request_timeout = self.config['request_timeout_ms'] / 1000.0
+ if time.time() > request_timeout + self.last_attempt:
+ log.error('Connection attempt to %s timed out', self)
+ self.close(Errors.KafkaConnectionError('timeout'))
+
return self.state
def _wrap_ssl(self):