summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhong Pham <pt2pham@users.noreply.github.com>2019-03-22 21:24:29 -0400
committerDana Powers <dana.powers@gmail.com>2019-03-22 18:24:29 -0700
commit8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2 (patch)
tree3e01a6011767468010734d603bcfdf4c64f7afa5
parentd032844ad945b6e99845c40cfe08e026a56d332a (diff)
downloadkafka-python-8e2ed3ebb45f98e71b7c77fdd52472b815bb7ad2.tar.gz
Support SASL OAuthBearer Authentication (#1750)
-rw-r--r--kafka/admin/client.py3
-rw-r--r--kafka/client_async.py5
-rw-r--r--kafka/conn.py60
-rw-r--r--kafka/consumer/group.py5
-rw-r--r--kafka/oauth/__init__.py3
-rw-r--r--kafka/oauth/abstract.py42
-rw-r--r--kafka/producer/kafka.py5
7 files changed, 117 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index d02a68a..39f7e1a 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -133,6 +133,8 @@ class KafkaAdminClient(object):
Default: None
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
+ sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
+ instance. (See kafka.oauth.abstract). Default: None
"""
DEFAULT_CONFIG = {
@@ -166,6 +168,7 @@ class KafkaAdminClient(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
+ 'sasl_oauth_token_provider': None,
# metrics configs
'metric_reporters': [],
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fa150db..ebd4af7 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -151,6 +151,8 @@ class KafkaClient(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
+ sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
+ instance. (See kafka.oauth.abstract). Default: None
"""
DEFAULT_CONFIG = {
@@ -188,7 +190,8 @@ class KafkaClient(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
- 'sasl_kerberos_domain_name': None
+ 'sasl_kerberos_domain_name': None,
+ 'sasl_oauth_token_provider': None
}
def __init__(self, **configs):
diff --git a/kafka/conn.py b/kafka/conn.py
index 4aa94f7..52ed9d6 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -25,6 +25,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
+from kafka.oauth.abstract import AbstractTokenProvider
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
@@ -184,6 +185,8 @@ class BrokerConnection(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
+ sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
+ instance. (See kafka.oauth.abstract). Default: None
"""
DEFAULT_CONFIG = {
@@ -216,10 +219,11 @@ class BrokerConnection(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
- 'sasl_kerberos_domain_name': None
+ 'sasl_kerberos_domain_name': None,
+ 'sasl_oauth_token_provider': None
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
- SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
+ SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER')
def __init__(self, host, port, afi, **configs):
self.host = host
@@ -263,7 +267,10 @@ class BrokerConnection(object):
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
-
+ if self.config['sasl_mechanism'] == 'OAUTHBEARER':
+ token_provider = self.config['sasl_oauth_token_provider']
+ assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
+ assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()'
# This is not a general lock / this class is not generally thread-safe yet
# However, to avoid pushing responsibility for maintaining
# per-connection locks to the upstream client, we will use this lock to
@@ -537,6 +544,8 @@ class BrokerConnection(object):
return self._try_authenticate_plain(future)
elif self.config['sasl_mechanism'] == 'GSSAPI':
return self._try_authenticate_gssapi(future)
+ elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
+ return self._try_authenticate_oauth(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
@@ -660,6 +669,51 @@ class BrokerConnection(object):
log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
return future.success(True)
+ def _try_authenticate_oauth(self, future):
+ data = b''
+
+ msg = bytes(self._build_oauth_client_request().encode("utf-8"))
+ size = Int32.encode(len(msg))
+ try:
+ # Send SASL OAuthBearer request with OAuth token
+ self._send_bytes_blocking(size + msg)
+
+ # The server will send a zero sized message (that is Int32(0)) on success.
+ # The connection is closed on failure
+ data = self._recv_bytes_blocking(4)
+
+ except ConnectionError as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
+ self.close(error=error)
+ return future.failure(error)
+
+ if data != b'\x00\x00\x00\x00':
+ error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
+ return future.failure(error)
+
+ log.info('%s: Authenticated via OAuth', self)
+ return future.success(True)
+
+ def _build_oauth_client_request(self):
+ token_provider = self.config['sasl_oauth_token_provider']
+ return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions())
+
+ def _token_extensions(self):
+ """
+ Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
+ initial request.
+ """
+ token_provider = self.config['sasl_oauth_token_provider']
+
+ # Only run if the #extensions() method is implemented by the clients Token Provider class
+ # Builds up a string separated by \x01 via a dict of key value pairs
+ if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0:
+ msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()])
+ return "\x01" + msg
+ else:
+ return ""
+
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index c107f5a..4b46e04 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -240,6 +240,8 @@ class KafkaConsumer(six.Iterator):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
+ sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
+ instance. (See kafka.oauth.abstract). Default: None
Note:
Configuration parameters are described in more detail at
@@ -299,7 +301,8 @@ class KafkaConsumer(six.Iterator):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
- 'sasl_kerberos_domain_name': None
+ 'sasl_kerberos_domain_name': None,
+ 'sasl_oauth_token_provider': None
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
diff --git a/kafka/oauth/__init__.py b/kafka/oauth/__init__.py
new file mode 100644
index 0000000..8c83495
--- /dev/null
+++ b/kafka/oauth/__init__.py
@@ -0,0 +1,3 @@
+from __future__ import absolute_import
+
+from kafka.oauth.abstract import AbstractTokenProvider
diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py
new file mode 100644
index 0000000..8d89ff5
--- /dev/null
+++ b/kafka/oauth/abstract.py
@@ -0,0 +1,42 @@
+from __future__ import absolute_import
+
+import abc
+
+# This statement is compatible with both Python 2.7 & 3+
+ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
+
+class AbstractTokenProvider(ABC):
+ """
+ A Token Provider must be used for the SASL OAuthBearer protocol.
+
+ The implementation should ensure token reuse so that multiple
+ calls at connect time do not create multiple tokens. The implementation
+ should also periodically refresh the token in order to guarantee
+ that each call returns an unexpired token. A timeout error should
+ be returned after a short period of inactivity so that the
+ broker can log debugging info and retry.
+
+ Token Providers MUST implement the token() method
+ """
+
+ def __init__(self, **config):
+ pass
+
+ @abc.abstractmethod
+ def token(self):
+ """
+ Returns a (str) ID/Access Token to be sent to the Kafka
+ client.
+ """
+ pass
+
+ def extensions(self):
+ """
+ This is an OPTIONAL method that may be implemented.
+
+ Returns a map of key-value pairs that can
+ be sent with the SASL/OAUTHBEARER initial client request. If
+ not implemented, the values are ignored. This feature is only available
+ in Kafka >= 2.1.0.
+ """
+ return {}
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e4d5929..82df070 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -277,6 +277,8 @@ class KafkaProducer(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
+ sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
+ instance. (See kafka.oauth.abstract). Default: None
Note:
Configuration parameters are described in more detail at
@@ -328,7 +330,8 @@ class KafkaProducer(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
- 'sasl_kerberos_domain_name': None
+ 'sasl_kerberos_domain_name': None,
+ 'sasl_oauth_token_provider': None
}
_COMPRESSORS = {