summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndre Araujo <araujo@cloudera.com>2017-10-25 14:04:59 -0700
committerDana Powers <dana.powers@gmail.com>2017-12-26 09:49:05 -0800
commit4cfeaca5c867e15213420caad400f5f1863f64e3 (patch)
tree5ae006e83afa93046fd72106f072b16f84a337ab
parentc49ae90b105fad958dbc60499aeedd27ff52416c (diff)
downloadkafka-python-4cfeaca5c867e15213420caad400f5f1863f64e3.tar.gz
Add security layer negotiation to the GSSAPI authentication. (#1283)
When trying to establish a connection with Kafka using SASL with the GSSAPI authentication mechanism the connection was hanging an timing out after 60 secons. On the Kafka broker side I noticed that the SaslServerAuthenticator was going from the AUTHENTICATE to the FAILED state. The GSSAPI auth implementation was missing the second handshake defined in RFC 2222, which happens after the security context is established. This handshake is used by the client and server to negotiate the security layer (QoP) to be used for the connection. Kafka currently only support the "auth" QoP, so the implementation in this commit doesn't make it configurable, but this can be extended later. With this change I was able to successfully connect to a Kerberos-enabled Kafka broker using the SASL_PLAINTEXT protocol and the GSSAPI mechanism.
-rw-r--r--kafka/conn.py65
1 files changed, 43 insertions, 22 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b1008b..246cab8 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import, division
import collections
import copy
import errno
+import io
import logging
from random import shuffle, uniform
@@ -27,7 +28,7 @@ from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol
-from kafka.protocol.types import Int32
+from kafka.protocol.types import Int32, Int8
from kafka.version import __version__
@@ -39,6 +40,10 @@ log = logging.getLogger(__name__)
DEFAULT_KAFKA_PORT = 9092
+SASL_QOP_AUTH = 1
+SASL_QOP_AUTH_INT = 2
+SASL_QOP_AUTH_CONF = 4
+
try:
import ssl
ssl_available = True
@@ -517,43 +522,59 @@ class BrokerConnection(object):
return future.success(True)
def _try_authenticate_gssapi(self, future):
+ auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
gssapi_name = gssapi.Name(
- self.config['sasl_kerberos_service_name'] + '@' + self.hostname,
+ auth_id,
name_type=gssapi.NameType.hostbased_service
).canonicalize(gssapi.MechType.kerberos)
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
- # Exchange tokens until authentication either succeeds or fails
- client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
- received_token = None
+ # Establish security context and negotiate protection level
+ # For reference RFC 2222, section 7.2.1
try:
+ # Exchange tokens until authentication either succeeds or fails
+ client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
+ received_token = None
while not client_ctx.complete:
# calculate an output token from kafka token (or None if first iteration)
output_token = client_ctx.step(received_token)
+ # pass output token to kafka, or send empty response if the security
+ # context is complete (output token is None in that case)
if output_token is None:
- continue
-
- # pass output token to kafka
- try:
+ self._send_bytes_blocking(Int32.encode(0))
+ else:
msg = output_token
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)
- # The server will send a token back. Processing of this token either
- # establishes a security context, or it needs further token exchange.
- # The gssapi will be able to identify the needed next step.
- # The connection is closed on failure.
- header = self._recv_bytes_blocking(4)
- (token_size,) = struct.unpack('>i', header)
- received_token = self._recv_bytes_blocking(token_size)
-
- except ConnectionError as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
- self.close(error=error)
- return future.failure(error)
+ # The server will send a token back. Processing of this token either
+ # establishes a security context, or it needs further token exchange.
+ # The gssapi will be able to identify the needed next step.
+ # The connection is closed on failure.
+ header = self._recv_bytes_blocking(4)
+ (token_size,) = struct.unpack('>i', header)
+ received_token = self._recv_bytes_blocking(token_size)
+
+ # Process the security layer negotiation token, sent by the server
+ # once the security context is established.
+
+ # unwraps message containing supported protection levels and msg size
+ msg = client_ctx.unwrap(received_token).message
+ # Kafka currently doesn't support integrity or confidentiality security layers, so we
+ # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
+ # by the server
+ msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0]))) + msg[1:]
+ # add authorization identity to the response, GSS-wrap and send it
+ msg = client_ctx.wrap(msg + auth_id, False).message
+ size = Int32.encode(len(msg))
+ self._send_bytes_blocking(size + msg)
+ except ConnectionError as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.ConnectionError("%s: %s" % (self, e))
+ self.close(error=error)
+ return future.failure(error)
except Exception as e:
return future.failure(e)