summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py65
1 files changed, 43 insertions, 22 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b1008b..246cab8 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import, division
import collections
import copy
import errno
+import io
import logging
from random import shuffle, uniform
@@ -27,7 +28,7 @@ from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol
-from kafka.protocol.types import Int32
+from kafka.protocol.types import Int32, Int8
from kafka.version import __version__
@@ -39,6 +40,10 @@ log = logging.getLogger(__name__)
DEFAULT_KAFKA_PORT = 9092
+SASL_QOP_AUTH = 1
+SASL_QOP_AUTH_INT = 2
+SASL_QOP_AUTH_CONF = 4
+
try:
import ssl
ssl_available = True
@@ -517,43 +522,59 @@ class BrokerConnection(object):
return future.success(True)
def _try_authenticate_gssapi(self, future):
+ auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
gssapi_name = gssapi.Name(
- self.config['sasl_kerberos_service_name'] + '@' + self.hostname,
+ auth_id,
name_type=gssapi.NameType.hostbased_service
).canonicalize(gssapi.MechType.kerberos)
log.debug('%s: GSSAPI name: %s', self, gssapi_name)
- # Exchange tokens until authentication either succeeds or fails
- client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
- received_token = None
+ # Establish security context and negotiate protection level
+ # For reference RFC 2222, section 7.2.1
try:
+ # Exchange tokens until authentication either succeeds or fails
+ client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
+ received_token = None
while not client_ctx.complete:
# calculate an output token from kafka token (or None if first iteration)
output_token = client_ctx.step(received_token)
+ # pass output token to kafka, or send empty response if the security
+ # context is complete (output token is None in that case)
if output_token is None:
- continue
-
- # pass output token to kafka
- try:
+ self._send_bytes_blocking(Int32.encode(0))
+ else:
msg = output_token
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)
- # The server will send a token back. Processing of this token either
- # establishes a security context, or it needs further token exchange.
- # The gssapi will be able to identify the needed next step.
- # The connection is closed on failure.
- header = self._recv_bytes_blocking(4)
- (token_size,) = struct.unpack('>i', header)
- received_token = self._recv_bytes_blocking(token_size)
-
- except ConnectionError as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
- self.close(error=error)
- return future.failure(error)
+ # The server will send a token back. Processing of this token either
+ # establishes a security context, or it needs further token exchange.
+ # The gssapi will be able to identify the needed next step.
+ # The connection is closed on failure.
+ header = self._recv_bytes_blocking(4)
+ (token_size,) = struct.unpack('>i', header)
+ received_token = self._recv_bytes_blocking(token_size)
+
+ # Process the security layer negotiation token, sent by the server
+ # once the security context is established.
+
+ # unwraps message containing supported protection levels and msg size
+ msg = client_ctx.unwrap(received_token).message
+ # Kafka currently doesn't support integrity or confidentiality security layers, so we
+ # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
+ # by the server
+ msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0]))) + msg[1:]
+ # add authorization identity to the response, GSS-wrap and send it
+ msg = client_ctx.wrap(msg + auth_id, False).message
+ size = Int32.encode(len(msg))
+ self._send_bytes_blocking(size + msg)
+ except ConnectionError as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.ConnectionError("%s: %s" % (self, e))
+ self.close(error=error)
+ return future.failure(error)
except Exception as e:
return future.failure(e)