diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-15 20:14:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-15 20:14:08 -0700 |
commit | 0d2164431f8245359c417473fd84e7af034f1306 (patch) | |
tree | 34ba7ce633a6df9fe5163037f3ca1dba342f9f46 | |
parent | fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (diff) | |
download | kafka-python-0d2164431f8245359c417473fd84e7af034f1306.tar.gz |
Fix SASL authentication bugs (#1257)
* Use _send_bytes_blocking in BrokerConnection
* _try_authenticate should call recv() so that futures are resolved
* _sasl_auth_future can be reset if recv() causes disconnect
* validate sasl_mechanism against SaslHandShakeResponse enabled_mechanisms
-rw-r--r-- | kafka/conn.py | 57 |
1 files changed, 34 insertions, 23 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index e10d4f1..7ca2652 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -443,8 +443,11 @@ class BrokerConnection(object): sasl_response.add_callback(self._handle_sasl_handshake_response, future) sasl_response.add_errback(lambda f, e: f.failure(e), future) self._sasl_auth_future = future - self._recv() - if self._sasl_auth_future.failed(): + self.recv() + # A connection error could trigger close() which will reset the future + if self._sasl_auth_future is None: + return False + elif self._sasl_auth_future.failed(): ex = self._sasl_auth_future.exception if not isinstance(ex, Errors.ConnectionError): raise ex # pylint: disable-msg=raising-bad-type @@ -457,7 +460,12 @@ class BrokerConnection(object): self.close(error=error) return future.failure(error_type(self)) - if self.config['sasl_mechanism'] == 'PLAIN': + if self.config['sasl_mechanism'] not in response.enabled_mechanisms: + return future.failure( + Errors.UnsupportedSaslMechanismError( + 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' + % (self.config['sasl_mechanism'], response.enabled_mechanisms))) + elif self.config['sasl_mechanism'] == 'PLAIN': return self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': return self._try_authenticate_gssapi(future) @@ -467,6 +475,19 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _send_bytes_blocking(self, data): + self._sock.setblocking(True) + total_sent = 0 + try: + while total_sent < len(data): + sent_bytes = self._sock.send(data[total_sent:]) + total_sent += sent_bytes + if total_sent != len(data): + raise ConnectionError('Buffer overrun during socket send') + return total_sent + finally: + self._sock.setblocking(False) + def _recv_bytes_blocking(self, n): self._sock.setblocking(True) try: @@ -485,15 +506,13 @@ class BrokerConnection(object): log.warning('%s: Sending username and password in the clear', self) data = b'' + # Send PLAIN credentials per RFC-4616 + msg = bytes('\0'.join([self.config['sasl_plain_username'], + self.config['sasl_plain_username'], + self.config['sasl_plain_password']]).encode('utf-8')) + size = Int32.encode(len(msg)) try: - self._sock.setblocking(True) - # Send PLAIN credentials per RFC-4616 - msg = bytes('\0'.join([self.config['sasl_plain_username'], - self.config['sasl_plain_username'], - self.config['sasl_plain_password']]).encode('utf-8')) - size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - self._sock.setblocking(False) + self._send_bytes_blocking(size + msg) # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure @@ -530,11 +549,9 @@ class BrokerConnection(object): # pass output token to kafka try: - self._sock.setblocking(True) msg = output_token size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - self._sock.setblocking(False) + self._send_bytes_blocking(size + msg) # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. @@ -662,16 +679,10 @@ class BrokerConnection(object): # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block # sending each request payload - self._sock.setblocking(True) - total_sent = 0 - while total_sent < len(data): - sent_bytes = self._sock.send(data[total_sent:]) - total_sent += sent_bytes - assert total_sent == len(data) + total_bytes = self._send_bytes_blocking(data) if self._sensors: - self._sensors.bytes_sent.record(total_sent) - self._sock.setblocking(False) - except (AssertionError, ConnectionError) as e: + self._sensors.bytes_sent.record(total_bytes) + except ConnectionError as e: log.exception("Error sending %s to %s", request, self) error = Errors.ConnectionError("%s: %s" % (self, e)) self.close(error=error) |