diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 2b1008b..246cab8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division import collections import copy import errno +import io import logging from random import shuffle, uniform @@ -27,7 +28,7 @@ from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.types import Int32 +from kafka.protocol.types import Int32, Int8 from kafka.version import __version__ @@ -39,6 +40,10 @@ log = logging.getLogger(__name__) DEFAULT_KAFKA_PORT = 9092 +SASL_QOP_AUTH = 1 +SASL_QOP_AUTH_INT = 2 +SASL_QOP_AUTH_CONF = 4 + try: import ssl ssl_available = True @@ -517,43 +522,59 @@ class BrokerConnection(object): return future.success(True) def _try_authenticate_gssapi(self, future): + auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname gssapi_name = gssapi.Name( - self.config['sasl_kerberos_service_name'] + '@' + self.hostname, + auth_id, name_type=gssapi.NameType.hostbased_service ).canonicalize(gssapi.MechType.kerberos) log.debug('%s: GSSAPI name: %s', self, gssapi_name) - # Exchange tokens until authentication either succeeds or fails - client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') - received_token = None + # Establish security context and negotiate protection level + # For reference RFC 2222, section 7.2.1 try: + # Exchange tokens until authentication either succeeds or fails + client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') + received_token = None while not client_ctx.complete: # calculate an output token from kafka token (or None if first iteration) output_token = client_ctx.step(received_token) + # pass output token to kafka, or send empty response if the security + # context is complete (output token is None in that case) if output_token is None: - continue - - # pass output token to kafka - try: + self._send_bytes_blocking(Int32.encode(0)) + else: msg = output_token size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - header = self._recv_bytes_blocking(4) - (token_size,) = struct.unpack('>i', header) - received_token = self._recv_bytes_blocking(token_size) - - except ConnectionError as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return future.failure(error) + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + header = self._recv_bytes_blocking(4) + (token_size,) = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) + + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # unwraps message containing supported protection levels and msg size + msg = client_ctx.unwrap(received_token).message + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0]))) + msg[1:] + # add authorization identity to the response, GSS-wrap and send it + msg = client_ctx.wrap(msg + auth_id, False).message + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) except Exception as e: return future.failure(e) |