summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-15 20:14:08 -0700
committerGitHub <noreply@github.com>2017-10-15 20:14:08 -0700
commit0d2164431f8245359c417473fd84e7af034f1306 (patch)
tree34ba7ce633a6df9fe5163037f3ca1dba342f9f46
parentfbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (diff)
downloadkafka-python-0d2164431f8245359c417473fd84e7af034f1306.tar.gz
Fix SASL authentication bugs (#1257)
* Use _send_bytes_blocking in BrokerConnection * _try_authenticate should call recv() so that futures are resolved * _sasl_auth_future can be reset if recv() causes disconnect * validate sasl_mechanism against SaslHandShakeResponse enabled_mechanisms
-rw-r--r--kafka/conn.py57
1 files changed, 34 insertions, 23 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index e10d4f1..7ca2652 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -443,8 +443,11 @@ class BrokerConnection(object):
sasl_response.add_callback(self._handle_sasl_handshake_response, future)
sasl_response.add_errback(lambda f, e: f.failure(e), future)
self._sasl_auth_future = future
- self._recv()
- if self._sasl_auth_future.failed():
+ self.recv()
+ # A connection error could trigger close() which will reset the future
+ if self._sasl_auth_future is None:
+ return False
+ elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
if not isinstance(ex, Errors.ConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
@@ -457,7 +460,12 @@ class BrokerConnection(object):
self.close(error=error)
return future.failure(error_type(self))
- if self.config['sasl_mechanism'] == 'PLAIN':
+ if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
+ return future.failure(
+ Errors.UnsupportedSaslMechanismError(
+ 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
+ % (self.config['sasl_mechanism'], response.enabled_mechanisms)))
+ elif self.config['sasl_mechanism'] == 'PLAIN':
return self._try_authenticate_plain(future)
elif self.config['sasl_mechanism'] == 'GSSAPI':
return self._try_authenticate_gssapi(future)
@@ -467,6 +475,19 @@ class BrokerConnection(object):
'kafka-python does not support SASL mechanism %s' %
self.config['sasl_mechanism']))
+ def _send_bytes_blocking(self, data):
+ self._sock.setblocking(True)
+ total_sent = 0
+ try:
+ while total_sent < len(data):
+ sent_bytes = self._sock.send(data[total_sent:])
+ total_sent += sent_bytes
+ if total_sent != len(data):
+ raise ConnectionError('Buffer overrun during socket send')
+ return total_sent
+ finally:
+ self._sock.setblocking(False)
+
def _recv_bytes_blocking(self, n):
self._sock.setblocking(True)
try:
@@ -485,15 +506,13 @@ class BrokerConnection(object):
log.warning('%s: Sending username and password in the clear', self)
data = b''
+ # Send PLAIN credentials per RFC-4616
+ msg = bytes('\0'.join([self.config['sasl_plain_username'],
+ self.config['sasl_plain_username'],
+ self.config['sasl_plain_password']]).encode('utf-8'))
+ size = Int32.encode(len(msg))
try:
- self._sock.setblocking(True)
- # Send PLAIN credentials per RFC-4616
- msg = bytes('\0'.join([self.config['sasl_plain_username'],
- self.config['sasl_plain_username'],
- self.config['sasl_plain_password']]).encode('utf-8'))
- size = Int32.encode(len(msg))
- self._sock.sendall(size + msg)
- self._sock.setblocking(False)
+ self._send_bytes_blocking(size + msg)
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
@@ -530,11 +549,9 @@ class BrokerConnection(object):
# pass output token to kafka
try:
- self._sock.setblocking(True)
msg = output_token
size = Int32.encode(len(msg))
- self._sock.sendall(size + msg)
- self._sock.setblocking(False)
+ self._send_bytes_blocking(size + msg)
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
@@ -662,16 +679,10 @@ class BrokerConnection(object):
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
# sending each request payload
- self._sock.setblocking(True)
- total_sent = 0
- while total_sent < len(data):
- sent_bytes = self._sock.send(data[total_sent:])
- total_sent += sent_bytes
- assert total_sent == len(data)
+ total_bytes = self._send_bytes_blocking(data)
if self._sensors:
- self._sensors.bytes_sent.record(total_sent)
- self._sock.setblocking(False)
- except (AssertionError, ConnectionError) as e:
+ self._sensors.bytes_sent.record(total_bytes)
+ except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)