summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-21 20:10:09 -0700
committerGitHub <noreply@github.com>2019-03-21 20:10:09 -0700
commitf2f2bfe44d51b3474f955c16c30ab132f14ba551 (patch)
treea27801dd34e2725afb880e7105dc9d61f09cdcbc
parent64f70b59641fa3c9be1a48ed0a38b64392377600 (diff)
downloadkafka-python-f2f2bfe44d51b3474f955c16c30ab132f14ba551.tar.gz
Wrap SSL sockets after connecting (#1754)
-rw-r--r--kafka/conn.py30
1 files changed, 11 insertions, 19 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 28f9f3c..cdc0a86 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -356,14 +356,9 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
- if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
- self._wrap_ssl()
- # _wrap_ssl can alter the connection state -- disconnects on failure
- # so we need to double check that we are still connecting before
- if self.connecting():
- self.config['state_change_callback'](self)
- log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
- self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
+ self.config['state_change_callback'](self)
+ log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
+ self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -373,29 +368,29 @@ class BrokerConnection(object):
ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno
- except ValueError as err:
- # Python 3.7 and higher raises ValueError if a socket
- # is already connected
- if sys.version_info >= (3, 7):
- ret = None
- else:
- raise
# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', self)
+
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', self)
self.state = ConnectionStates.HANDSHAKE
+ self.config['state_change_callback'](self)
+ # _wrap_ssl can alter the connection state -- disconnects on failure
+ self._wrap_ssl()
+
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
+ self.config['state_change_callback'](self)
+
else:
# security_protocol PLAINTEXT
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -486,9 +481,6 @@ class BrokerConnection(object):
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
pass
- # python 3.7 throws OSError
- except OSError:
- pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))