diff options
author | Andre Araujo <araujo@cloudera.com> | 2017-10-25 14:04:59 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-12-26 09:49:05 -0800 |
commit | 4cfeaca5c867e15213420caad400f5f1863f64e3 (patch) | |
tree | 5ae006e83afa93046fd72106f072b16f84a337ab | |
parent | c49ae90b105fad958dbc60499aeedd27ff52416c (diff) | |
download | kafka-python-4cfeaca5c867e15213420caad400f5f1863f64e3.tar.gz |
Add security layer negotiation to the GSSAPI authentication. (#1283)
When trying to establish a connection with Kafka using SASL with the
GSSAPI authentication mechanism the connection was hanging an timing out
after 60 secons. On the Kafka broker side I noticed that the
SaslServerAuthenticator was going from the AUTHENTICATE to the FAILED state.
The GSSAPI auth implementation was missing the second handshake defined in
RFC 2222, which happens after the security context is established. This
handshake is used by the client and server to negotiate the security layer (QoP)
to be used for the connection.
Kafka currently only support the "auth" QoP, so the implementation in this commit
doesn't make it configurable, but this can be extended later.
With this change I was able to successfully connect to a Kerberos-enabled Kafka
broker using the SASL_PLAINTEXT protocol and the GSSAPI mechanism.
-rw-r--r-- | kafka/conn.py | 65 |
1 files changed, 43 insertions, 22 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 2b1008b..246cab8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division import collections import copy import errno +import io import logging from random import shuffle, uniform @@ -27,7 +28,7 @@ from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.types import Int32 +from kafka.protocol.types import Int32, Int8 from kafka.version import __version__ @@ -39,6 +40,10 @@ log = logging.getLogger(__name__) DEFAULT_KAFKA_PORT = 9092 +SASL_QOP_AUTH = 1 +SASL_QOP_AUTH_INT = 2 +SASL_QOP_AUTH_CONF = 4 + try: import ssl ssl_available = True @@ -517,43 +522,59 @@ class BrokerConnection(object): return future.success(True) def _try_authenticate_gssapi(self, future): + auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname gssapi_name = gssapi.Name( - self.config['sasl_kerberos_service_name'] + '@' + self.hostname, + auth_id, name_type=gssapi.NameType.hostbased_service ).canonicalize(gssapi.MechType.kerberos) log.debug('%s: GSSAPI name: %s', self, gssapi_name) - # Exchange tokens until authentication either succeeds or fails - client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') - received_token = None + # Establish security context and negotiate protection level + # For reference RFC 2222, section 7.2.1 try: + # Exchange tokens until authentication either succeeds or fails + client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') + received_token = None while not client_ctx.complete: # calculate an output token from kafka token (or None if first iteration) output_token = client_ctx.step(received_token) + # pass output token to kafka, or send empty response if the security + # context is complete (output token is None in that case) if output_token is None: - continue - - # pass output token to kafka - try: + self._send_bytes_blocking(Int32.encode(0)) + else: msg = output_token size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - header = self._recv_bytes_blocking(4) - (token_size,) = struct.unpack('>i', header) - received_token = self._recv_bytes_blocking(token_size) - - except ConnectionError as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return future.failure(error) + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + header = self._recv_bytes_blocking(4) + (token_size,) = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) + + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # unwraps message containing supported protection levels and msg size + msg = client_ctx.unwrap(received_token).message + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0]))) + msg[1:] + # add authorization identity to the response, GSS-wrap and send it + msg = client_ctx.wrap(msg + auth_id, False).message + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) except Exception as e: return future.failure(e) |