diff options
author | Phong Pham <pt2pham@users.noreply.github.com> | 2019-03-22 21:24:29 -0400 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-22 18:24:29 -0700 |
commit | 8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2 (patch) | |
tree | 3e01a6011767468010734d603bcfdf4c64f7afa5 | |
parent | d032844ad945b6e99845c40cfe08e026a56d332a (diff) | |
download | kafka-python-8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2.tar.gz |
Support SASL OAuthBearer Authentication (#1750)
-rw-r--r-- | kafka/admin/client.py | 3 | ||||
-rw-r--r-- | kafka/client_async.py | 5 | ||||
-rw-r--r-- | kafka/conn.py | 60 | ||||
-rw-r--r-- | kafka/consumer/group.py | 5 | ||||
-rw-r--r-- | kafka/oauth/__init__.py | 3 | ||||
-rw-r--r-- | kafka/oauth/abstract.py | 42 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 5 |
7 files changed, 117 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index d02a68a..39f7e1a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -133,6 +133,8 @@ class KafkaAdminClient(object): Default: None sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -166,6 +168,7 @@ class KafkaAdminClient(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', + 'sasl_oauth_token_provider': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/client_async.py b/kafka/client_async.py index fa150db..ebd4af7 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -151,6 +151,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -188,7 +190,8 @@ class KafkaClient(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index 4aa94f7..52ed9d6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -25,6 +25,7 @@ from kafka.vendor import six import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate +from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest @@ -184,6 +185,8 @@ class BrokerConnection(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -216,10 +219,11 @@ class BrokerConnection(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') def __init__(self, host, port, afi, **configs): self.host = host @@ -263,7 +267,10 @@ class BrokerConnection(object): if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' - + if self.config['sasl_mechanism'] == 'OAUTHBEARER': + token_provider = self.config['sasl_oauth_token_provider'] + assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' + assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -537,6 +544,8 @@ class BrokerConnection(object): return self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': return self._try_authenticate_gssapi(future) + elif self.config['sasl_mechanism'] == 'OAUTHBEARER': + return self._try_authenticate_oauth(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -660,6 +669,51 @@ class BrokerConnection(object): log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) return future.success(True) + def _try_authenticate_oauth(self, future): + data = b'' + + msg = bytes(self._build_oauth_client_request().encode("utf-8")) + size = Int32.encode(len(msg)) + try: + # Send SASL OAuthBearer request with OAuth token + self._send_bytes_blocking(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + data = self._recv_bytes_blocking(4) + + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) + + if data != b'\x00\x00\x00\x00': + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) + + log.info('%s: Authenticated via OAuth', self) + return future.success(True) + + def _build_oauth_client_request(self): + token_provider = self.config['sasl_oauth_token_provider'] + return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) + + def _token_extensions(self): + """ + Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER + initial request. + """ + token_provider = self.config['sasl_oauth_token_provider'] + + # Only run if the #extensions() method is implemented by the clients Token Provider class + # Builds up a string separated by \x01 via a dict of key value pairs + if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0: + msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) + return "\x01" + msg + else: + return "" + def blacked_out(self): """ Return true if we are disconnected from the given node and can't diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index c107f5a..4b46e04 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -240,6 +240,8 @@ class KafkaConsumer(six.Iterator): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None Note: Configuration parameters are described in more detail at @@ -299,7 +301,8 @@ class KafkaConsumer(six.Iterator): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 diff --git a/kafka/oauth/__init__.py b/kafka/oauth/__init__.py new file mode 100644 index 0000000..8c83495 --- /dev/null +++ b/kafka/oauth/__init__.py @@ -0,0 +1,3 @@ +from __future__ import absolute_import + +from kafka.oauth.abstract import AbstractTokenProvider diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py new file mode 100644 index 0000000..8d89ff5 --- /dev/null +++ b/kafka/oauth/abstract.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import + +import abc + +# This statement is compatible with both Python 2.7 & 3+ +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + +class AbstractTokenProvider(ABC): + """ + A Token Provider must be used for the SASL OAuthBearer protocol. + + The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + + Token Providers MUST implement the token() method + """ + + def __init__(self, **config): + pass + + @abc.abstractmethod + def token(self): + """ + Returns a (str) ID/Access Token to be sent to the Kafka + client. + """ + pass + + def extensions(self): + """ + This is an OPTIONAL method that may be implemented. + + Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not implemented, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + """ + return {} diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e4d5929..82df070 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -277,6 +277,8 @@ class KafkaProducer(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None Note: Configuration parameters are described in more detail at @@ -328,7 +330,8 @@ class KafkaProducer(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } _COMPRESSORS = { |