summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-10 15:07:52 -0700
committerGitHub <noreply@github.com>2017-10-10 15:07:52 -0700
commit1df58bf87da1a2c8a2f9e659dfabaed1cff7c0c2 (patch)
tree4af4ed853c03a6ddf95225c378ee2bf3a2a87c1c
parent5c17cf035019dca4b451b0db8f5e65c8e489a0f4 (diff)
downloadkafka-python-1df58bf87da1a2c8a2f9e659dfabaed1cff7c0c2.tar.gz
Check for disconnects during ssl handshake and sasl authentication (#1249)
-rw-r--r--kafka/conn.py73
1 files changed, 42 insertions, 31 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 467519e..e10d4f1 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -299,12 +299,15 @@ class BrokerConnection(object):
self._sock.setsockopt(*option)
self._sock.setblocking(False)
+ self.last_attempt = time.time()
+ self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
- log.info('%s: connecting to %s:%d', self, self.host, self.port)
- self.state = ConnectionStates.CONNECTING
- self.last_attempt = time.time()
- self.config['state_change_callback'](self)
+ # _wrap_ssl can alter the connection state -- disconnects on failure
+ # so we need to double check that we are still connecting before
+ if self.connecting():
+ self.config['state_change_callback'](self)
+ log.info('%s: connecting to %s:%d', self, self.host, self.port)
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -367,10 +370,12 @@ class BrokerConnection(object):
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self._try_authenticate():
- log.debug('%s: Connection complete.', self)
- self.state = ConnectionStates.CONNECTED
- self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ # _try_authenticate has side-effects: possibly disconnected on socket errors
+ if self.state is ConnectionStates.AUTHENTICATING:
+ log.debug('%s: Connection complete.', self)
+ self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
+ self.config['state_change_callback'](self)
return self.state
@@ -397,10 +402,7 @@ class BrokerConnection(object):
password=self.config['ssl_password'])
if self.config['ssl_crlfile']:
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
- error = 'No CRL support with this version of Python.'
- log.error('%s: %s Disconnecting.', self, error)
- self.close(Errors.ConnectionError(error))
- return
+ raise RuntimeError('This version of Python does not support ssl_crlfile!')
log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
# pylint: disable=no-member
@@ -443,7 +445,9 @@ class BrokerConnection(object):
self._sasl_auth_future = future
self._recv()
if self._sasl_auth_future.failed():
- raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
+ ex = self._sasl_auth_future.exception
+ if not isinstance(ex, Errors.ConnectionError):
+ raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()
def _handle_sasl_handshake_response(self, future, response):
@@ -463,6 +467,19 @@ class BrokerConnection(object):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))
+ def _recv_bytes_blocking(self, n):
+ self._sock.setblocking(True)
+ try:
+ data = b''
+ while len(data) < n:
+ fragment = self._sock.recv(n - len(data))
+ if not fragment:
+ raise ConnectionError('Connection reset during recv')
+ data += fragment
+ return data
+ finally:
+ self._sock.setblocking(False)
+
def _try_authenticate_plain(self, future):
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('%s: Sending username and password in the clear', self)
@@ -476,30 +493,23 @@ class BrokerConnection(object):
self.config['sasl_plain_password']]).encode('utf-8'))
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
+ self._sock.setblocking(False)
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
- while len(data) < 4:
- fragment = self._sock.recv(4 - len(data))
- if not fragment:
- log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
- error = Errors.AuthenticationFailedError(
- 'Authentication failed for user {0}'.format(
- self.config['sasl_plain_username']))
- future.failure(error)
- raise error
- data += fragment
- self._sock.setblocking(False)
- except (AssertionError, ConnectionError) as e:
+ self._recv_bytes_blocking(4)
+
+ except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
- future.failure(error)
self.close(error=error)
+ return future.failure(error)
if data != b'\x00\x00\x00\x00':
- return future.failure(Errors.AuthenticationFailedError())
+ error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
+ return future.failure(error)
- log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
+ log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)
def _try_authenticate_gssapi(self, future):
@@ -524,14 +534,15 @@ class BrokerConnection(object):
msg = output_token
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
+ self._sock.setblocking(False)
+
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
- header = self._sock.recv(4)
+ header = self._recv_bytes_blocking(4)
token_size = struct.unpack('>i', header)
- received_token = self._sock.recv(token_size)
- self._sock.setblocking(False)
+ received_token = self._recv_bytes_blocking(token_size)
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)