summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-03 11:46:56 -0700
committerDana Powers <dana.powers@gmail.com>2016-08-03 11:46:56 -0700
commit709ee3b59aff8ab205f0e09c33f4ec8391664228 (patch)
treeca5a3e79e002bdb956e4faf73aeec6874ee8d9de
parentc693709aaf9e292c8614b9ab345d3322d4f71caa (diff)
parent787e8b2ba033cf3d961ca1f5ee345c279222ca8b (diff)
downloadkafka-python-709ee3b59aff8ab205f0e09c33f4ec8391664228.tar.gz
Support for PLAIN sasl authentication (PR #779)
Merge squashed branch 'larsjsol-sasl_plain'
-rw-r--r--kafka/client_async.py10
-rw-r--r--kafka/conn.py110
-rw-r--r--kafka/consumer/group.py10
-rw-r--r--kafka/errors.py20
-rw-r--r--kafka/producer/kafka.py13
-rw-r--r--kafka/protocol/admin.py21
6 files changed, 179 insertions, 5 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index dd4df82..6e07ab0 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -70,6 +70,9 @@ class KafkaClient(object):
'selector': selectors.DefaultSelector,
'metrics': None,
'metric_group_prefix': '',
+ 'sasl_mechanism': None,
+ 'sasl_plain_username': None,
+ 'sasl_plain_password': None,
}
API_VERSIONS = [
(0, 10),
@@ -150,6 +153,13 @@ class KafkaClient(object):
metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.
metric_group_prefix (str): Prefix for metric names. Default: ''
+ sasl_mechanism (str): string picking sasl mechanism when security_protocol
+ is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+ Default: None
+ sasl_plain_username (str): username for sasl PLAIN authentication.
+ Default: None
+ sasl_plain_password (str): passowrd for sasl PLAIN authentication.
+ Defualt: None
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
diff --git a/kafka/conn.py b/kafka/conn.py
index 03c445e..05b0acb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -15,6 +15,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.api import RequestHeader
+from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -48,7 +49,7 @@ class ConnectionStates(object):
CONNECTING = '<connecting>'
HANDSHAKE = '<handshake>'
CONNECTED = '<connected>'
-
+ AUTHENTICATING = '<authenticating>'
InFlightRequest = collections.namedtuple('InFlightRequest',
['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
@@ -73,7 +74,11 @@ class BrokerConnection(object):
'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
+ 'sasl_mechanism': 'PLAIN',
+ 'sasl_plain_username': None,
+ 'sasl_plain_password': None
}
+ SASL_MECHANISMS = ('PLAIN',)
def __init__(self, host, port, afi, **configs):
self.host = host
@@ -96,11 +101,19 @@ class BrokerConnection(object):
(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes']))
+ if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
+ assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
+ 'sasl_mechanism must be in ' + self.SASL_MECHANISMS)
+ if self.config['sasl_mechanism'] == 'PLAIN':
+ assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
+ assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
+
self.state = ConnectionStates.DISCONNECTED
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
+ self._sasl_auth_future = None
self._rbuffer = io.BytesIO()
self._receiving = False
self._next_payload_bytes = 0
@@ -188,6 +201,8 @@ class BrokerConnection(object):
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', str(self))
self.state = ConnectionStates.HANDSHAKE
+ elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
+ self.state = ConnectionStates.AUTHENTICATING
else:
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
@@ -211,6 +226,16 @@ class BrokerConnection(object):
if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake():
log.debug('%s: completed SSL handshake.', str(self))
+ if self.config['security_protocol'] == 'SASL_SSL':
+ self.state = ConnectionStates.AUTHENTICATING
+ else:
+ self.state = ConnectionStates.CONNECTED
+ self.config['state_change_callback'](self)
+
+ if self.state is ConnectionStates.AUTHENTICATING:
+ assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
+ if self._try_authenticate():
+ log.info('%s: Authenticated as %s', str(self), self.config['sasl_plain_username'])
self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
@@ -273,6 +298,75 @@ class BrokerConnection(object):
return False
+ def _try_authenticate(self):
+ assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
+
+ if self._sasl_auth_future is None:
+ # Build a SaslHandShakeRequest message
+ request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
+ future = Future()
+ sasl_response = self._send(request)
+ sasl_response.add_callback(self._handle_sasl_handshake_response, future)
+ sasl_response.add_errback(lambda f, e: f.failure(e), future)
+ self._sasl_auth_future = future
+ self._recv()
+ if self._sasl_auth_future.failed():
+ raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
+ return self._sasl_auth_future.succeeded()
+
+ def _handle_sasl_handshake_response(self, future, response):
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ error = error_type(self)
+ self.close(error=error)
+ return future.failure(error_type(self))
+
+ if self.config['sasl_mechanism'] == 'PLAIN':
+ return self._try_authenticate_plain(future)
+ else:
+ return future.failure(
+ Errors.UnsupportedSaslMechanismError(
+ 'kafka-python does not support SASL mechanism %s' %
+ self.config['sasl_mechanism']))
+
+ def _try_authenticate_plain(self, future):
+ if self.config['security_protocol'] == 'SASL_PLAINTEXT':
+ log.warning('%s: Sending username and password in the clear', str(self))
+
+ data = b''
+ try:
+ self._sock.setblocking(True)
+ # Send PLAIN credentials per RFC-4616
+ msg = bytes('\0'.join([self.config['sasl_plain_username'],
+ self.config['sasl_plain_username'],
+ self.config['sasl_plain_password']]).encode('utf-8'))
+ size = Int32.encode(len(msg))
+ self._sock.sendall(size + msg)
+
+ # The server will send a zero sized message (that is Int32(0)) on success.
+ # The connection is closed on failure
+ while len(data) < 4:
+ fragment = self._sock.recv(4 - len(data))
+ if not fragment:
+ log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
+ error = Errors.AuthenticationFailedError(
+ 'Authentication failed for user {0}'.format(
+ self.config['sasl_plain_username']))
+ future.failure(error)
+ raise error
+ data += fragment
+ self._sock.setblocking(False)
+ except (AssertionError, ConnectionError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.ConnectionError("%s: %s" % (str(self), e))
+ future.failure(error)
+ self.close(error=error)
+
+ if data != b'\x00\x00\x00\x00':
+ return future.failure(Errors.AuthenticationFailedError())
+
+ return future.success(True)
+
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
@@ -292,7 +386,8 @@ class BrokerConnection(object):
"""Returns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc)."""
return self.state in (ConnectionStates.CONNECTING,
- ConnectionStates.HANDSHAKE)
+ ConnectionStates.HANDSHAKE,
+ ConnectionStates.AUTHENTICATING)
def disconnected(self):
"""Return True iff socket is closed"""
@@ -337,6 +432,10 @@ class BrokerConnection(object):
return future.failure(Errors.ConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
+ return self._send(request, expect_response=expect_response)
+
+ def _send(self, request, expect_response=True):
+ future = Future()
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
@@ -385,7 +484,7 @@ class BrokerConnection(object):
Return response if available
"""
assert not self._processing, 'Recursion not supported'
- if not self.connected():
+ if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self)
# If requests are pending, we should close the socket and
# fail all the pending request futures
@@ -405,6 +504,9 @@ class BrokerConnection(object):
self.config['request_timeout_ms']))
return None
+ return self._recv()
+
+ def _recv(self):
# Not receiving is the state of reading the payload header
if not self._receiving:
try:
@@ -452,7 +554,7 @@ class BrokerConnection(object):
# enough data to read the full bytes_to_read
# but if the socket is disconnected, we will get empty data
# without an exception raised
- if not data:
+ if bytes_to_read and not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
return None
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index ed12ec0..489d96d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -186,6 +186,13 @@ class KafkaConsumer(six.Iterator):
(such as offsets) should be exposed to the consumer. If set to True
the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+ Default: True
+ sasl_mechanism (str): string picking sasl mechanism when security_protocol
+ is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+ Default: None
+ sasl_plain_username (str): username for sasl PLAIN authentication.
+ Default: None
+ sasl_plain_password (str): passowrd for sasl PLAIN authentication.
+ Defualt: None
Note:
Configuration parameters are described in more detail at
@@ -234,6 +241,9 @@ class KafkaConsumer(six.Iterator):
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
+ 'sasl_mechanism': None,
+ 'sasl_plain_username': None,
+ 'sasl_plain_password': None,
}
def __init__(self, *topics, **configs):
diff --git a/kafka/errors.py b/kafka/errors.py
index c005bf8..069c9e4 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -58,6 +58,14 @@ class CommitFailedError(KafkaError):
pass
+class AuthenticationMethodNotSupported(KafkaError):
+ pass
+
+
+class AuthenticationFailedError(KafkaError):
+ retriable = False
+
+
class BrokerResponseError(KafkaError):
errno = None
message = None
@@ -328,6 +336,18 @@ class InvalidTimestampError(BrokerResponseError):
description = ('The timestamp of the message is out of acceptable range.')
+class UnsupportedSaslMechanismError(BrokerResponseError):
+ errno = 33
+ message = 'UNSUPPORTED_SASL_MECHANISM'
+ description = ('The broker does not support the requested SASL mechanism.')
+
+
+class IllegalSaslStateError(BrokerResponseError):
+ errno = 34
+ message = 'ILLEGAL_SASL_STATE'
+ description = ('Request is not valid given the current SASL state.')
+
+
class KafkaUnavailableError(KafkaError):
pass
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 381ad74..aef50d0 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -199,7 +199,8 @@ class KafkaProducer(object):
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
security_protocol (str): Protocol used to communicate with brokers.
- Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+ Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
@@ -235,6 +236,13 @@ class KafkaProducer(object):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
+ sasl_mechanism (str): string picking sasl mechanism when security_protocol
+ is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
+ Default: None
+ sasl_plain_username (str): username for sasl PLAIN authentication.
+ Default: None
+ sasl_plain_password (str): passowrd for sasl PLAIN authentication.
+ Defualt: None
Note:
Configuration parameters are described in more detail at
@@ -276,6 +284,9 @@ class KafkaProducer(object):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
+ 'sasl_mechanism': None,
+ 'sasl_plain_username': None,
+ 'sasl_plain_password': None,
}
def __init__(self, **configs):
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index 12181d7..747684f 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -78,3 +78,24 @@ class DescribeGroupsRequest_v0(Struct):
DescribeGroupsRequest = [DescribeGroupsRequest_v0]
DescribeGroupsResponse = [DescribeGroupsResponse_v0]
+
+
+class SaslHandShakeResponse_v0(Struct):
+ API_KEY = 17
+ API_VERSION = 0
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('enabled_mechanisms', Array(String('utf-8')))
+ )
+
+
+class SaslHandShakeRequest_v0(Struct):
+ API_KEY = 17
+ API_VERSION = 0
+ RESPONSE_TYPE = SaslHandShakeResponse_v0
+ SCHEMA = Schema(
+ ('mechanism', String('utf-8'))
+ )
+
+SaslHandShakeRequest = [SaslHandShakeRequest_v0]
+SaslHandShakeResponse = [SaslHandShakeResponse_v0]