summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-21 08:26:59 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-21 08:26:59 -0700
commitf1449b81d6644379b1053afc9e03d39101d6ba03 (patch)
tree880840910f45144e073cf7641acd44403b003c26
parentee4a53e9e5ae93231d6f7010f263b30a9924dabb (diff)
downloadkafka-python-f1449b81d6644379b1053afc9e03d39101d6ba03.tar.gz
Wrap SSL sockets after connectingwrap_ssl_after_connect
-rw-r--r--kafka/conn.py30
1 files changed, 11 insertions, 19 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 28f9f3c..cdc0a86 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -356,14 +356,9 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
- if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
- self._wrap_ssl()
- # _wrap_ssl can alter the connection state -- disconnects on failure
- # so we need to double check that we are still connecting before
- if self.connecting():
- self.config['state_change_callback'](self)
- log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
- self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
+ self.config['state_change_callback'](self)
+ log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
+ self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -373,29 +368,29 @@ class BrokerConnection(object):
ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno
- except ValueError as err:
- # Python 3.7 and higher raises ValueError if a socket
- # is already connected
- if sys.version_info >= (3, 7):
- ret = None
- else:
- raise
# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', self)
+
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', self)
self.state = ConnectionStates.HANDSHAKE
+ self.config['state_change_callback'](self)
+ # _wrap_ssl can alter the connection state -- disconnects on failure
+ self._wrap_ssl()
+
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
+ self.config['state_change_callback'](self)
+
else:
# security_protocol PLAINTEXT
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -486,9 +481,6 @@ class BrokerConnection(object):
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
pass
- # python 3.7 throws OSError
- except OSError:
- pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))