summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-10 11:01:37 -0700
committerGitHub <noreply@github.com>2017-10-10 11:01:37 -0700
commit5c17cf035019dca4b451b0db8f5e65c8e489a0f4 (patch)
tree2e8795b94bd6b51645942f71b79288f3188e9553
parent0d4e28f05efd2a1e39558ab2516e054b97297900 (diff)
downloadkafka-python-5c17cf035019dca4b451b0db8f5e65c8e489a0f4.tar.gz
Always wait for completion during SASL/GSSAPI authentication (#1248)
-rw-r--r--kafka/conn.py41
1 files changed, 15 insertions, 26 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 0181cef..467519e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -6,8 +6,9 @@ import errno
import logging
from random import shuffle, uniform
import socket
-import time
+import struct
import sys
+import time
from kafka.vendor import six
@@ -508,52 +509,40 @@ class BrokerConnection(object):
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
- # Exchange tokens until authentication either succeeds or fails:
+ log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name)
+
+ # Exchange tokens until authentication either succeeds or fails
received_token = None
try:
while not ctx_Context.complete:
- # calculate the output token
- try:
- output_token = ctx_Context.step(received_token)
- except GSSError as e:
- log.exception("%s: Error invalid token received from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ # calculate an output token from kafka token (or None if first iteration)
+ output_token = ctx_Context.step(received_token)
- if not output_token:
- if ctx_Context.complete:
- log.debug("%s: Security Context complete ", self)
- log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name)
- break
+ # pass output token to kafka
try:
self._sock.setblocking(True)
- # Send output token
msg = output_token
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
-
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
- response = self._sock.recv(2000)
+ header = self._sock.recv(4)
+ token_size = struct.unpack('>i', header)
+ received_token = self._sock.recv(token_size)
self._sock.setblocking(False)
- except (AssertionError, ConnectionError) as e:
+ except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
- future.failure(error)
self.close(error=error)
-
- # pass the received token back to gssapi, strip the first 4 bytes
- received_token = response[4:]
+ return future.failure(error)
except Exception as e:
- log.exception("%s: GSSAPI handshake error", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
- future.failure(error)
- self.close(error=error)
+ return future.failure(e)
- log.info('%s: Authenticated as %s', self, gssname)
+ log.info('%s: Authenticated as %s via GSSAPI', self, gssname)
return future.success(True)
def blacked_out(self):