diff options
author | Lars Jørgen Solberg <larsjs@met.no> | 2016-08-03 11:42:00 +0000 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-08-03 11:46:12 -0700 |
commit | 787e8b2ba033cf3d961ca1f5ee345c279222ca8b (patch) | |
tree | ca5a3e79e002bdb956e4faf73aeec6874ee8d9de | |
parent | 2b2c72feac9d88092a8e5148f951eb956b6396a6 (diff) | |
download | kafka-python-787e8b2ba033cf3d961ca1f5ee345c279222ca8b.tar.gz |
minor tweaks to get authentication working
-rw-r--r-- | kafka/conn.py | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 852c59d..05b0acb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -299,7 +299,7 @@ class BrokerConnection(object): return False def _try_authenticate(self): - assert self.config['api_version'] >= (0, 10) or self.config['api_version'] is None + assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10) if self._sasl_auth_future is None: # Build a SaslHandShakeRequest message @@ -311,7 +311,7 @@ class BrokerConnection(object): self._sasl_auth_future = future self._recv() if self._sasl_auth_future.failed(): - raise self._sasl_auth_future.exception + raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() def _handle_sasl_handshake_response(self, future, response): @@ -345,17 +345,16 @@ class BrokerConnection(object): # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure - received_bytes = 0 - while received_bytes < 4: - data += self._sock.recv(4 - received_bytes) - received_bytes += len(data) - if not data: + while len(data) < 4: + fragment = self._sock.recv(4 - len(data)) + if not fragment: log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) error = Errors.AuthenticationFailedError( 'Authentication failed for user {0}'.format( self.config['sasl_plain_username'])) future.failure(error) raise error + data += fragment self._sock.setblocking(False) except (AssertionError, ConnectionError) as e: log.exception("%s: Error receiving reply from server", self) @@ -363,7 +362,7 @@ class BrokerConnection(object): future.failure(error) self.close(error=error) - if data != '\x00\x00\x00\x00': + if data != b'\x00\x00\x00\x00': return future.failure(Errors.AuthenticationFailedError()) return future.success(True) |