summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Jørgen Solberg <larsjs@met.no>2016-08-03 11:42:00 +0000
committerDana Powers <dana.powers@gmail.com>2016-08-03 11:46:12 -0700
commit787e8b2ba033cf3d961ca1f5ee345c279222ca8b (patch)
treeca5a3e79e002bdb956e4faf73aeec6874ee8d9de
parent2b2c72feac9d88092a8e5148f951eb956b6396a6 (diff)
downloadkafka-python-787e8b2ba033cf3d961ca1f5ee345c279222ca8b.tar.gz
minor tweaks to get authentication working
-rw-r--r--kafka/conn.py15
1 files changed, 7 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 852c59d..05b0acb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -299,7 +299,7 @@ class BrokerConnection(object):
return False
def _try_authenticate(self):
- assert self.config['api_version'] >= (0, 10) or self.config['api_version'] is None
+ assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
if self._sasl_auth_future is None:
# Build a SaslHandShakeRequest message
@@ -311,7 +311,7 @@ class BrokerConnection(object):
self._sasl_auth_future = future
self._recv()
if self._sasl_auth_future.failed():
- raise self._sasl_auth_future.exception
+ raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()
def _handle_sasl_handshake_response(self, future, response):
@@ -345,17 +345,16 @@ class BrokerConnection(object):
# The server will send a zero sized message (that is Int32(0)) on success.
# The connection is closed on failure
- received_bytes = 0
- while received_bytes < 4:
- data += self._sock.recv(4 - received_bytes)
- received_bytes += len(data)
- if not data:
+ while len(data) < 4:
+ fragment = self._sock.recv(4 - len(data))
+ if not fragment:
log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
error = Errors.AuthenticationFailedError(
'Authentication failed for user {0}'.format(
self.config['sasl_plain_username']))
future.failure(error)
raise error
+ data += fragment
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("%s: Error receiving reply from server", self)
@@ -363,7 +362,7 @@ class BrokerConnection(object):
future.failure(error)
self.close(error=error)
- if data != '\x00\x00\x00\x00':
+ if data != b'\x00\x00\x00\x00':
return future.failure(Errors.AuthenticationFailedError())
return future.success(True)