summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndre Araujo <asdaraujo@gmail.com>2017-11-15 06:08:29 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-02-21 13:30:12 -0800
commita1869c4be5f47b4f6433610249aaf29af4ec95e5 (patch)
treec18b155f5a3b812ed69a2f3a7d0499628cd87694
parent0f5d35fa3489fa36000c05a891d375cc30672e23 (diff)
downloadkafka-python-a1869c4be5f47b4f6433610249aaf29af4ec95e5.tar.gz
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration.
-rw-r--r--pylint.rc1
-rw-r--r--test/conftest.py113
-rw-r--r--test/fixtures.py299
-rw-r--r--test/test_client_integration.py2
-rw-r--r--test/test_consumer_integration.py45
-rw-r--r--test/test_failover_integration.py3
-rw-r--r--test/test_producer_integration.py64
-rw-r--r--test/testutil.py89
-rw-r--r--tox.ini1
9 files changed, 460 insertions, 157 deletions
diff --git a/pylint.rc b/pylint.rc
index d13ef51..d22e523 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -1,5 +1,6 @@
[TYPECHECK]
ignored-classes=SyncManager,_socketobject
+generated-members=py.*
[MESSAGES CONTROL]
disable=E1129
diff --git a/test/conftest.py b/test/conftest.py
index e85b977..d53ff23 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,38 +1,117 @@
from __future__ import absolute_import
-import os
+import inspect
import pytest
+from decorator import decorate
from test.fixtures import KafkaFixture, ZookeeperFixture
-
+from test.testutil import kafka_version, random_string
@pytest.fixture(scope="module")
def version():
- if 'KAFKA_VERSION' not in os.environ:
- return ()
- return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
+ """Return the Kafka version set in the OS environment"""
+ return kafka_version()
@pytest.fixture(scope="module")
-def zookeeper(version, request):
- assert version
- zk = ZookeeperFixture.instance()
- yield zk
- zk.close()
+def zookeeper():
+ """Return a Zookeeper fixture"""
+ zk_instance = ZookeeperFixture.instance()
+ yield zk_instance
+ zk_instance.close()
+@pytest.fixture(scope="module")
+def kafka_broker(kafka_broker_factory):
+ """Return a Kafka broker fixture"""
+ return kafka_broker_factory()[0]
@pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
- assert version
- k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
- partitions=4)
- yield k
- k.close()
+def kafka_broker_factory(version, zookeeper):
+ """Return a Kafka broker fixture factory"""
+ assert version, 'KAFKA_VERSION must be specified to run integration tests'
+
+ _brokers = []
+ def factory(**broker_params):
+ params = {} if broker_params is None else broker_params.copy()
+ params.setdefault('partitions', 4)
+ num_brokers = params.pop('num_brokers', 1)
+ brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
+ for x in range(num_brokers))
+ _brokers.extend(brokers)
+ return brokers
+ yield factory
+
+ for broker in _brokers:
+ broker.close()
+
+@pytest.fixture
+def simple_client(kafka_broker, request, topic):
+ """Return a SimpleClient fixture"""
+ client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
+ client.ensure_topic_exists(topic)
+ yield client
+ client.close()
+
+@pytest.fixture
+def kafka_client(kafka_broker, request):
+ """Return a KafkaClient fixture"""
+ (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
+ yield client
+ client.close()
+
+@pytest.fixture
+def kafka_consumer(kafka_consumer_factory):
+ """Return a KafkaConsumer fixture"""
+ return kafka_consumer_factory()
+
+@pytest.fixture
+def kafka_consumer_factory(kafka_broker, topic, request):
+ """Return a KafkaConsumer factory fixture"""
+ _consumer = [None]
+
+ def factory(**kafka_consumer_params):
+ params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
+ params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+ _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+ return _consumer[0]
+
+ yield factory
+
+ if _consumer[0]:
+ _consumer[0].close()
+
+@pytest.fixture
+def kafka_producer(kafka_producer_factory):
+ """Return a KafkaProducer fixture"""
+ yield kafka_producer_factory()
+
+@pytest.fixture
+def kafka_producer_factory(kafka_broker, request):
+ """Return a KafkaProduce factory fixture"""
+ _producer = [None]
+
+ def factory(**kafka_producer_params):
+ params = {} if kafka_producer_params is None else kafka_producer_params.copy()
+ params.setdefault('client_id', 'producer_%s' % (request.node.name,))
+ _producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
+ return _producer[0]
+
+ yield factory
+
+ if _producer[0]:
+ _producer[0].close()
+
+@pytest.fixture
+def topic(kafka_broker, request):
+ """Return a topic fixture"""
+ topic_name = '%s_%s' % (request.node.name, random_string(10))
+ kafka_broker.create_topics([topic_name])
+ return topic_name
@pytest.fixture
def conn(mocker):
+ """Return a connection mocker fixture"""
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
diff --git a/test/fixtures.py b/test/fixtures.py
index 1c418fd..493a664 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,29 +4,55 @@ import atexit
import logging
import os
import os.path
-import shutil
+import random
+import socket
+import string
import subprocess
-import tempfile
import time
import uuid
-from six.moves import urllib
+import py
+from six.moves import urllib, xrange
from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
+from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
+from kafka.client_async import KafkaClient
+from kafka.protocol.admin import CreateTopicsRequest
+from kafka.protocol.metadata import MetadataRequest
from test.service import ExternalService, SpawnedService
-from test.testutil import get_open_port
-
log = logging.getLogger(__name__)
+def random_string(length):
+ return "".join(random.choice(string.ascii_letters) for i in xrange(length))
+
+def version_str_to_list(version_str):
+ return tuple(map(int, version_str.split('.'))) # e.g., (0, 8, 1, 1)
+
+def version():
+ if 'KAFKA_VERSION' not in os.environ:
+ return ()
+ return version_str_to_list(os.environ['KAFKA_VERSION'])
+
+def get_open_port():
+ sock = socket.socket()
+ sock.bind(("", 0))
+ port = sock.getsockname()[1]
+ sock.close()
+ return port
class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
- project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
- kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
+ project_root = os.environ.get('PROJECT_ROOT',
+ os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+ kafka_root = os.environ.get("KAFKA_ROOT",
+ os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
+ def __init__(self):
+ self.child = None
+
@classmethod
def download_official_distribution(cls,
kafka_version=None,
@@ -71,31 +97,34 @@ class Fixture(object):
@classmethod
def kafka_run_class_args(cls, *args):
result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
- result.extend(args)
+ result.extend([str(arg) for arg in args])
return result
def kafka_run_class_env(self):
env = os.environ.copy()
- env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties")
+ env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
+ self.test_resource("log4j.properties")
return env
@classmethod
def render_template(cls, source_file, target_file, binding):
- log.info('Rendering %s from template %s', target_file, source_file)
+ log.info('Rendering %s from template %s', target_file.strpath, source_file)
with open(source_file, "r") as handle:
template = handle.read()
assert len(template) > 0, 'Empty template %s' % source_file
- with open(target_file, "w") as handle:
+ with open(target_file.strpath, "w") as handle:
handle.write(template.format(**binding))
handle.flush()
os.fsync(handle)
# fsync directory for durability
# https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/
- dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY)
+ dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
+ def dump_logs(self):
+ self.child.dump_logs()
class ZookeeperFixture(Fixture):
@classmethod
@@ -111,32 +140,36 @@ class ZookeeperFixture(Fixture):
fixture.open()
return fixture
- def __init__(self, host, port):
+ def __init__(self, host, port, tmp_dir=None):
+ super(ZookeeperFixture, self).__init__()
self.host = host
self.port = port
- self.tmp_dir = None
- self.child = None
+ self.tmp_dir = tmp_dir
def kafka_run_class_env(self):
env = super(ZookeeperFixture, self).kafka_run_class_env()
- env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+ env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env
def out(self, message):
log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)
def open(self):
- self.tmp_dir = tempfile.mkdtemp()
+ if self.tmp_dir is None:
+ self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
+ self.tmp_dir.ensure(dir=True)
+
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port or '(auto)')
- log.info(" tmp_dir = %s", self.tmp_dir)
+ log.info(" tmp_dir = %s", self.tmp_dir.strpath)
# Configure Zookeeper child process
template = self.test_resource("zookeeper.properties")
- properties = os.path.join(self.tmp_dir, "zookeeper.properties")
- args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+ properties = self.tmp_dir.join("zookeeper.properties")
+ args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
+ properties.strpath)
env = self.kafka_run_class_env()
# Party!
@@ -174,7 +207,7 @@ class ZookeeperFixture(Fixture):
self.child.stop()
self.child = None
self.out("Done!")
- shutil.rmtree(self.tmp_dir)
+ self.tmp_dir.remove()
def __del__(self):
self.close()
@@ -182,9 +215,11 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
@classmethod
- def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
+ def instance(cls, broker_id, zookeeper, zk_chroot=None,
host=None, port=None,
- transport='PLAINTEXT', replicas=1, partitions=2):
+ transport='PLAINTEXT', replicas=1, partitions=2,
+ sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -195,19 +230,29 @@ class KafkaFixture(Fixture):
if host is None:
host = "localhost"
fixture = KafkaFixture(host, port, broker_id,
- zk_host, zk_port, zk_chroot,
+ zookeeper, zk_chroot,
transport=transport,
- replicas=replicas, partitions=partitions)
+ replicas=replicas, partitions=partitions,
+ sasl_mechanism=sasl_mechanism,
+ auto_create_topic=auto_create_topic,
+ tmp_dir=tmp_dir)
+
fixture.open()
return fixture
- def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
- replicas=1, partitions=2, transport='PLAINTEXT'):
+ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
+ replicas=1, partitions=2, transport='PLAINTEXT',
+ sasl_mechanism='PLAIN', auto_create_topic=True,
+ tmp_dir=None):
+ super(KafkaFixture, self).__init__()
+
self.host = host
self.port = port
self.broker_id = broker_id
+ self.auto_create_topic = auto_create_topic
self.transport = transport.upper()
+ self.sasl_mechanism = sasl_mechanism.upper()
self.ssl_dir = self.test_resource('ssl')
# TODO: checking for port connection would be better than scanning logs
@@ -215,67 +260,55 @@ class KafkaFixture(Fixture):
# The logging format changed slightly in 1.0.0
self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
- self.zk_host = zk_host
- self.zk_port = zk_port
+ self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
+ # Add the attributes below for the template binding
+ self.zk_host = self.zookeeper.host
+ self.zk_port = self.zookeeper.port
self.replicas = replicas
self.partitions = partitions
- self.tmp_dir = None
- self.child = None
+ self.tmp_dir = tmp_dir
self.running = False
+ self._client = None
+
+ def bootstrap_server(self):
+ return '%s:%d' % (self.host, self.port)
+
def kafka_run_class_env(self):
env = super(KafkaFixture, self).kafka_run_class_env()
- env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+ env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
return env
def out(self, message):
log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message)
- def open(self):
- if self.running:
- self.out("Instance already running")
- return
-
- self.tmp_dir = tempfile.mkdtemp()
- self.out("Running local instance...")
- log.info(" host = %s", self.host)
- log.info(" port = %s", self.port or '(auto)')
- log.info(" transport = %s", self.transport)
- log.info(" broker_id = %s", self.broker_id)
- log.info(" zk_host = %s", self.zk_host)
- log.info(" zk_port = %s", self.zk_port)
- log.info(" zk_chroot = %s", self.zk_chroot)
- log.info(" replicas = %s", self.replicas)
- log.info(" partitions = %s", self.partitions)
- log.info(" tmp_dir = %s", self.tmp_dir)
-
- # Create directories
- os.mkdir(os.path.join(self.tmp_dir, "logs"))
- os.mkdir(os.path.join(self.tmp_dir, "data"))
-
+ def _create_zk_chroot(self):
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
- "-server", "%s:%d" % (self.zk_host, self.zk_port),
+ "-server",
+ "%s:%d" % (self.zookeeper.host,
+ self.zookeeper.port),
"create",
"/%s" % self.zk_chroot,
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- if proc.wait() != 0:
+ if proc.wait() != 0 or proc.returncode != 0:
self.out("Failed to create Zookeeper chroot node")
self.out(proc.stdout.read())
self.out(proc.stderr.read())
raise RuntimeError("Failed to create Zookeeper chroot node")
- self.out("Done!")
+ self.out("Kafka chroot created in Zookeeper!")
+ def start(self):
# Configure Kafka child process
- properties = os.path.join(self.tmp_dir, "kafka.properties")
+ properties = self.tmp_dir.join("kafka.properties")
template = self.test_resource("kafka.properties")
- args = self.kafka_run_class_args("kafka.Kafka", properties)
+ args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
timeout = 5
@@ -305,14 +338,45 @@ class KafkaFixture(Fixture):
backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
+
+ (self._client,) = self.get_clients(1, '_internal_client')
+
self.out("Done!")
self.running = True
+
+ def open(self):
+ if self.running:
+ self.out("Instance already running")
+ return
+
+ # Create directories
+ if self.tmp_dir is None:
+ self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
+ self.tmp_dir.ensure(dir=True)
+ self.tmp_dir.ensure('logs', dir=True)
+ self.tmp_dir.ensure('data', dir=True)
+
+ self.out("Running local instance...")
+ log.info(" host = %s", self.host)
+ log.info(" port = %s", self.port or '(auto)')
+ log.info(" transport = %s", self.transport)
+ log.info(" broker_id = %s", self.broker_id)
+ log.info(" zk_host = %s", self.zookeeper.host)
+ log.info(" zk_port = %s", self.zookeeper.port)
+ log.info(" zk_chroot = %s", self.zk_chroot)
+ log.info(" replicas = %s", self.replicas)
+ log.info(" partitions = %s", self.partitions)
+ log.info(" tmp_dir = %s", self.tmp_dir.strpath)
+
+ self._create_zk_chroot()
+ self.start()
+
atexit.register(self.close)
def __del__(self):
self.close()
- def close(self):
+ def stop(self):
if not self.running:
self.out("Instance already stopped")
return
@@ -320,6 +384,117 @@ class KafkaFixture(Fixture):
self.out("Stopping...")
self.child.stop()
self.child = None
- self.out("Done!")
- shutil.rmtree(self.tmp_dir)
self.running = False
+ self.out("Stopped!")
+
+ def close(self):
+ self.stop()
+ if self.tmp_dir is not None:
+ self.tmp_dir.remove()
+ self.tmp_dir = None
+ self.out("Done!")
+
+ def dump_logs(self):
+ super(KafkaFixture, self).dump_logs()
+ self.zookeeper.dump_logs()
+
+ def _send_request(self, request, timeout=None):
+ def _failure(error):
+ raise error
+ retries = 10
+ while True:
+ node_id = self._client.least_loaded_node()
+ for ready_retry in range(40):
+ if self._client.ready(node_id, False):
+ break
+ time.sleep(.1)
+ else:
+ raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
+
+ try:
+ future = self._client.send(node_id, request)
+ future.error_on_callbacks = True
+ future.add_errback(_failure)
+ return self._client.poll(future=future, timeout_ms=timeout)
+ except Exception as exc:
+ time.sleep(1)
+ retries -= 1
+ if retries == 0:
+ raise exc
+ else:
+ pass # retry
+
+ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+ if num_partitions is None:
+ num_partitions = self.partitions
+ if replication_factor is None:
+ replication_factor = self.replicas
+
+ # Try different methods to create a topic, from the fastest to the slowest
+ if self.auto_create_topic and \
+ num_partitions == self.partitions and \
+ replication_factor == self.replicas:
+ self._send_request(MetadataRequest[0]([topic_name]))
+ elif version() >= (0, 10, 1, 0):
+ request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+ replication_factor, [], [])], timeout_ms)
+ result = self._send_request(request, timeout=timeout_ms)
+ for topic_result in result[0].topic_error_codes:
+ error_code = topic_result[1]
+ if error_code != 0:
+ raise errors.for_code(error_code)
+ else:
+ args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+ '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+ self.zookeeper.port,
+ self.zk_chroot),
+ '--create',
+ '--topic', topic_name,
+ '--partitions', self.partitions \
+ if num_partitions is None else num_partitions,
+ '--replication-factor', self.replicas \
+ if replication_factor is None \
+ else replication_factor)
+ if version() >= (0, 10):
+ args.append('--if-not-exists')
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ ret = proc.wait()
+ if ret != 0 or proc.returncode != 0:
+ output = proc.stdout.read()
+ if not 'kafka.common.TopicExistsException' in output:
+ self.out("Failed to create topic %s" % (topic_name,))
+ self.out(output)
+ self.out(proc.stderr.read())
+ raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+ def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
+ for topic_name in topic_names:
+ self._create_topic(topic_name, num_partitions, replication_factor)
+
+ def get_clients(self, cnt=1, client_id=None):
+ if client_id is None:
+ client_id = 'client'
+ return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
+ bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
+
+ def get_consumers(self, cnt, topics, **params):
+ params.setdefault('client_id', 'consumer')
+ params.setdefault('heartbeat_interval_ms', 500)
+ params['bootstrap_servers'] = self.bootstrap_server()
+ client_id = params['client_id']
+ for x in range(cnt):
+ params['client_id'] = '%s_%s' % (client_id, random_string(4))
+ yield KafkaConsumer(*topics, **params)
+
+ def get_producers(self, cnt, **params):
+ params.setdefault('client_id', 'producer')
+ params['bootstrap_servers'] = self.bootstrap_server()
+ client_id = params['client_id']
+ for x in range(cnt):
+ params['client_id'] = '%s_%s' % (client_id, random_string(4))
+ yield KafkaProducer(**params)
+
+ def get_simple_client(self, **params):
+ params.setdefault('client_id', 'simple_client')
+ return SimpleClient(self.bootstrap_server(), **params)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 742572d..df0faef 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -17,7 +17,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
return
cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+ cls.server = KafkaFixture.instance(0, cls.zk)
@classmethod
def tearDownClass(cls): # noqa
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 40eec14..fe4e454 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -21,9 +21,30 @@ from kafka.structs import (
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+ KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
+ send_messages
)
+def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
+ """Test KafkaConsumer
+ """
+ kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+
+ send_messages(simple_client, topic, 0, range(0, 100))
+ send_messages(simple_client, topic, 1, range(100, 200))
+
+ cnt = 0
+ messages = {0: set(), 1: set()}
+ for message in kafka_consumer:
+ logging.debug("Consumed message %s", repr(message))
+ cnt += 1
+ messages[message.partition].add(message.offset)
+ if cnt >= 200:
+ break
+
+ assert len(messages[0]) == 100
+ assert len(messages[1]) == 100
+
class TestConsumerIntegration(KafkaIntegrationTestCase):
maxDiff = None
@@ -35,9 +56,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
cls.zk = ZookeeperFixture.instance()
chroot = random_string(10)
- cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
+ cls.server1 = KafkaFixture.instance(0, cls.zk,
zk_chroot=chroot)
- cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
+ cls.server2 = KafkaFixture.instance(1, cls.zk,
zk_chroot=chroot)
cls.server = cls.server1 # Bootstrapping server
@@ -501,24 +522,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- def test_kafka_consumer(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer = self.kafka_consumer(auto_offset_reset='earliest')
- n = 0
- messages = {0: set(), 1: set()}
- for m in consumer:
- logging.debug("Consumed message %s" % repr(m))
- n += 1
- messages[m.partition].add(m.offset)
- if n >= 200:
- break
-
- self.assertEqual(len(messages[0]), 100)
- self.assertEqual(len(messages[1]), 100)
-
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 9141947..8531cfb 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -29,10 +29,9 @@ class TestFailover(KafkaIntegrationTestCase):
# mini zookeeper, 3 kafka brokers
self.zk = ZookeeperFixture.instance()
- kk_args = [self.zk.host, self.zk.port]
kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
'partitions': partitions}
- self.brokers = [KafkaFixture.instance(i, *kk_args, **kk_kwargs)
+ self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
for i in range(replicas)]
hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index a304e83..ca0da6a 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -15,7 +15,50 @@ from kafka.producer.base import Producer
from kafka.structs import FetchRequestPayload, ProduceRequestPayload
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, kafka_versions
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
+# while the migration to pytest is in progress
+def assert_produce_request(client, topic, messages, initial_offset, message_ct,
+ partition=0):
+ """Verify the correctness of a produce request
+ """
+ produce = ProduceRequestPayload(topic, partition, messages=messages)
+
+ # There should only be one response message from the server.
+ # This will throw an exception if there's more than one.
+ resp = client.send_produce_request([produce])
+ assert_produce_response(resp, initial_offset)
+
+ assert current_offset(client, topic, partition) == initial_offset + message_ct
+
+def assert_produce_response(resp, initial_offset):
+ """Verify that a produce response is well-formed
+ """
+ assert len(resp) == 1
+ assert resp[0].error == 0
+ assert resp[0].offset == initial_offset
+
+def test_produce_many_simple(simple_client, topic):
+ """Test multiple produces using the SimpleClient
+ """
+ start_offset = current_offset(simple_client, topic, 0)
+
+ assert_produce_request(
+ simple_client, topic,
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
+ start_offset,
+ 100,
+ )
+
+ assert_produce_request(
+ simple_client, topic,
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
+ start_offset+100,
+ 100,
+ )
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@@ -26,7 +69,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
return
cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+ cls.server = KafkaFixture.instance(0, cls.zk)
@classmethod
def tearDownClass(cls): # noqa
@@ -36,23 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- def test_produce_many_simple(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset,
- 100,
- )
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset+100,
- 100,
- )
-
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
diff --git a/test/testutil.py b/test/testutil.py
index 0ec1cff..850e925 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,36 +1,20 @@
-import functools
-import logging
import operator
import os
-import random
import socket
-import string
import time
import uuid
-from six.moves import xrange
+import decorator
+import pytest
from . import unittest
-from kafka import SimpleClient
+from kafka import SimpleClient, create_message
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
-from kafka.structs import OffsetRequestPayload
-
-__all__ = [
- 'random_string',
- 'get_open_port',
- 'kafka_versions',
- 'KafkaIntegrationTestCase',
- 'Timer',
-]
-
-def random_string(l):
- return "".join(random.choice(string.ascii_letters) for i in xrange(l))
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
def kafka_versions(*versions):
- def version_str_to_list(s):
- return list(map(int, s.split('.'))) # e.g., [0, 8, 1, 1]
-
def construct_lambda(s):
if s[0].isdigit():
op_str = '='
@@ -54,25 +38,25 @@ def kafka_versions(*versions):
}
op = op_map[op_str]
version = version_str_to_list(v_str)
- return lambda a: op(version_str_to_list(a), version)
+ return lambda a: op(a, version)
validators = map(construct_lambda, versions)
- def kafka_versions(func):
- @functools.wraps(func)
- def wrapper(self):
- kafka_version = os.environ.get('KAFKA_VERSION')
+ def real_kafka_versions(func):
+ def wrapper(func, *args, **kwargs):
+ version = kafka_version()
- if not kafka_version:
- self.skipTest("no kafka version set in KAFKA_VERSION env var")
+ if not version:
+ pytest.skip("no kafka version set in KAFKA_VERSION env var")
for f in validators:
- if not f(kafka_version):
- self.skipTest("unsupported kafka version")
+ if not f(version):
+ pytest.skip("unsupported kafka version")
- return func(self)
- return wrapper
- return kafka_versions
+ return func(*args, **kwargs)
+ return decorator.decorator(wrapper, func)
+
+ return real_kafka_versions
def get_open_port():
sock = socket.socket()
@@ -81,6 +65,40 @@ def get_open_port():
sock.close()
return port
+_MESSAGES = {}
+def msg(message):
+ """Format, encode and deduplicate a message
+ """
+ global _MESSAGES #pylint: disable=global-statement
+ if message not in _MESSAGES:
+ _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
+
+ return _MESSAGES[message].encode('utf-8')
+
+def send_messages(client, topic, partition, messages):
+ """Send messages to a topic's partition
+ """
+ messages = [create_message(msg(str(m))) for m in messages]
+ produce = ProduceRequestPayload(topic, partition, messages=messages)
+ resp, = client.send_produce_request([produce])
+ assert resp.error == 0
+
+ return [x.value for x in messages]
+
+def current_offset(client, topic, partition, kafka_broker=None):
+ """Get the current offset of a topic's partition
+ """
+ try:
+ offsets, = client.send_offset_request([OffsetRequestPayload(topic,
+ partition, -1, 1)])
+ except Exception:
+ # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
+ if kafka_broker:
+ kafka_broker.dump_logs()
+ raise
+ else:
+ return offsets.offsets[0]
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
@@ -122,7 +140,8 @@ class KafkaIntegrationTestCase(unittest.TestCase):
def current_offset(self, topic, partition):
try:
- offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)])
+ offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
+ partition, -1, 1)])
except Exception:
# XXX: We've seen some UnknownErrors here and can't debug w/o server logs
self.zk.child.dump_logs()
@@ -132,7 +151,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
return offsets.offsets[0]
def msgs(self, iterable):
- return [ self.msg(x) for x in iterable ]
+ return [self.msg(x) for x in iterable]
def msg(self, s):
if s not in self._messages:
diff --git a/tox.ini b/tox.ini
index 35dc842..ad95f93 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,6 +20,7 @@ deps =
xxhash
crc32c
py26: unittest2
+ decorator
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =