diff options
author | Swen Wenzel <5111028+swenzel@users.noreply.github.com> | 2019-12-30 00:12:30 +0100 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-12-29 15:12:30 -0800 |
commit | ee1c4a42ef3c7f0aa7c98f0c48b6ab0ae76d77da (patch) | |
tree | e536d4854bf88b0d24b7b2a2f5ee3d731eda9716 | |
parent | 31f846c782b9dc6f2107340d269a7558e99bdfe2 (diff) | |
download | kafka-python-ee1c4a42ef3c7f0aa7c98f0c48b6ab0ae76d77da.tar.gz |
Enable SCRAM-SHA-256 and SCRAM-SHA-512 for sasl (#1918)
57 files changed, 619 insertions, 136 deletions
diff --git a/.travis.yml b/.travis.yml index 4023972..a245650 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,7 +25,7 @@ addons: cache: directories: - $HOME/.cache/pip - - servers/ + - servers/dist before_install: - source travis_java_install.sh diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 4e4e842..8afe95b 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -131,11 +131,11 @@ class KafkaAdminClient(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/kafka/client_async.py b/kafka/client_async.py index 4630b90..5379153 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -144,11 +144,11 @@ class KafkaClient(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -768,10 +768,7 @@ class KafkaClient(object): inflight = curr_inflight found = node_id - if found is not None: - return found - - return None + return found def set_topics(self, topics): """Set specific topics to track for metadata. diff --git a/kafka/conn.py b/kafka/conn.py index d4c5464..e4938c7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,12 +1,16 @@ from __future__ import absolute_import, division -import collections +import base64 import copy import errno +import hashlib +import hmac import io import logging from random import shuffle, uniform +from uuid import uuid4 + # selectors in stdlib as of py3.4 try: import selectors # pylint: disable=import-error @@ -16,7 +20,6 @@ except ImportError: import socket import struct -import sys import threading import time @@ -39,6 +42,12 @@ if six.PY2: TimeoutError = socket.error BlockingIOError = Exception + def xor_bytes(left, right): + return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) +else: + def xor_bytes(left, right): + return bytes(lb ^ rb for lb, rb in zip(left, right)) + log = logging.getLogger(__name__) DEFAULT_KAFKA_PORT = 9092 @@ -98,6 +107,69 @@ class ConnectionStates(object): AUTHENTICATING = '<authenticating>' +class ScramClient: + MECHANISMS = { + 'SCRAM-SHA-256': hashlib.sha256, + 'SCRAM-SHA-512': hashlib.sha512 + } + + def __init__(self, user, password, mechanism): + self.nonce = str(uuid4()).replace('-', '') + self.auth_message = '' + self.salted_password = None + self.user = user + self.password = password.encode() + self.hashfunc = self.MECHANISMS[mechanism] + self.hashname = ''.join(mechanism.lower().split('-')[1:3]) + self.stored_key = None + self.client_key = None + self.client_signature = None + self.client_proof = None + self.server_key = None + self.server_signature = None + + def first_message(self): + client_first_bare = 'n={},r={}'.format(self.user, self.nonce) + self.auth_message += client_first_bare + return 'n,,' + client_first_bare + + def process_server_first_message(self, server_first_message): + self.auth_message += ',' + server_first_message + params = dict(pair.split('=', 1) for pair in server_first_message.split(',')) + server_nonce = params['r'] + if not server_nonce.startswith(self.nonce): + raise ValueError("Server nonce, did not start with client nonce!") + self.nonce = server_nonce + self.auth_message += ',c=biws,r=' + self.nonce + + salt = base64.b64decode(params['s'].encode()) + iterations = int(params['i']) + self.create_salted_password(salt, iterations) + + self.client_key = self.hmac(self.salted_password, b'Client Key') + self.stored_key = self.hashfunc(self.client_key).digest() + self.client_signature = self.hmac(self.stored_key, self.auth_message.encode()) + self.client_proof = xor_bytes(self.client_key, self.client_signature) + self.server_key = self.hmac(self.salted_password, b'Server Key') + self.server_signature = self.hmac(self.server_key, self.auth_message.encode()) + + def hmac(self, key, msg): + return hmac.new(key, msg, digestmod=self.hashfunc).digest() + + def create_salted_password(self, salt, iterations): + self.salted_password = hashlib.pbkdf2_hmac( + self.hashname, self.password, salt, iterations + ) + + def final_message(self): + client_final_no_proof = 'c=biws,r=' + self.nonce + return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode()) + + def process_server_final_message(self, server_final_message): + params = dict(pair.split('=', 1) for pair in server_final_message.split(',')) + if self.server_signature != base64.b64decode(params['v'].encode()): + raise ValueError("Server sent wrong signature!") + class BrokerConnection(object): """Initialize a Kafka broker connection @@ -178,11 +250,11 @@ class BrokerConnection(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -225,7 +297,7 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") def __init__(self, host, port, afi, **configs): self.host = host @@ -260,9 +332,13 @@ class BrokerConnection(object): if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, ( 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS)) - if self.config['sasl_mechanism'] == 'PLAIN': - assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' - assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' + if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): + assert self.config['sasl_plain_username'] is not None, ( + 'sasl_plain_username required for PLAIN or SCRAM sasl' + ) + assert self.config['sasl_plain_password'] is not None, ( + 'sasl_plain_password required for PLAIN or SCRAM sasl' + ) if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' @@ -553,6 +629,8 @@ class BrokerConnection(object): return self._try_authenticate_gssapi(future) elif self.config['sasl_mechanism'] == 'OAUTHBEARER': return self._try_authenticate_oauth(future) + elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): + return self._try_authenticate_scram(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -653,6 +731,53 @@ class BrokerConnection(object): log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) + def _try_authenticate_scram(self, future): + if self.config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('%s: Exchanging credentials in the clear', self) + + scram_client = ScramClient( + self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism'] + ) + + err = None + close = False + with self._lock: + if not self._can_send_recv(): + err = Errors.NodeNotReadyError(str(self)) + close = False + else: + try: + client_first = scram_client.first_message().encode() + size = Int32.encode(len(client_first)) + self._send_bytes_blocking(size + client_first) + + (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) + server_first = self._recv_bytes_blocking(data_len).decode() + scram_client.process_server_first_message(server_first) + + client_final = scram_client.final_message().encode() + size = Int32.encode(len(client_final)) + self._send_bytes_blocking(size + client_final) + + (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) + server_final = self._recv_bytes_blocking(data_len).decode() + scram_client.process_server_final_message(server_final) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + close = True + + if err is not None: + if close: + self.close(error=err) + return future.failure(err) + + log.info( + '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism'] + ) + return future.success(True) + def _try_authenticate_gssapi(self, future): kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index cde956c..8474b7c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -232,11 +232,11 @@ class KafkaConsumer(six.Iterator): subscribing to it. Requires 0.10+ Default: True sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): Username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): Password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dc383d6..b90ca88 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -269,11 +269,11 @@ class KafkaProducer(object): Default: selectors.DefaultSelector sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/requirements-dev.txt b/requirements-dev.txt index cb0bbe5..d283090 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -8,6 +8,7 @@ lz4==2.1.2 xxhash==1.3.0 python-snappy==0.5.3 tox==3.5.3 +mock==3.0.5 pylint==1.9.3 pytest-pylint==0.12.3 pytest-mock==1.10.0 diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index 7d8e2b1..daab312 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar @@ -121,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.0.0/resources/kafka_server_jaas.conf b/servers/0.10.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.10.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties index 7d8e2b1..daab312 100644 --- a/servers/0.10.0.1/resources/kafka.properties +++ b/servers/0.10.0.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar @@ -121,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.0.1/resources/kafka_server_jaas.conf b/servers/0.10.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.10.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index 7d8e2b1..daab312 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar @@ -121,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.1.1/resources/kafka_server_jaas.conf b/servers/0.10.1.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.10.1.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index 7d8e2b1..daab312 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ b/servers/0.10.2.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar @@ -121,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.2.1/resources/kafka_server_jaas.conf b/servers/0.10.2.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.10.2.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.10.2.2/resources/kafka.properties b/servers/0.10.2.2/resources/kafka.properties index 7d8e2b1..daab312 100644 --- a/servers/0.10.2.2/resources/kafka.properties +++ b/servers/0.10.2.2/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar @@ -121,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.2.2/resources/kafka_server_jaas.conf b/servers/0.10.2.2/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.10.2.2/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/0.11.0.0/resources/kafka.properties +++ b/servers/0.11.0.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/0.11.0.0/resources/kafka_server_jaas.conf b/servers/0.11.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.11.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/0.11.0.1/resources/kafka.properties +++ b/servers/0.11.0.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/0.11.0.1/resources/kafka_server_jaas.conf b/servers/0.11.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.11.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/0.11.0.2/resources/kafka.properties +++ b/servers/0.11.0.2/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/0.11.0.2/resources/kafka_server_jaas.conf b/servers/0.11.0.2/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.11.0.2/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.11.0.3/resources/kafka.properties b/servers/0.11.0.3/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/0.11.0.3/resources/kafka.properties +++ b/servers/0.11.0.3/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/0.11.0.3/resources/kafka_server_jaas.conf b/servers/0.11.0.3/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/0.11.0.3/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties index b4c4088..fb859dd 100644 --- a/servers/0.9.0.0/resources/kafka.properties +++ b/servers/0.9.0.0/resources/kafka.properties @@ -121,7 +121,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties index 7d8e2b1..28668db 100644 --- a/servers/0.9.0.1/resources/kafka.properties +++ b/servers/0.9.0.1/resources/kafka.properties @@ -121,7 +121,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/1.0.0/resources/kafka.properties +++ b/servers/1.0.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/1.0.0/resources/kafka_server_jaas.conf b/servers/1.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/1.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/1.0.1/resources/kafka.properties +++ b/servers/1.0.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/1.0.1/resources/kafka_server_jaas.conf b/servers/1.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/1.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/1.0.2/resources/kafka.properties b/servers/1.0.2/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/1.0.2/resources/kafka.properties +++ b/servers/1.0.2/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/1.0.2/resources/kafka_server_jaas.conf b/servers/1.0.2/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/1.0.2/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/1.1.0/resources/kafka.properties +++ b/servers/1.1.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/1.1.0/resources/kafka_server_jaas.conf b/servers/1.1.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/1.1.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties index fe6a89f..5775cfd 100644 --- a/servers/1.1.1/resources/kafka.properties +++ b/servers/1.1.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar @@ -33,10 +35,6 @@ ssl.truststore.password=foobar authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true -# List of enabled mechanisms, can be more than one -sasl.enabled.mechanisms=PLAIN -sasl.mechanism.inter.broker.protocol=PLAIN - # The port the socket server listens on #port=9092 diff --git a/servers/1.1.1/resources/kafka_server_jaas.conf b/servers/1.1.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/1.1.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.0.0/resources/kafka.properties +++ b/servers/2.0.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.0.0/resources/kafka_server_jaas.conf b/servers/2.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.0.1/resources/kafka.properties +++ b/servers/2.0.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.0.1/resources/kafka_server_jaas.conf b/servers/2.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.1.0/resources/kafka.properties b/servers/2.1.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.1.0/resources/kafka.properties +++ b/servers/2.1.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.1.0/resources/kafka_server_jaas.conf b/servers/2.1.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.1.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.1.1/resources/kafka.properties b/servers/2.1.1/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.1.1/resources/kafka.properties +++ b/servers/2.1.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.1.1/resources/kafka_server_jaas.conf b/servers/2.1.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.1.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.2.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.2.0/resources/kafka.properties +++ b/servers/2.2.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.2.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.2.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.2.1/resources/kafka.properties b/servers/2.2.1/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.2.1/resources/kafka.properties +++ b/servers/2.2.1/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.2.1/resources/kafka_server_jaas.conf b/servers/2.2.1/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.2.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/2.3.0/resources/kafka.properties b/servers/2.3.0/resources/kafka.properties index 630dbc5..5775cfd 100644 --- a/servers/2.3.0/resources/kafka.properties +++ b/servers/2.3.0/resources/kafka.properties @@ -24,6 +24,8 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} + ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar ssl.key.password=foobar diff --git a/servers/2.3.0/resources/kafka_server_jaas.conf b/servers/2.3.0/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/2.3.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/servers/trunk/resources/kafka_server_jaas.conf b/servers/trunk/resources/kafka_server_jaas.conf new file mode 100644 index 0000000..18efe43 --- /dev/null +++ b/servers/trunk/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}};
\ No newline at end of file diff --git a/test/__init__.py b/test/__init__.py index 71f667d..329277d 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -2,14 +2,7 @@ from __future__ import absolute_import # Set default logging handler to avoid "No handler found" warnings. import logging -try: # Python 2.7+ - from logging import NullHandler -except ImportError: - class NullHandler(logging.Handler): - def emit(self, record): - pass - -logging.getLogger(__name__).addHandler(NullHandler()) +logging.basicConfig(level=logging.INFO) from kafka.future import Future Future.error_on_callbacks = True # always fail during testing diff --git a/test/fixtures.py b/test/fixtures.py index 557fca6..78cdc5c 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -14,6 +14,7 @@ from kafka.vendor.six.moves import urllib, range from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer +from kafka.errors import InvalidReplicationFactorError from kafka.protocol.admin import CreateTopicsRequest from kafka.protocol.metadata import MetadataRequest from test.testutil import env_kafka_version, random_string @@ -140,6 +141,16 @@ class Fixture(object): dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + log.debug("Template string:") + for line in template.splitlines(): + log.debug(' ' + line.strip()) + log.debug("Rendered template:") + with open(target_file.strpath, 'r') as o: + for line in o: + log.debug(' ' + line.strip()) + log.debug("binding:") + for key, value in binding.items(): + log.debug(" {key}={value}".format(key=key, value=value)) def dump_logs(self): self.child.dump_logs() @@ -233,11 +244,14 @@ class ZookeeperFixture(Fixture): class KafkaFixture(Fixture): + broker_user = 'alice' + broker_password = 'alice-secret' + @classmethod def instance(cls, broker_id, zookeeper, zk_chroot=None, host=None, port=None, transport='PLAINTEXT', replicas=1, partitions=2, - sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None): + sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") @@ -261,7 +275,7 @@ class KafkaFixture(Fixture): def __init__(self, host, port, broker_id, zookeeper, zk_chroot, replicas=1, partitions=2, transport='PLAINTEXT', - sasl_mechanism='PLAIN', auto_create_topic=True, + sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): super(KafkaFixture, self).__init__() @@ -271,13 +285,18 @@ class KafkaFixture(Fixture): self.broker_id = broker_id self.auto_create_topic = auto_create_topic self.transport = transport.upper() - self.sasl_mechanism = sasl_mechanism.upper() + if sasl_mechanism is not None: + self.sasl_mechanism = sasl_mechanism.upper() + else: + self.sasl_mechanism = None self.ssl_dir = self.test_resource('ssl') # TODO: checking for port connection would be better than scanning logs # until then, we need the pattern to work across all supported broker versions # The logging format changed slightly in 1.0.0 self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,) + # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism + self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user) self.zookeeper = zookeeper self.zk_chroot = zk_chroot @@ -292,6 +311,64 @@ class KafkaFixture(Fixture): self.running = False self._client = None + self.sasl_config = '' + self.jaas_config = '' + + def _sasl_config(self): + if not self.sasl_enabled: + return '' + + sasl_config = "sasl.enabled.mechanisms={mechanism}\n" + sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n" + return sasl_config.format(mechanism=self.sasl_mechanism) + + def _jaas_config(self): + if not self.sasl_enabled: + return '' + + elif self.sasl_mechanism == 'PLAIN': + jaas_config = ( + "org.apache.kafka.common.security.plain.PlainLoginModule required\n" + ' username="{user}" password="{password}" user_{user}="{password}";\n' + ) + elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"): + jaas_config = ( + "org.apache.kafka.common.security.scram.ScramLoginModule required\n" + ' username="{user}" password="{password}";\n' + ) + else: + raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) + return jaas_config.format(user=self.broker_user, password=self.broker_password) + + def _add_scram_user(self): + self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user)) + args = self.kafka_run_class_args( + "kafka.admin.ConfigCommand", + "--zookeeper", + "%s:%d/%s" % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + "--alter", + "--entity-type", "users", + "--entity-name", self.broker_user, + "--add-config", + "{}=[password={}]".format(self.sasl_mechanism, self.broker_password), + ) + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stdout, stderr = proc.communicate() + + if proc.returncode != 0: + self.out("Failed to save credentials to zookeeper!") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to save credentials to zookeeper!") + self.out("User created.") + + @property + def sasl_enabled(self): + return self.sasl_mechanism is not None def bootstrap_server(self): return '%s:%d' % (self.host, self.port) @@ -328,9 +405,17 @@ class KafkaFixture(Fixture): def start(self): # Configure Kafka child process properties = self.tmp_dir.join("kafka.properties") - template = self.test_resource("kafka.properties") + jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf") + properties_template = self.test_resource("kafka.properties") + jaas_conf_template = self.test_resource("kafka_server_jaas.conf") + args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() + if self.sasl_enabled: + opts = env.get('KAFKA_OPTS', '').strip() + opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath) + env['KAFKA_OPTS'] = opts + self.render_template(jaas_conf_template, jaas_conf, vars(self)) timeout = 5 max_timeout = 120 @@ -345,14 +430,17 @@ class KafkaFixture(Fixture): if auto_port: self.port = get_open_port() self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) - self.render_template(template, properties, vars(self)) + self.render_template(properties_template, properties, vars(self)) + self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) - if self.child.wait_for(self.start_pattern, timeout=timeout): + if self._broker_ready(timeout) and self._scram_user_present(timeout): break + self.child.dump_logs() self.child.stop() + timeout *= 2 time.sleep(backoff) tries += 1 @@ -360,11 +448,20 @@ class KafkaFixture(Fixture): else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') - (self._client,) = self.get_clients(1, '_internal_client') + (self._client,) = self.get_clients(1, client_id='_internal_client') self.out("Done!") self.running = True + def _broker_ready(self, timeout): + return self.child.wait_for(self.start_pattern, timeout=timeout) + + def _scram_user_present(self, timeout): + # no need to wait for scram user if scram is not used + if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'): + return True + return self.child.wait_for(self.scram_pattern, timeout=timeout) + def open(self): if self.running: self.out("Instance already running") @@ -378,18 +475,24 @@ class KafkaFixture(Fixture): self.tmp_dir.ensure('data', dir=True) self.out("Running local instance...") - log.info(" host = %s", self.host) - log.info(" port = %s", self.port or '(auto)') - log.info(" transport = %s", self.transport) - log.info(" broker_id = %s", self.broker_id) - log.info(" zk_host = %s", self.zookeeper.host) - log.info(" zk_port = %s", self.zookeeper.port) - log.info(" zk_chroot = %s", self.zk_chroot) - log.info(" replicas = %s", self.replicas) - log.info(" partitions = %s", self.partitions) - log.info(" tmp_dir = %s", self.tmp_dir.strpath) + log.info(" host = %s", self.host) + log.info(" port = %s", self.port or '(auto)') + log.info(" transport = %s", self.transport) + log.info(" sasl_mechanism = %s", self.sasl_mechanism) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zookeeper.host) + log.info(" zk_port = %s", self.zookeeper.port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir.strpath) self._create_zk_chroot() + self.sasl_config = self._sasl_config() + self.jaas_config = self._jaas_config() + # add user to zookeeper for the first server + if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0: + self._add_scram_user() self.start() atexit.register(self.close) @@ -437,7 +540,8 @@ class KafkaFixture(Fixture): future = self._client.send(node_id, request) future.error_on_callbacks = True future.add_errback(_failure) - return self._client.poll(future=future, timeout_ms=timeout) + self._client.poll(future=future, timeout_ms=timeout) + return future.value except Exception as exc: time.sleep(1) retries -= 1 @@ -446,80 +550,122 @@ class KafkaFixture(Fixture): else: pass # retry - def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000): if num_partitions is None: num_partitions = self.partitions if replication_factor is None: replication_factor = self.replicas # Try different methods to create a topic, from the fastest to the slowest - if self.auto_create_topic and \ - num_partitions == self.partitions and \ - replication_factor == self.replicas: - self._send_request(MetadataRequest[0]([topic_name])) + if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas: + self._create_topic_via_metadata(topic_name, timeout_ms) elif env_kafka_version() >= (0, 10, 1, 0): - request = CreateTopicsRequest[0]([(topic_name, num_partitions, - replication_factor, [], [])], timeout_ms) - result = self._send_request(request, timeout=timeout_ms) - for topic_result in result[0].topic_error_codes: - error_code = topic_result[1] - if error_code != 0: - raise errors.for_code(error_code) + try: + self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) + except InvalidReplicationFactorError: + # wait and try again + # on travis the brokers sometimes take a while to find themselves + time.sleep(0.5) + self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) else: - args = self.kafka_run_class_args('kafka.admin.TopicCommand', - '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - '--create', - '--topic', topic_name, - '--partitions', self.partitions \ - if num_partitions is None else num_partitions, - '--replication-factor', self.replicas \ - if replication_factor is None \ - else replication_factor) - if env_kafka_version() >= (0, 10): - args.append('--if-not-exists') - env = self.kafka_run_class_env() - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = proc.communicate() - if proc.returncode != 0: - if 'kafka.common.TopicExistsException' not in stdout: - self.out("Failed to create topic %s" % (topic_name,)) - self.out(stdout) - self.out(stderr) - raise RuntimeError("Failed to create topic %s" % (topic_name,)) + self._create_topic_via_cli(topic_name, num_partitions, replication_factor) + + def _create_topic_via_metadata(self, topic_name, timeout_ms=10000): + self._send_request(MetadataRequest[0]([topic_name]), timeout_ms) + + def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + request = CreateTopicsRequest[0]([(topic_name, num_partitions, + replication_factor, [], [])], timeout_ms) + response = self._send_request(request, timeout=timeout_ms) + for topic_result in response.topic_errors: + error_code = topic_result[1] + if error_code != 0: + raise errors.for_code(error_code) + + def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--create', + '--topic', topic_name, + '--partitions', self.partitions \ + if num_partitions is None else num_partitions, + '--replication-factor', self.replicas \ + if replication_factor is None \ + else replication_factor) + if env_kafka_version() >= (0, 10): + args.append('--if-not-exists') + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + if 'kafka.common.TopicExistsException' not in stdout: + self.out("Failed to create topic %s" % (topic_name,)) + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to create topic %s" % (topic_name,)) + + def get_topic_names(self): + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--list' + ) + env = self.kafka_run_class_env() + env.pop('KAFKA_LOG4J_OPTS') + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + self.out("Failed to list topics!") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to list topics!") + return stdout.decode().splitlines(False) def create_topics(self, topic_names, num_partitions=None, replication_factor=None): for topic_name in topic_names: self._create_topic(topic_name, num_partitions, replication_factor) - def get_clients(self, cnt=1, client_id=None): - if client_id is None: - client_id = 'client' - return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)), - bootstrap_servers=self.bootstrap_server()) for x in range(cnt)) - - def get_admin_clients(self, cnt=1, **params): - params.setdefault('client_id', 'admin_client') - params['bootstrap_servers'] = self.bootstrap_server() + def _enrich_client_params(self, params, **defaults): + params = params.copy() + for key, value in defaults.items(): + params.setdefault(key, value) + params.setdefault('bootstrap_servers', self.bootstrap_server()) + if self.sasl_enabled: + params.setdefault('sasl_mechanism', self.sasl_mechanism) + params.setdefault('security_protocol', self.transport) + if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): + params.setdefault('sasl_plain_username', self.broker_user) + params.setdefault('sasl_plain_password', self.broker_password) + return params + + @staticmethod + def _create_many_clients(cnt, cls, *args, **params): client_id = params['client_id'] - for x in range(cnt): + for _ in range(cnt): params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaAdminClient(**params) + yield cls(*args, **params) + + def get_clients(self, cnt=1, **params): + params = self._enrich_client_params(params, client_id='client') + for client in self._create_many_clients(cnt, KafkaClient, **params): + yield client + + def get_admin_clients(self, cnt, **params): + params = self._enrich_client_params(params, client_id='admin_client') + for client in self._create_many_clients(cnt, KafkaAdminClient, **params): + yield client def get_consumers(self, cnt, topics, **params): - params.setdefault('client_id', 'consumer') - params.setdefault('heartbeat_interval_ms', 500) - params['bootstrap_servers'] = self.bootstrap_server() - client_id = params['client_id'] - for x in range(cnt): - params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaConsumer(*topics, **params) + params = self._enrich_client_params( + params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest' + ) + for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params): + yield client def get_producers(self, cnt, **params): - params.setdefault('client_id', 'producer') - params['bootstrap_servers'] = self.bootstrap_server() - client_id = params['client_id'] - for x in range(cnt): - params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaProducer(**params) + params = self._enrich_client_params(params, client_id='producer') + for client in self._create_many_clients(cnt, KafkaProducer, **params): + yield client diff --git a/test/service.py b/test/service.py index 47fb846..045d780 100644 --- a/test/service.py +++ b/test/service.py @@ -45,6 +45,11 @@ class SpawnedService(threading.Thread): self.child = None self.alive = False self.daemon = True + log.info("Created service for command:") + log.info(" "+' '.join(self.args)) + log.debug("With environment:") + for key, value in self.env.items(): + log.debug(" {key}={value}".format(key=key, value=value)) def _spawn(self): if self.alive: return @@ -57,7 +62,7 @@ class SpawnedService(threading.Thread): bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.alive = True + self.alive = self.child.poll() is None def _despawn(self): if self.child.poll() is None: @@ -83,12 +88,14 @@ class SpawnedService(threading.Thread): raise if self.child.stdout in rds: - line = self.child.stdout.readline() - self.captured_stdout.append(line.decode('utf-8').rstrip()) + line = self.child.stdout.readline().decode('utf-8').rstrip() + if line: + self.captured_stdout.append(line) if self.child.stderr in rds: - line = self.child.stderr.readline() - self.captured_stderr.append(line.decode('utf-8').rstrip()) + line = self.child.stderr.readline().decode('utf-8').rstrip() + if line: + self.captured_stderr.append(line) if self.child.poll() is not None: self.dump_logs() @@ -105,6 +112,9 @@ class SpawnedService(threading.Thread): def wait_for(self, pattern, timeout=30): start = time.time() while True: + if not self.is_alive(): + raise RuntimeError("Child thread died already.") + elapsed = time.time() - start if elapsed >= timeout: log.error("Waiting for %r timed out after %d seconds", pattern, timeout) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py new file mode 100644 index 0000000..e3a4813 --- /dev/null +++ b/test/test_sasl_integration.py @@ -0,0 +1,80 @@ +import logging +import uuid + +import pytest + +from kafka.admin import NewTopic +from kafka.protocol.metadata import MetadataRequest_v1 +from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore + + +@pytest.fixture( + params=[ + pytest.param( + "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10") + ), + pytest.param( + "SCRAM-SHA-256", + marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + pytest.param( + "SCRAM-SHA-512", + marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + ] +) +def sasl_kafka(request, kafka_broker_factory): + sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0] + yield sasl_kafka + sasl_kafka.child.dump_logs() + + +def test_admin(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + admin, = sasl_kafka.get_admin_clients(1) + admin.create_topics([NewTopic(topic_name, 1, 1)]) + assert topic_name in sasl_kafka.get_topic_names() + + +def test_produce_and_consume(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + sasl_kafka.create_topics([topic_name], num_partitions=2) + producer, = sasl_kafka.get_producers(1) + + messages_and_futures = [] # [(message, produce_future),] + for i in range(100): + encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8") + future = producer.send(topic_name, value=encoded_msg, partition=i % 2) + messages_and_futures.append((encoded_msg, future)) + producer.flush() + + for (msg, f) in messages_and_futures: + assert f.succeeded() + + consumer, = sasl_kafka.get_consumers(1, [topic_name]) + messages = {0: [], 1: []} + for i, message in enumerate(consumer, 1): + logging.debug("Consumed message %s", repr(message)) + messages[message.partition].append(message) + if i >= 100: + break + + assert_message_count(messages[0], 50) + assert_message_count(messages[1], 50) + + +def test_client(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + sasl_kafka.create_topics([topic_name], num_partitions=1) + + client, = sasl_kafka.get_clients(1) + request = MetadataRequest_v1(None) + client.send(0, request) + for _ in range(10): + result = client.poll(timeout_ms=10000) + if len(result) > 0: + break + else: + raise RuntimeError("Couldn't fetch topic response from Broker.") + result = result[0] + assert topic_name in [t[1] for t in result.topics] diff --git a/test/testutil.py b/test/testutil.py index 77a6673..ec4d70b 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -2,10 +2,15 @@ from __future__ import absolute_import import os import random +import re import string import time +def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')): + return _matcher.sub('_', string) + + def random_string(length): return "".join(random.choice(string.ascii_letters) for i in range(length)) |