diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-08-03 11:46:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-08-03 11:46:56 -0700 |
commit | 709ee3b59aff8ab205f0e09c33f4ec8391664228 (patch) | |
tree | ca5a3e79e002bdb956e4faf73aeec6874ee8d9de | |
parent | c693709aaf9e292c8614b9ab345d3322d4f71caa (diff) | |
parent | 787e8b2ba033cf3d961ca1f5ee345c279222ca8b (diff) | |
download | kafka-python-709ee3b59aff8ab205f0e09c33f4ec8391664228.tar.gz |
Support for PLAIN sasl authentication (PR #779)
Merge squashed branch 'larsjsol-sasl_plain'
-rw-r--r-- | kafka/client_async.py | 10 | ||||
-rw-r--r-- | kafka/conn.py | 110 | ||||
-rw-r--r-- | kafka/consumer/group.py | 10 | ||||
-rw-r--r-- | kafka/errors.py | 20 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 13 | ||||
-rw-r--r-- | kafka/protocol/admin.py | 21 |
6 files changed, 179 insertions, 5 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index dd4df82..6e07ab0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -70,6 +70,9 @@ class KafkaClient(object): 'selector': selectors.DefaultSelector, 'metrics': None, 'metric_group_prefix': '', + 'sasl_mechanism': None, + 'sasl_plain_username': None, + 'sasl_plain_password': None, } API_VERSIONS = [ (0, 10), @@ -150,6 +153,13 @@ class KafkaClient(object): metrics (kafka.metrics.Metrics): Optionally provide a metrics instance for capturing network IO stats. Default: None. metric_group_prefix (str): Prefix for metric names. Default: '' + sasl_mechanism (str): string picking sasl mechanism when security_protocol + is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. + Default: None + sasl_plain_username (str): username for sasl PLAIN authentication. + Default: None + sasl_plain_password (str): passowrd for sasl PLAIN authentication. + Defualt: None """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: diff --git a/kafka/conn.py b/kafka/conn.py index 03c445e..05b0acb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -15,6 +15,7 @@ from kafka.vendor import six import kafka.errors as Errors from kafka.future import Future from kafka.protocol.api import RequestHeader +from kafka.protocol.admin import SaslHandShakeRequest, SaslHandShakeResponse from kafka.protocol.commit import GroupCoordinatorResponse from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -48,7 +49,7 @@ class ConnectionStates(object): CONNECTING = '<connecting>' HANDSHAKE = '<handshake>' CONNECTED = '<connected>' - + AUTHENTICATING = '<authenticating>' InFlightRequest = collections.namedtuple('InFlightRequest', ['request', 'response_type', 'correlation_id', 'future', 'timestamp']) @@ -73,7 +74,11 @@ class BrokerConnection(object): 'ssl_password': None, 'api_version': (0, 8, 2), # default to most restrictive 'state_change_callback': lambda conn: True, + 'sasl_mechanism': 'PLAIN', + 'sasl_plain_username': None, + 'sasl_plain_password': None } + SASL_MECHANISMS = ('PLAIN',) def __init__(self, host, port, afi, **configs): self.host = host @@ -96,11 +101,19 @@ class BrokerConnection(object): (socket.SOL_SOCKET, socket.SO_SNDBUF, self.config['send_buffer_bytes'])) + if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): + assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, ( + 'sasl_mechanism must be in ' + self.SASL_MECHANISMS) + if self.config['sasl_mechanism'] == 'PLAIN': + assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' + assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' + self.state = ConnectionStates.DISCONNECTED self._sock = None self._ssl_context = None if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] + self._sasl_auth_future = None self._rbuffer = io.BytesIO() self._receiving = False self._next_payload_bytes = 0 @@ -188,6 +201,8 @@ class BrokerConnection(object): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', str(self)) self.state = ConnectionStates.HANDSHAKE + elif self.config['security_protocol'] == 'SASL_PLAINTEXT': + self.state = ConnectionStates.AUTHENTICATING else: self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) @@ -211,6 +226,16 @@ class BrokerConnection(object): if self.state is ConnectionStates.HANDSHAKE: if self._try_handshake(): log.debug('%s: completed SSL handshake.', str(self)) + if self.config['security_protocol'] == 'SASL_SSL': + self.state = ConnectionStates.AUTHENTICATING + else: + self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) + + if self.state is ConnectionStates.AUTHENTICATING: + assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') + if self._try_authenticate(): + log.info('%s: Authenticated as %s', str(self), self.config['sasl_plain_username']) self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) @@ -273,6 +298,75 @@ class BrokerConnection(object): return False + def _try_authenticate(self): + assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10) + + if self._sasl_auth_future is None: + # Build a SaslHandShakeRequest message + request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) + future = Future() + sasl_response = self._send(request) + sasl_response.add_callback(self._handle_sasl_handshake_response, future) + sasl_response.add_errback(lambda f, e: f.failure(e), future) + self._sasl_auth_future = future + self._recv() + if self._sasl_auth_future.failed(): + raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type + return self._sasl_auth_future.succeeded() + + def _handle_sasl_handshake_response(self, future, response): + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + error = error_type(self) + self.close(error=error) + return future.failure(error_type(self)) + + if self.config['sasl_mechanism'] == 'PLAIN': + return self._try_authenticate_plain(future) + else: + return future.failure( + Errors.UnsupportedSaslMechanismError( + 'kafka-python does not support SASL mechanism %s' % + self.config['sasl_mechanism'])) + + def _try_authenticate_plain(self, future): + if self.config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('%s: Sending username and password in the clear', str(self)) + + data = b'' + try: + self._sock.setblocking(True) + # Send PLAIN credentials per RFC-4616 + msg = bytes('\0'.join([self.config['sasl_plain_username'], + self.config['sasl_plain_username'], + self.config['sasl_plain_password']]).encode('utf-8')) + size = Int32.encode(len(msg)) + self._sock.sendall(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + while len(data) < 4: + fragment = self._sock.recv(4 - len(data)) + if not fragment: + log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) + error = Errors.AuthenticationFailedError( + 'Authentication failed for user {0}'.format( + self.config['sasl_plain_username'])) + future.failure(error) + raise error + data += fragment + self._sock.setblocking(False) + except (AssertionError, ConnectionError) as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (str(self), e)) + future.failure(error) + self.close(error=error) + + if data != b'\x00\x00\x00\x00': + return future.failure(Errors.AuthenticationFailedError()) + + return future.success(True) + def blacked_out(self): """ Return true if we are disconnected from the given node and can't @@ -292,7 +386,8 @@ class BrokerConnection(object): """Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).""" return self.state in (ConnectionStates.CONNECTING, - ConnectionStates.HANDSHAKE) + ConnectionStates.HANDSHAKE, + ConnectionStates.AUTHENTICATING) def disconnected(self): """Return True iff socket is closed""" @@ -337,6 +432,10 @@ class BrokerConnection(object): return future.failure(Errors.ConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) + return self._send(request, expect_response=expect_response) + + def _send(self, request, expect_response=True): + future = Future() correlation_id = self._next_correlation_id() header = RequestHeader(request, correlation_id=correlation_id, @@ -385,7 +484,7 @@ class BrokerConnection(object): Return response if available """ assert not self._processing, 'Recursion not supported' - if not self.connected(): + if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) # If requests are pending, we should close the socket and # fail all the pending request futures @@ -405,6 +504,9 @@ class BrokerConnection(object): self.config['request_timeout_ms'])) return None + return self._recv() + + def _recv(self): # Not receiving is the state of reading the payload header if not self._receiving: try: @@ -452,7 +554,7 @@ class BrokerConnection(object): # enough data to read the full bytes_to_read # but if the socket is disconnected, we will get empty data # without an exception raised - if not data: + if bytes_to_read and not data: log.error('%s: socket disconnected', self) self.close(error=Errors.ConnectionError('socket disconnected')) return None diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ed12ec0..489d96d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -186,6 +186,13 @@ class KafkaConsumer(six.Iterator): (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+ Default: True + sasl_mechanism (str): string picking sasl mechanism when security_protocol + is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. + Default: None + sasl_plain_username (str): username for sasl PLAIN authentication. + Default: None + sasl_plain_password (str): passowrd for sasl PLAIN authentication. + Defualt: None Note: Configuration parameters are described in more detail at @@ -234,6 +241,9 @@ class KafkaConsumer(six.Iterator): 'metrics_sample_window_ms': 30000, 'selector': selectors.DefaultSelector, 'exclude_internal_topics': True, + 'sasl_mechanism': None, + 'sasl_plain_username': None, + 'sasl_plain_password': None, } def __init__(self, *topics, **configs): diff --git a/kafka/errors.py b/kafka/errors.py index c005bf8..069c9e4 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -58,6 +58,14 @@ class CommitFailedError(KafkaError): pass +class AuthenticationMethodNotSupported(KafkaError): + pass + + +class AuthenticationFailedError(KafkaError): + retriable = False + + class BrokerResponseError(KafkaError): errno = None message = None @@ -328,6 +336,18 @@ class InvalidTimestampError(BrokerResponseError): description = ('The timestamp of the message is out of acceptable range.') +class UnsupportedSaslMechanismError(BrokerResponseError): + errno = 33 + message = 'UNSUPPORTED_SASL_MECHANISM' + description = ('The broker does not support the requested SASL mechanism.') + + +class IllegalSaslStateError(BrokerResponseError): + errno = 34 + message = 'ILLEGAL_SASL_STATE' + description = ('Request is not valid given the current SASL state.') + + class KafkaUnavailableError(KafkaError): pass diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 381ad74..aef50d0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -199,7 +199,8 @@ class KafkaProducer(object): to kafka brokers up to this number of maximum requests per broker connection. Default: 5. security_protocol (str): Protocol used to communicate with brokers. - Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. + Default: PLAINTEXT. ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None. @@ -235,6 +236,13 @@ class KafkaProducer(object): selector (selectors.BaseSelector): Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector + sasl_mechanism (str): string picking sasl mechanism when security_protocol + is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. + Default: None + sasl_plain_username (str): username for sasl PLAIN authentication. + Default: None + sasl_plain_password (str): passowrd for sasl PLAIN authentication. + Defualt: None Note: Configuration parameters are described in more detail at @@ -276,6 +284,9 @@ class KafkaProducer(object): 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'selector': selectors.DefaultSelector, + 'sasl_mechanism': None, + 'sasl_plain_username': None, + 'sasl_plain_password': None, } def __init__(self, **configs): diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 12181d7..747684f 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -78,3 +78,24 @@ class DescribeGroupsRequest_v0(Struct): DescribeGroupsRequest = [DescribeGroupsRequest_v0] DescribeGroupsResponse = [DescribeGroupsResponse_v0] + + +class SaslHandShakeResponse_v0(Struct): + API_KEY = 17 + API_VERSION = 0 + SCHEMA = Schema( + ('error_code', Int16), + ('enabled_mechanisms', Array(String('utf-8'))) + ) + + +class SaslHandShakeRequest_v0(Struct): + API_KEY = 17 + API_VERSION = 0 + RESPONSE_TYPE = SaslHandShakeResponse_v0 + SCHEMA = Schema( + ('mechanism', String('utf-8')) + ) + +SaslHandShakeRequest = [SaslHandShakeRequest_v0] +SaslHandShakeResponse = [SaslHandShakeResponse_v0] |