author     Swen Wenzel <5111028+swenzel@users.noreply.github.com>    2019-12-30 00:12:30 +0100
committer  Dana Powers <dana.powers@gmail.com>                       2019-12-29 15:12:30 -0800
commit     ee1c4a42ef3c7f0aa7c98f0c48b6ab0ae76d77da (patch)
tree       e536d4854bf88b0d24b7b2a2f5ee3d731eda9716
parent     31f846c782b9dc6f2107340d269a7558e99bdfe2 (diff)
download   kafka-python-ee1c4a42ef3c7f0aa7c98f0c48b6ab0ae76d77da.tar.gz
Enable SCRAM-SHA-256 and SCRAM-SHA-512 for sasl (#1918)
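With this change, SCRAM authentication works wherever PLAIN did: credentials are still
supplied through the sasl_plain_* options, and only the mechanism name changes. A minimal
client sketch (broker address and credentials are placeholders):

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',   # placeholder broker
        security_protocol='SASL_SSL',         # or 'SASL_PLAINTEXT'
        sasl_mechanism='SCRAM-SHA-512',       # or 'SCRAM-SHA-256'
        sasl_plain_username='alice',          # placeholder credentials
        sasl_plain_password='alice-secret',
    )
    producer.send('my-topic', b'hello scram')
    producer.flush()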
-rw-r--r--  .travis.yml                                        |   2
-rw-r--r--  kafka/admin/client.py                              |  10
-rw-r--r--  kafka/client_async.py                              |  15
-rw-r--r--  kafka/conn.py                                      | 147
-rw-r--r--  kafka/consumer/group.py                            |  10
-rw-r--r--  kafka/producer/kafka.py                            |  10
-rw-r--r--  requirements-dev.txt                               |   1
-rw-r--r--  servers/0.10.0.0/resources/kafka.properties        |   4
-rw-r--r--  servers/0.10.0.0/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.10.0.1/resources/kafka.properties        |   4
-rw-r--r--  servers/0.10.0.1/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.10.1.1/resources/kafka.properties        |   4
-rw-r--r--  servers/0.10.1.1/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.10.2.1/resources/kafka.properties        |   4
-rw-r--r--  servers/0.10.2.1/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.10.2.2/resources/kafka.properties        |   4
-rw-r--r--  servers/0.10.2.2/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.11.0.0/resources/kafka.properties        |   2
-rw-r--r--  servers/0.11.0.0/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.11.0.1/resources/kafka.properties        |   2
-rw-r--r--  servers/0.11.0.1/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.11.0.2/resources/kafka.properties        |   2
-rw-r--r--  servers/0.11.0.2/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.11.0.3/resources/kafka.properties        |   2
-rw-r--r--  servers/0.11.0.3/resources/kafka_server_jaas.conf  |   4
-rw-r--r--  servers/0.9.0.0/resources/kafka.properties         |   2
-rw-r--r--  servers/0.9.0.1/resources/kafka.properties         |   2
-rw-r--r--  servers/1.0.0/resources/kafka.properties           |   2
-rw-r--r--  servers/1.0.0/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/1.0.1/resources/kafka.properties           |   2
-rw-r--r--  servers/1.0.1/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/1.0.2/resources/kafka.properties           |   2
-rw-r--r--  servers/1.0.2/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/1.1.0/resources/kafka.properties           |   2
-rw-r--r--  servers/1.1.0/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/1.1.1/resources/kafka.properties           |   6
-rw-r--r--  servers/1.1.1/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.0.0/resources/kafka.properties           |   2
-rw-r--r--  servers/2.0.0/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.0.1/resources/kafka.properties           |   2
-rw-r--r--  servers/2.0.1/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.1.0/resources/kafka.properties           |   2
-rw-r--r--  servers/2.1.0/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.1.1/resources/kafka.properties           |   2
-rw-r--r--  servers/2.1.1/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.2.0/resources/kafka.properties           |   2
-rw-r--r--  servers/2.2.0/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.2.1/resources/kafka.properties           |   2
-rw-r--r--  servers/2.2.1/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/2.3.0/resources/kafka.properties           |   2
-rw-r--r--  servers/2.3.0/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  servers/trunk/resources/kafka_server_jaas.conf     |   4
-rw-r--r--  test/__init__.py                                   |   9
-rw-r--r--  test/fixtures.py                                   | 298
-rw-r--r--  test/service.py                                    |  20
-rw-r--r--  test/test_sasl_integration.py                      |  80
-rw-r--r--  test/testutil.py                                   |   5
57 files changed, 619 insertions(+), 136 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 4023972..a245650 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,7 +25,7 @@ addons:
cache:
directories:
- $HOME/.cache/pip
- - servers/
+ - servers/dist
before_install:
- source travis_java_install.sh
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 4e4e842..8afe95b 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -131,11 +131,11 @@ class KafkaAdminClient(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
- PLAIN, GSSAPI, OAUTHBEARER.
- sasl_plain_username (str): username for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
- sasl_plain_password (str): password for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
+ PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
+ sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 4630b90..5379153 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -144,11 +144,11 @@ class KafkaClient(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
- PLAIN, GSSAPI, OAUTHBEARER.
- sasl_plain_username (str): username for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
- sasl_plain_password (str): password for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
+ PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
+ sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -768,10 +768,7 @@ class KafkaClient(object):
inflight = curr_inflight
found = node_id
- if found is not None:
- return found
-
- return None
+ return found
def set_topics(self, topics):
"""Set specific topics to track for metadata.
diff --git a/kafka/conn.py b/kafka/conn.py
index d4c5464..e4938c7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,12 +1,16 @@
from __future__ import absolute_import, division
-import collections
+import base64
import copy
import errno
+import hashlib
+import hmac
import io
import logging
from random import shuffle, uniform
+from uuid import uuid4
+
# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -16,7 +20,6 @@ except ImportError:
import socket
import struct
-import sys
import threading
import time
@@ -39,6 +42,12 @@ if six.PY2:
TimeoutError = socket.error
BlockingIOError = Exception
+ def xor_bytes(left, right):
+ return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
+else:
+ def xor_bytes(left, right):
+ return bytes(lb ^ rb for lb, rb in zip(left, right))
+
log = logging.getLogger(__name__)
DEFAULT_KAFKA_PORT = 9092
@@ -98,6 +107,69 @@ class ConnectionStates(object):
AUTHENTICATING = '<authenticating>'
+class ScramClient:
+ MECHANISMS = {
+ 'SCRAM-SHA-256': hashlib.sha256,
+ 'SCRAM-SHA-512': hashlib.sha512
+ }
+
+ def __init__(self, user, password, mechanism):
+ self.nonce = str(uuid4()).replace('-', '')
+ self.auth_message = ''
+ self.salted_password = None
+ self.user = user
+ self.password = password.encode()
+ self.hashfunc = self.MECHANISMS[mechanism]
+ self.hashname = ''.join(mechanism.lower().split('-')[1:3])
+ self.stored_key = None
+ self.client_key = None
+ self.client_signature = None
+ self.client_proof = None
+ self.server_key = None
+ self.server_signature = None
+
+ def first_message(self):
+ client_first_bare = 'n={},r={}'.format(self.user, self.nonce)
+ self.auth_message += client_first_bare
+ return 'n,,' + client_first_bare
+
+ def process_server_first_message(self, server_first_message):
+ self.auth_message += ',' + server_first_message
+ params = dict(pair.split('=', 1) for pair in server_first_message.split(','))
+ server_nonce = params['r']
+ if not server_nonce.startswith(self.nonce):
+ raise ValueError("Server nonce did not start with client nonce!")
+ self.nonce = server_nonce
+ self.auth_message += ',c=biws,r=' + self.nonce
+
+ salt = base64.b64decode(params['s'].encode())
+ iterations = int(params['i'])
+ self.create_salted_password(salt, iterations)
+
+ self.client_key = self.hmac(self.salted_password, b'Client Key')
+ self.stored_key = self.hashfunc(self.client_key).digest()
+ self.client_signature = self.hmac(self.stored_key, self.auth_message.encode())
+ self.client_proof = xor_bytes(self.client_key, self.client_signature)
+ self.server_key = self.hmac(self.salted_password, b'Server Key')
+ self.server_signature = self.hmac(self.server_key, self.auth_message.encode())
+
+ def hmac(self, key, msg):
+ return hmac.new(key, msg, digestmod=self.hashfunc).digest()
+
+ def create_salted_password(self, salt, iterations):
+ self.salted_password = hashlib.pbkdf2_hmac(
+ self.hashname, self.password, salt, iterations
+ )
+
+ def final_message(self):
+ client_final_no_proof = 'c=biws,r=' + self.nonce
+ return '{},p={}'.format(client_final_no_proof, base64.b64encode(self.client_proof).decode())
+
+ def process_server_final_message(self, server_final_message):
+ params = dict(pair.split('=', 1) for pair in server_final_message.split(','))
+ if self.server_signature != base64.b64decode(params['v'].encode()):
+ raise ValueError("Server sent wrong signature!")
+
class BrokerConnection(object):
"""Initialize a Kafka broker connection
@@ -178,11 +250,11 @@ class BrokerConnection(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
- PLAIN, GSSAPI, OAUTHBEARER.
- sasl_plain_username (str): username for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
- sasl_plain_password (str): password for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
+ PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
+ sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -225,7 +297,7 @@ class BrokerConnection(object):
'sasl_oauth_token_provider': None
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
- SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER')
+ SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', 'SCRAM-SHA-256', 'SCRAM-SHA-512')
def __init__(self, host, port, afi, **configs):
self.host = host
@@ -260,9 +332,13 @@ class BrokerConnection(object):
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
- if self.config['sasl_mechanism'] == 'PLAIN':
- assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
- assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
+ if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
+ assert self.config['sasl_plain_username'] is not None, (
+ 'sasl_plain_username required for PLAIN or SCRAM sasl'
+ )
+ assert self.config['sasl_plain_password'] is not None, (
+ 'sasl_plain_password required for PLAIN or SCRAM sasl'
+ )
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
@@ -553,6 +629,8 @@ class BrokerConnection(object):
return self._try_authenticate_gssapi(future)
elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
return self._try_authenticate_oauth(future)
+ elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"):
+ return self._try_authenticate_scram(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
@@ -653,6 +731,53 @@ class BrokerConnection(object):
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)
+ def _try_authenticate_scram(self, future):
+ if self.config['security_protocol'] == 'SASL_PLAINTEXT':
+ log.warning('%s: Exchanging credentials in the clear', self)
+
+ scram_client = ScramClient(
+ self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism']
+ )
+
+ err = None
+ close = False
+ with self._lock:
+ if not self._can_send_recv():
+ err = Errors.NodeNotReadyError(str(self))
+ close = False
+ else:
+ try:
+ client_first = scram_client.first_message().encode()
+ size = Int32.encode(len(client_first))
+ self._send_bytes_blocking(size + client_first)
+
+ (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
+ server_first = self._recv_bytes_blocking(data_len).decode()
+ scram_client.process_server_first_message(server_first)
+
+ client_final = scram_client.final_message().encode()
+ size = Int32.encode(len(client_final))
+ self._send_bytes_blocking(size + client_final)
+
+ (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
+ server_final = self._recv_bytes_blocking(data_len).decode()
+ scram_client.process_server_final_message(server_final)
+
+ except (ConnectionError, TimeoutError) as e:
+ log.exception("%s: Error receiving reply from server", self)
+ err = Errors.KafkaConnectionError("%s: %s" % (self, e))
+ close = True
+
+ if err is not None:
+ if close:
+ self.close(error=err)
+ return future.failure(err)
+
+ log.info(
+ '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism']
+ )
+ return future.success(True)
+
def _try_authenticate_gssapi(self, future):
kerberos_domain_name = self.config['sasl_kerberos_domain_name'] or self.host
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
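The SCRAM handshake above reduces to four ScramClient calls wrapped around two
length-prefixed request/response round trips. A condensed sketch of the flow in
_try_authenticate_scram (send_frame/recv_frame are stand-ins for the blocking socket
helpers used there):

    import struct

    def scram_exchange(sock, scram_client):
        def send_frame(payload):
            # 4-byte big-endian size prefix, then the SCRAM message
            sock.sendall(struct.pack('>i', len(payload)) + payload)

        def recv_frame():
            (size,) = struct.unpack('>i', sock.recv(4))
            return sock.recv(size)

        # client-first-message: 'n,,n=<user>,r=<client nonce>'
        send_frame(scram_client.first_message().encode())
        # server-first-message carries salt, iteration count, and the combined
        # nonce; processing it derives the salted password and the client proof
        scram_client.process_server_first_message(recv_frame().decode())
        # client-final-message: 'c=biws,r=<nonce>,p=<proof>'
        send_frame(scram_client.final_message().encode())
        # server-final-message carries v=<signature>; raises ValueError on mismatch
        scram_client.process_server_final_message(recv_frame().decode())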
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index cde956c..8474b7c 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -232,11 +232,11 @@ class KafkaConsumer(six.Iterator):
subscribing to it. Requires 0.10+ Default: True
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
- PLAIN, GSSAPI, OAUTHBEARER.
- sasl_plain_username (str): Username for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
- sasl_plain_password (str): Password for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
+ PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
+ sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index dc383d6..b90ca88 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -269,11 +269,11 @@ class KafkaProducer(object):
Default: selectors.DefaultSelector
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
- PLAIN, GSSAPI, OAUTHBEARER.
- sasl_plain_username (str): username for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
- sasl_plain_password (str): password for sasl PLAIN authentication.
- Required if sasl_mechanism is PLAIN.
+ PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
+ sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
+ Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
diff --git a/requirements-dev.txt b/requirements-dev.txt
index cb0bbe5..d283090 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -8,6 +8,7 @@ lz4==2.1.2
xxhash==1.3.0
python-snappy==0.5.3
tox==3.5.3
+mock==3.0.5
pylint==1.9.3
pytest-pylint==0.12.3
pytest-mock==1.10.0
diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties
index 7d8e2b1..daab312 100644
--- a/servers/0.10.0.0/resources/kafka.properties
+++ b/servers/0.10.0.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/0.10.0.0/resources/kafka_server_jaas.conf b/servers/0.10.0.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.10.0.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
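The doubled braces in this template (repeated below for every broker version) are
str.format escapes: the test fixture substitutes only {jaas_config}. A quick way to
preview roughly what the rendered file looks like, using the fixture's default
credentials:

    template = 'KafkaServer {{\n  {jaas_config}\n}};\nClient {{}};'
    jaas_config = (
        'org.apache.kafka.common.security.scram.ScramLoginModule required\n'
        ' username="alice" password="alice-secret";'
    )
    print(template.format(jaas_config=jaas_config))
    # KafkaServer {
    #   org.apache.kafka.common.security.scram.ScramLoginModule required
    #  username="alice" password="alice-secret";
    # };
    # Client {};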
diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties
index 7d8e2b1..daab312 100644
--- a/servers/0.10.0.1/resources/kafka.properties
+++ b/servers/0.10.0.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/0.10.0.1/resources/kafka_server_jaas.conf b/servers/0.10.0.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.10.0.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties
index 7d8e2b1..daab312 100644
--- a/servers/0.10.1.1/resources/kafka.properties
+++ b/servers/0.10.1.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/0.10.1.1/resources/kafka_server_jaas.conf b/servers/0.10.1.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.10.1.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties
index 7d8e2b1..daab312 100644
--- a/servers/0.10.2.1/resources/kafka.properties
+++ b/servers/0.10.2.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/0.10.2.1/resources/kafka_server_jaas.conf b/servers/0.10.2.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.10.2.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.10.2.2/resources/kafka.properties b/servers/0.10.2.2/resources/kafka.properties
index 7d8e2b1..daab312 100644
--- a/servers/0.10.2.2/resources/kafka.properties
+++ b/servers/0.10.2.2/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/0.10.2.2/resources/kafka_server_jaas.conf b/servers/0.10.2.2/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.10.2.2/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/0.11.0.0/resources/kafka.properties
+++ b/servers/0.11.0.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/0.11.0.0/resources/kafka_server_jaas.conf b/servers/0.11.0.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.11.0.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/0.11.0.1/resources/kafka.properties
+++ b/servers/0.11.0.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/0.11.0.1/resources/kafka_server_jaas.conf b/servers/0.11.0.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.11.0.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/0.11.0.2/resources/kafka.properties
+++ b/servers/0.11.0.2/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/0.11.0.2/resources/kafka_server_jaas.conf b/servers/0.11.0.2/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.11.0.2/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.11.0.3/resources/kafka.properties b/servers/0.11.0.3/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/0.11.0.3/resources/kafka.properties
+++ b/servers/0.11.0.3/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/0.11.0.3/resources/kafka_server_jaas.conf b/servers/0.11.0.3/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/0.11.0.3/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties
index b4c4088..fb859dd 100644
--- a/servers/0.9.0.0/resources/kafka.properties
+++ b/servers/0.9.0.0/resources/kafka.properties
@@ -121,7 +121,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties
index 7d8e2b1..28668db 100644
--- a/servers/0.9.0.1/resources/kafka.properties
+++ b/servers/0.9.0.1/resources/kafka.properties
@@ -121,7 +121,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
-offsets.topic.replication.factor=2
+offsets.topic.replication.factor=1
# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/1.0.0/resources/kafka.properties
+++ b/servers/1.0.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/1.0.0/resources/kafka_server_jaas.conf b/servers/1.0.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/1.0.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/1.0.1/resources/kafka.properties
+++ b/servers/1.0.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/1.0.1/resources/kafka_server_jaas.conf b/servers/1.0.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/1.0.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/1.0.2/resources/kafka.properties b/servers/1.0.2/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/1.0.2/resources/kafka.properties
+++ b/servers/1.0.2/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/1.0.2/resources/kafka_server_jaas.conf b/servers/1.0.2/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/1.0.2/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/1.1.0/resources/kafka.properties
+++ b/servers/1.1.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/1.1.0/resources/kafka_server_jaas.conf b/servers/1.1.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/1.1.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties
index fe6a89f..5775cfd 100644
--- a/servers/1.1.1/resources/kafka.properties
+++ b/servers/1.1.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -33,10 +35,6 @@ ssl.truststore.password=foobar
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
-# List of enabled mechanisms, can be more than one
-sasl.enabled.mechanisms=PLAIN
-sasl.mechanism.inter.broker.protocol=PLAIN
-
# The port the socket server listens on
#port=9092
diff --git a/servers/1.1.1/resources/kafka_server_jaas.conf b/servers/1.1.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/1.1.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.0.0/resources/kafka.properties
+++ b/servers/2.0.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.0.0/resources/kafka_server_jaas.conf b/servers/2.0.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.0.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.0.1/resources/kafka.properties
+++ b/servers/2.0.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.0.1/resources/kafka_server_jaas.conf b/servers/2.0.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.0.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.1.0/resources/kafka.properties b/servers/2.1.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.1.0/resources/kafka.properties
+++ b/servers/2.1.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.1.0/resources/kafka_server_jaas.conf b/servers/2.1.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.1.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.1.1/resources/kafka.properties b/servers/2.1.1/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.1.1/resources/kafka.properties
+++ b/servers/2.1.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.1.1/resources/kafka_server_jaas.conf b/servers/2.1.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.1.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.2.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.2.0/resources/kafka.properties
+++ b/servers/2.2.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.2.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.2.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.2.1/resources/kafka.properties b/servers/2.2.1/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.2.1/resources/kafka.properties
+++ b/servers/2.2.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.2.1/resources/kafka_server_jaas.conf b/servers/2.2.1/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.2.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/2.3.0/resources/kafka.properties b/servers/2.3.0/resources/kafka.properties
index 630dbc5..5775cfd 100644
--- a/servers/2.3.0/resources/kafka.properties
+++ b/servers/2.3.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}
+{sasl_config}
+
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
diff --git a/servers/2.3.0/resources/kafka_server_jaas.conf b/servers/2.3.0/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/2.3.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/servers/trunk/resources/kafka_server_jaas.conf b/servers/trunk/resources/kafka_server_jaas.conf
new file mode 100644
index 0000000..18efe43
--- /dev/null
+++ b/servers/trunk/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
+KafkaServer {{
+ {jaas_config}
+}};
+Client {{}}; \ No newline at end of file
diff --git a/test/__init__.py b/test/__init__.py
index 71f667d..329277d 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -2,14 +2,7 @@ from __future__ import absolute_import
# Set default logging handler to avoid "No handler found" warnings.
import logging
-try: # Python 2.7+
- from logging import NullHandler
-except ImportError:
- class NullHandler(logging.Handler):
- def emit(self, record):
- pass
-
-logging.getLogger(__name__).addHandler(NullHandler())
+logging.basicConfig(level=logging.INFO)
from kafka.future import Future
Future.error_on_callbacks = True # always fail during testing
diff --git a/test/fixtures.py b/test/fixtures.py
index 557fca6..78cdc5c 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -14,6 +14,7 @@ from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
+from kafka.errors import InvalidReplicationFactorError
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
from test.testutil import env_kafka_version, random_string
@@ -140,6 +141,16 @@ class Fixture(object):
dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
+ log.debug("Template string:")
+ for line in template.splitlines():
+ log.debug(' ' + line.strip())
+ log.debug("Rendered template:")
+ with open(target_file.strpath, 'r') as o:
+ for line in o:
+ log.debug(' ' + line.strip())
+ log.debug("binding:")
+ for key, value in binding.items():
+ log.debug(" {key}={value}".format(key=key, value=value))
def dump_logs(self):
self.child.dump_logs()
@@ -233,11 +244,14 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
+ broker_user = 'alice'
+ broker_password = 'alice-secret'
+
@classmethod
def instance(cls, broker_id, zookeeper, zk_chroot=None,
host=None, port=None,
transport='PLAINTEXT', replicas=1, partitions=2,
- sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+ sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
@@ -261,7 +275,7 @@ class KafkaFixture(Fixture):
def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
replicas=1, partitions=2, transport='PLAINTEXT',
- sasl_mechanism='PLAIN', auto_create_topic=True,
+ sasl_mechanism=None, auto_create_topic=True,
tmp_dir=None):
super(KafkaFixture, self).__init__()
@@ -271,13 +285,18 @@ class KafkaFixture(Fixture):
self.broker_id = broker_id
self.auto_create_topic = auto_create_topic
self.transport = transport.upper()
- self.sasl_mechanism = sasl_mechanism.upper()
+ if sasl_mechanism is not None:
+ self.sasl_mechanism = sasl_mechanism.upper()
+ else:
+ self.sasl_mechanism = None
self.ssl_dir = self.test_resource('ssl')
# TODO: checking for port connection would be better than scanning logs
# until then, we need the pattern to work across all supported broker versions
# The logging format changed slightly in 1.0.0
self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
+ # When SCRAM is the sasl mechanism, we also need to wait until the broker has fetched the user configs from zookeeper
+ self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user)
self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
@@ -292,6 +311,64 @@ class KafkaFixture(Fixture):
self.running = False
self._client = None
+ self.sasl_config = ''
+ self.jaas_config = ''
+
+ def _sasl_config(self):
+ if not self.sasl_enabled:
+ return ''
+
+ sasl_config = "sasl.enabled.mechanisms={mechanism}\n"
+ sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n"
+ return sasl_config.format(mechanism=self.sasl_mechanism)
+
+ def _jaas_config(self):
+ if not self.sasl_enabled:
+ return ''
+
+ elif self.sasl_mechanism == 'PLAIN':
+ jaas_config = (
+ "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
+ ' username="{user}" password="{password}" user_{user}="{password}";\n'
+ )
+ elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
+ jaas_config = (
+ "org.apache.kafka.common.security.scram.ScramLoginModule required\n"
+ ' username="{user}" password="{password}";\n'
+ )
+ else:
+ raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism))
+ return jaas_config.format(user=self.broker_user, password=self.broker_password)
+
+ def _add_scram_user(self):
+ self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user))
+ args = self.kafka_run_class_args(
+ "kafka.admin.ConfigCommand",
+ "--zookeeper",
+ "%s:%d/%s" % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ "--alter",
+ "--entity-type", "users",
+ "--entity-name", self.broker_user,
+ "--add-config",
+ "{}=[password={}]".format(self.sasl_mechanism, self.broker_password),
+ )
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ stdout, stderr = proc.communicate()
+
+ if proc.returncode != 0:
+ self.out("Failed to save credentials to zookeeper!")
+ self.out(stdout)
+ self.out(stderr)
+ raise RuntimeError("Failed to save credentials to zookeeper!")
+ self.out("User created.")
+
+ @property
+ def sasl_enabled(self):
+ return self.sasl_mechanism is not None
def bootstrap_server(self):
return '%s:%d' % (self.host, self.port)
@@ -328,9 +405,17 @@ class KafkaFixture(Fixture):
def start(self):
# Configure Kafka child process
properties = self.tmp_dir.join("kafka.properties")
- template = self.test_resource("kafka.properties")
+ jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf")
+ properties_template = self.test_resource("kafka.properties")
+ jaas_conf_template = self.test_resource("kafka_server_jaas.conf")
+
args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
+ if self.sasl_enabled:
+ opts = env.get('KAFKA_OPTS', '').strip()
+ opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath)
+ env['KAFKA_OPTS'] = opts
+ self.render_template(jaas_conf_template, jaas_conf, vars(self))
timeout = 5
max_timeout = 120
@@ -345,14 +430,17 @@ class KafkaFixture(Fixture):
if auto_port:
self.port = get_open_port()
self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
- self.render_template(template, properties, vars(self))
+ self.render_template(properties_template, properties, vars(self))
+
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
- if self.child.wait_for(self.start_pattern, timeout=timeout):
+ if self._broker_ready(timeout) and self._scram_user_present(timeout):
break
+
self.child.dump_logs()
self.child.stop()
+
timeout *= 2
time.sleep(backoff)
tries += 1
@@ -360,11 +448,20 @@ class KafkaFixture(Fixture):
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
- (self._client,) = self.get_clients(1, '_internal_client')
+ (self._client,) = self.get_clients(1, client_id='_internal_client')
self.out("Done!")
self.running = True
+ def _broker_ready(self, timeout):
+ return self.child.wait_for(self.start_pattern, timeout=timeout)
+
+ def _scram_user_present(self, timeout):
+ # no need to wait for scram user if scram is not used
+ if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'):
+ return True
+ return self.child.wait_for(self.scram_pattern, timeout=timeout)
+
def open(self):
if self.running:
self.out("Instance already running")
@@ -378,18 +475,24 @@ class KafkaFixture(Fixture):
self.tmp_dir.ensure('data', dir=True)
self.out("Running local instance...")
- log.info(" host = %s", self.host)
- log.info(" port = %s", self.port or '(auto)')
- log.info(" transport = %s", self.transport)
- log.info(" broker_id = %s", self.broker_id)
- log.info(" zk_host = %s", self.zookeeper.host)
- log.info(" zk_port = %s", self.zookeeper.port)
- log.info(" zk_chroot = %s", self.zk_chroot)
- log.info(" replicas = %s", self.replicas)
- log.info(" partitions = %s", self.partitions)
- log.info(" tmp_dir = %s", self.tmp_dir.strpath)
+ log.info(" host = %s", self.host)
+ log.info(" port = %s", self.port or '(auto)')
+ log.info(" transport = %s", self.transport)
+ log.info(" sasl_mechanism = %s", self.sasl_mechanism)
+ log.info(" broker_id = %s", self.broker_id)
+ log.info(" zk_host = %s", self.zookeeper.host)
+ log.info(" zk_port = %s", self.zookeeper.port)
+ log.info(" zk_chroot = %s", self.zk_chroot)
+ log.info(" replicas = %s", self.replicas)
+ log.info(" partitions = %s", self.partitions)
+ log.info(" tmp_dir = %s", self.tmp_dir.strpath)
self._create_zk_chroot()
+ self.sasl_config = self._sasl_config()
+ self.jaas_config = self._jaas_config()
+ # add user to zookeeper for the first server
+ if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0:
+ self._add_scram_user()
self.start()
atexit.register(self.close)
@@ -437,7 +540,8 @@ class KafkaFixture(Fixture):
future = self._client.send(node_id, request)
future.error_on_callbacks = True
future.add_errback(_failure)
- return self._client.poll(future=future, timeout_ms=timeout)
+ self._client.poll(future=future, timeout_ms=timeout)
+ return future.value
except Exception as exc:
time.sleep(1)
retries -= 1
@@ -446,80 +550,122 @@ class KafkaFixture(Fixture):
else:
pass # retry
- def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+ def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000):
if num_partitions is None:
num_partitions = self.partitions
if replication_factor is None:
replication_factor = self.replicas
# Try different methods to create a topic, from the fastest to the slowest
- if self.auto_create_topic and \
- num_partitions == self.partitions and \
- replication_factor == self.replicas:
- self._send_request(MetadataRequest[0]([topic_name]))
+ if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
+ self._create_topic_via_metadata(topic_name, timeout_ms)
elif env_kafka_version() >= (0, 10, 1, 0):
- request = CreateTopicsRequest[0]([(topic_name, num_partitions,
- replication_factor, [], [])], timeout_ms)
- result = self._send_request(request, timeout=timeout_ms)
- for topic_result in result[0].topic_error_codes:
- error_code = topic_result[1]
- if error_code != 0:
- raise errors.for_code(error_code)
+ try:
+ self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
+ except InvalidReplicationFactorError:
+ # wait and try again
+ # on travis the brokers sometimes take a while to find themselves
+ time.sleep(0.5)
+ self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
else:
- args = self.kafka_run_class_args('kafka.admin.TopicCommand',
- '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
- self.zookeeper.port,
- self.zk_chroot),
- '--create',
- '--topic', topic_name,
- '--partitions', self.partitions \
- if num_partitions is None else num_partitions,
- '--replication-factor', self.replicas \
- if replication_factor is None \
- else replication_factor)
- if env_kafka_version() >= (0, 10):
- args.append('--if-not-exists')
- env = self.kafka_run_class_env()
- proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = proc.communicate()
- if proc.returncode != 0:
- if 'kafka.common.TopicExistsException' not in stdout:
- self.out("Failed to create topic %s" % (topic_name,))
- self.out(stdout)
- self.out(stderr)
- raise RuntimeError("Failed to create topic %s" % (topic_name,))
+ self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
+
+ def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
+ self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
+
+ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+ request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+ replication_factor, [], [])], timeout_ms)
+ response = self._send_request(request, timeout=timeout_ms)
+ for topic_result in response.topic_errors:
+ error_code = topic_result[1]
+ if error_code != 0:
+ raise errors.for_code(error_code)
+
+ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
+ args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+ '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ '--create',
+ '--topic', topic_name,
+ '--partitions', self.partitions \
+ if num_partitions is None else num_partitions,
+ '--replication-factor', self.replicas \
+ if replication_factor is None \
+ else replication_factor)
+ if env_kafka_version() >= (0, 10):
+ args.append('--if-not-exists')
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ if proc.returncode != 0:
+ if 'kafka.common.TopicExistsException' not in stdout:
+ self.out("Failed to create topic %s" % (topic_name,))
+ self.out(stdout)
+ self.out(stderr)
+ raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+ def get_topic_names(self):
+ args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+ '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ '--list'
+ )
+ env = self.kafka_run_class_env()
+ env.pop('KAFKA_LOG4J_OPTS')
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ if proc.returncode != 0:
+ self.out("Failed to list topics!")
+ self.out(stdout)
+ self.out(stderr)
+ raise RuntimeError("Failed to list topics!")
+ return stdout.decode().splitlines(False)
def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
for topic_name in topic_names:
self._create_topic(topic_name, num_partitions, replication_factor)
- def get_clients(self, cnt=1, client_id=None):
- if client_id is None:
- client_id = 'client'
- return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
- bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
-
- def get_admin_clients(self, cnt=1, **params):
- params.setdefault('client_id', 'admin_client')
- params['bootstrap_servers'] = self.bootstrap_server()
+ def _enrich_client_params(self, params, **defaults):
+ params = params.copy()
+ for key, value in defaults.items():
+ params.setdefault(key, value)
+ params.setdefault('bootstrap_servers', self.bootstrap_server())
+ if self.sasl_enabled:
+ params.setdefault('sasl_mechanism', self.sasl_mechanism)
+ params.setdefault('security_protocol', self.transport)
+ if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
+ params.setdefault('sasl_plain_username', self.broker_user)
+ params.setdefault('sasl_plain_password', self.broker_password)
+ return params
+
+ @staticmethod
+ def _create_many_clients(cnt, cls, *args, **params):
client_id = params['client_id']
- for x in range(cnt):
+ for _ in range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
- yield KafkaAdminClient(**params)
+ yield cls(*args, **params)
+
+ def get_clients(self, cnt=1, **params):
+ params = self._enrich_client_params(params, client_id='client')
+ for client in self._create_many_clients(cnt, KafkaClient, **params):
+ yield client
+
+ def get_admin_clients(self, cnt, **params):
+ params = self._enrich_client_params(params, client_id='admin_client')
+ for client in self._create_many_clients(cnt, KafkaAdminClient, **params):
+ yield client
def get_consumers(self, cnt, topics, **params):
- params.setdefault('client_id', 'consumer')
- params.setdefault('heartbeat_interval_ms', 500)
- params['bootstrap_servers'] = self.bootstrap_server()
- client_id = params['client_id']
- for x in range(cnt):
- params['client_id'] = '%s_%s' % (client_id, random_string(4))
- yield KafkaConsumer(*topics, **params)
+ params = self._enrich_client_params(
+ params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest'
+ )
+ for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params):
+ yield client
def get_producers(self, cnt, **params):
- params.setdefault('client_id', 'producer')
- params['bootstrap_servers'] = self.bootstrap_server()
- client_id = params['client_id']
- for x in range(cnt):
- params['client_id'] = '%s_%s' % (client_id, random_string(4))
- yield KafkaProducer(**params)
+ params = self._enrich_client_params(params, client_id='producer')
+ for client in self._create_many_clients(cnt, KafkaProducer, **params):
+ yield client
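The refactored helpers mean SASL-enabled fixtures now hand out pre-configured clients, so
tests no longer repeat credentials. Roughly, for a SCRAM fixture (values shown are the
fixture defaults, the client_id suffix is random):

    from kafka import KafkaConsumer

    consumer, = fixture.get_consumers(1, ['some-topic'])
    # is approximately equivalent to
    consumer = KafkaConsumer(
        'some-topic',
        client_id='consumer_ab12',            # random suffix appended
        heartbeat_interval_ms=500,
        auto_offset_reset='earliest',
        bootstrap_servers=fixture.bootstrap_server(),
        security_protocol='SASL_PLAINTEXT',   # fixture.transport
        sasl_mechanism='SCRAM-SHA-256',       # fixture.sasl_mechanism
        sasl_plain_username='alice',          # fixture.broker_user
        sasl_plain_password='alice-secret',   # fixture.broker_password
    )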
diff --git a/test/service.py b/test/service.py
index 47fb846..045d780 100644
--- a/test/service.py
+++ b/test/service.py
@@ -45,6 +45,11 @@ class SpawnedService(threading.Thread):
self.child = None
self.alive = False
self.daemon = True
+ log.info("Created service for command:")
+ log.info("  %s", ' '.join(self.args))
+ log.debug("With environment:")
+ for key, value in self.env.items():
+ log.debug(" {key}={value}".format(key=key, value=value))
def _spawn(self):
if self.alive: return
@@ -57,7 +62,7 @@ class SpawnedService(threading.Thread):
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- self.alive = True
+ self.alive = self.child.poll() is None
def _despawn(self):
if self.child.poll() is None:
@@ -83,12 +88,14 @@ class SpawnedService(threading.Thread):
raise
if self.child.stdout in rds:
- line = self.child.stdout.readline()
- self.captured_stdout.append(line.decode('utf-8').rstrip())
+ line = self.child.stdout.readline().decode('utf-8').rstrip()
+ if line:
+ self.captured_stdout.append(line)
if self.child.stderr in rds:
- line = self.child.stderr.readline()
- self.captured_stderr.append(line.decode('utf-8').rstrip())
+ line = self.child.stderr.readline().decode('utf-8').rstrip()
+ if line:
+ self.captured_stderr.append(line)
if self.child.poll() is not None:
self.dump_logs()
@@ -105,6 +112,9 @@ class SpawnedService(threading.Thread):
def wait_for(self, pattern, timeout=30):
start = time.time()
while True:
+ if not self.is_alive():
+ raise RuntimeError("Child thread died already.")
+
elapsed = time.time() - start
if elapsed >= timeout:
log.error("Waiting for %r timed out after %d seconds", pattern, timeout)
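With the early liveness check, callers of wait_for now fail fast instead of polling for
the full timeout when the child already died. A sketch of the calling pattern (the
pattern and timeout mirror KafkaFixture's usage):

    try:
        started = service.wait_for(r"\[Kafka ?Server (id=)?0\],? started", timeout=30)
    except RuntimeError:
        # the spawned broker exited before ever logging the start pattern
        service.dump_logs()
        raise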
diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py
new file mode 100644
index 0000000..e3a4813
--- /dev/null
+++ b/test/test_sasl_integration.py
@@ -0,0 +1,80 @@
+import logging
+import uuid
+
+import pytest
+
+from kafka.admin import NewTopic
+from kafka.protocol.metadata import MetadataRequest_v1
+from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
+
+
+@pytest.fixture(
+ params=[
+ pytest.param(
+ "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
+ ),
+ pytest.param(
+ "SCRAM-SHA-256",
+ marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+ ),
+ pytest.param(
+ "SCRAM-SHA-512",
+ marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+ ),
+ ]
+)
+def sasl_kafka(request, kafka_broker_factory):
+ sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
+ yield sasl_kafka
+ sasl_kafka.child.dump_logs()
+
+
+def test_admin(request, sasl_kafka):
+ topic_name = special_to_underscore(request.node.name + random_string(4))
+ admin, = sasl_kafka.get_admin_clients(1)
+ admin.create_topics([NewTopic(topic_name, 1, 1)])
+ assert topic_name in sasl_kafka.get_topic_names()
+
+
+def test_produce_and_consume(request, sasl_kafka):
+ topic_name = special_to_underscore(request.node.name + random_string(4))
+ sasl_kafka.create_topics([topic_name], num_partitions=2)
+ producer, = sasl_kafka.get_producers(1)
+
+ messages_and_futures = [] # [(message, produce_future),]
+ for i in range(100):
+ encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8")
+ future = producer.send(topic_name, value=encoded_msg, partition=i % 2)
+ messages_and_futures.append((encoded_msg, future))
+ producer.flush()
+
+ for (msg, f) in messages_and_futures:
+ assert f.succeeded()
+
+ consumer, = sasl_kafka.get_consumers(1, [topic_name])
+ messages = {0: [], 1: []}
+ for i, message in enumerate(consumer, 1):
+ logging.debug("Consumed message %s", repr(message))
+ messages[message.partition].append(message)
+ if i >= 100:
+ break
+
+ assert_message_count(messages[0], 50)
+ assert_message_count(messages[1], 50)
+
+
+def test_client(request, sasl_kafka):
+ topic_name = special_to_underscore(request.node.name + random_string(4))
+ sasl_kafka.create_topics([topic_name], num_partitions=1)
+
+ client, = sasl_kafka.get_clients(1)
+ metadata_request = MetadataRequest_v1(None)
+ client.send(0, metadata_request)
+ for _ in range(10):
+ result = client.poll(timeout_ms=10000)
+ if len(result) > 0:
+ break
+ else:
+ raise RuntimeError("Couldn't fetch topic response from Broker.")
+ result = result[0]
+ assert topic_name in [t[1] for t in result.topics]
diff --git a/test/testutil.py b/test/testutil.py
index 77a6673..ec4d70b 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -2,10 +2,15 @@ from __future__ import absolute_import
import os
import random
+import re
import string
import time
+def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
+ return _matcher.sub('_', string)
+
+
def random_string(length):
return "".join(random.choice(string.ascii_letters) for i in range(length))
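special_to_underscore sanitizes pytest node names, which contain brackets and dashes from
parametrization, into legal Kafka topic names. For example:

    >>> special_to_underscore('test_admin[SCRAM-SHA-256]')
    'test_admin_SCRAM_SHA_256_'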