From cbcbfe48be6f9f95981cfeb50dbac6dd4e2ef8d7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 12 Jan 2019 22:00:38 -0800 Subject: Timeout all unconnected conns (incl SSL) after request_timeout_ms --- kafka/conn.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'kafka/conn.py') diff --git a/kafka/conn.py b/kafka/conn.py index 4d56964..7dfc8bd 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -351,7 +351,6 @@ class BrokerConnection(object): if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex # to check connection status - request_timeout = self.config['request_timeout_ms'] / 1000.0 ret = None try: ret = self._sock.connect_ex(self._sock_addr) @@ -389,11 +388,6 @@ class BrokerConnection(object): errstr = errno.errorcode.get(ret, 'UNKNOWN') self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) - # Connection timed out - elif time.time() > request_timeout + self.last_attempt: - log.error('Connection attempt to %s timed out', self) - self.close(Errors.KafkaConnectionError('timeout')) - # Needs retry else: pass @@ -419,6 +413,14 @@ class BrokerConnection(object): self._reset_reconnect_backoff() self.config['state_change_callback'](self) + if self.state not in (ConnectionStates.CONNECTED, + ConnectionStates.DISCONNECTED): + # Connection timed out + request_timeout = self.config['request_timeout_ms'] / 1000.0 + if time.time() > request_timeout + self.last_attempt: + log.error('Connection attempt to %s timed out', self) + self.close(Errors.KafkaConnectionError('timeout')) + return self.state def _wrap_ssl(self): -- cgit v1.2.1