diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-10 11:01:37 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-10 11:01:37 -0700 |
commit | 5c17cf035019dca4b451b0db8f5e65c8e489a0f4 (patch) | |
tree | 2e8795b94bd6b51645942f71b79288f3188e9553 | |
parent | 0d4e28f05efd2a1e39558ab2516e054b97297900 (diff) | |
download | kafka-python-5c17cf035019dca4b451b0db8f5e65c8e489a0f4.tar.gz |
Always wait for completion during SASL/GSSAPI authentication (#1248)
-rw-r--r-- | kafka/conn.py | 41 |
1 files changed, 15 insertions, 26 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 0181cef..467519e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -6,8 +6,9 @@ import errno import logging from random import shuffle, uniform import socket -import time +import struct import sys +import time from kafka.vendor import six @@ -508,52 +509,40 @@ class BrokerConnection(object): ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') - # Exchange tokens until authentication either succeeds or fails: + log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name) + + # Exchange tokens until authentication either succeeds or fails received_token = None try: while not ctx_Context.complete: - # calculate the output token - try: - output_token = ctx_Context.step(received_token) - except GSSError as e: - log.exception("%s: Error invalid token received from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + # calculate an output token from kafka token (or None if first iteration) + output_token = ctx_Context.step(received_token) - if not output_token: - if ctx_Context.complete: - log.debug("%s: Security Context complete ", self) - log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) - break + # pass output token to kafka try: self._sock.setblocking(True) - # Send output token msg = output_token size = Int32.encode(len(msg)) self._sock.sendall(size + msg) - # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. # The gssapi will be able to identify the needed next step. # The connection is closed on failure. - response = self._sock.recv(2000) + header = self._sock.recv(4) + token_size = struct.unpack('>i', header) + received_token = self._sock.recv(token_size) self._sock.setblocking(False) - except (AssertionError, ConnectionError) as e: + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) - - # pass the received token back to gssapi, strip the first 4 bytes - received_token = response[4:] + return future.failure(error) except Exception as e: - log.exception("%s: GSSAPI handshake error", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) - self.close(error=error) + return future.failure(e) - log.info('%s: Authenticated as %s', self, gssname) + log.info('%s: Authenticated as %s via GSSAPI', self, gssname) return future.success(True) def blacked_out(self): |