diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-10 12:48:33 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-11-10 12:48:33 -0800 |
commit | bb5bc1fcfc09c9c9994edbbae0af2ff6802c353d (patch) | |
tree | acce192f70b0eeafd9dd68f80d5b2f6739247b42 | |
parent | cd47701ba63fc77309066e27b73f50d0150e3e1b (diff) | |
download | kafka-python-bb5bc1fcfc09c9c9994edbbae0af2ff6802c353d.tar.gz |
Migrate from `Unittest` to `pytest` (#1620)
-rw-r--r-- | test/conftest.py | 2 | ||||
-rw-r--r-- | test/test_consumer.py | 16 | ||||
-rw-r--r-- | test/test_consumer_group.py | 20 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 19 | ||||
-rw-r--r-- | test/test_package.py | 23 | ||||
-rw-r--r-- | test/testutil.py | 26 | ||||
-rw-r--r-- | tox.ini | 1 |
7 files changed, 34 insertions, 73 deletions
diff --git a/test/conftest.py b/test/conftest.py index a751d95..ffaae03 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,7 +1,5 @@ from __future__ import absolute_import -import inspect - import pytest from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version diff --git a/test/test_consumer.py b/test/test_consumer.py index 013529f..4ea01c8 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -2,6 +2,7 @@ import sys from mock import MagicMock, patch from . import unittest +import pytest from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer from kafka.errors import ( @@ -11,17 +12,13 @@ from kafka.structs import ( FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload) -class TestKafkaConsumer(unittest.TestCase): - def test_non_integer_partitions(self): - with self.assertRaises(AssertionError): - SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0']) - +class TestKafkaConsumer: def test_session_timeout_larger_than_request_timeout_raises(self): - with self.assertRaises(KafkaConfigurationError): + with pytest.raises(KafkaConfigurationError): KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000) def test_fetch_max_wait_larger_than_request_timeout_raises(self): - with self.assertRaises(KafkaConfigurationError): + with pytest.raises(KafkaConfigurationError): KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000) def test_subscription_copy(self): @@ -43,7 +40,12 @@ class TestMultiProcessConsumer(unittest.TestCase): self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member + class TestSimpleConsumer(unittest.TestCase): + def test_non_integer_partitions(self): + with self.assertRaises(AssertionError): + SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0']) + def test_simple_consumer_failed_payloads(self): client = MagicMock() consumer = SimpleConsumer(client, group=None, diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 01eb39e..5b468dc 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -6,7 +6,6 @@ import time import pytest from kafka.vendor import six -from kafka import SimpleClient from kafka.conn import ConnectionStates from kafka.consumer.group import KafkaConsumer from kafka.coordinator.base import MemberState, Generation @@ -20,25 +19,10 @@ def get_connect_str(kafka_broker): return kafka_broker.host + ':' + str(kafka_broker.port) -@pytest.fixture -def simple_client(kafka_broker): - return SimpleClient(get_connect_str(kafka_broker)) - - -@pytest.fixture -def topic(simple_client): - topic = random_string(5) - simple_client.ensure_topic_exists(topic) - return topic - - @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_consumer(kafka_broker, version): - +def test_consumer(kafka_broker, topic, version): + # The `topic` fixture is included because # 0.8.2 brokers need a topic to function well - if version >= (0, 8, 2) and version < (0, 9): - topic(simple_client(kafka_broker)) - consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) consumer.poll(500) assert len(consumer._client._conns) > 0 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9a7790e..9f76f7f 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -25,20 +25,21 @@ from kafka.structs import ( from test.conftest import version from test.fixtures import ZookeeperFixture, KafkaFixture, random_string -from test.testutil import ( - KafkaIntegrationTestCase, kafka_versions, Timer, - send_messages -) +from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_kafka_consumer(simple_client, topic, kafka_consumer_factory): - """Test KafkaConsumer - """ +def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): + """Test KafkaConsumer""" kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') - send_messages(simple_client, topic, 0, range(0, 100)) - send_messages(simple_client, topic, 1, range(100, 200)) + # TODO replace this with a `send_messages()` pytest fixture + # as we will likely need this elsewhere + for i in range(0, 100): + kafka_producer.send(topic, partition=0, value=str(i).encode()) + for i in range(100, 200): + kafka_producer.send(topic, partition=1, value=str(i).encode()) + kafka_producer.flush() cnt = 0 messages = {0: set(), 1: set()} diff --git a/test/test_package.py b/test/test_package.py index eb53027..e520f3f 100644 --- a/test/test_package.py +++ b/test/test_package.py @@ -1,28 +1,25 @@ -from . import unittest - - -class TestPackage(unittest.TestCase): +class TestPackage: def test_top_level_namespace(self): import kafka as kafka1 - self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer") - self.assertEqual(kafka1.consumer.__name__, "kafka.consumer") - self.assertEqual(kafka1.codec.__name__, "kafka.codec") + assert kafka1.KafkaConsumer.__name__ == "KafkaConsumer" + assert kafka1.consumer.__name__ == "kafka.consumer" + assert kafka1.codec.__name__ == "kafka.codec" def test_submodule_namespace(self): import kafka.client as client1 - self.assertEqual(client1.__name__, "kafka.client") + assert client1.__name__ == "kafka.client" from kafka import client as client2 - self.assertEqual(client2.__name__, "kafka.client") + assert client2.__name__ == "kafka.client" from kafka.client import SimpleClient as SimpleClient1 - self.assertEqual(SimpleClient1.__name__, "SimpleClient") + assert SimpleClient1.__name__ == "SimpleClient" from kafka.codec import gzip_encode as gzip_encode1 - self.assertEqual(gzip_encode1.__name__, "gzip_encode") + assert gzip_encode1.__name__ == "gzip_encode" from kafka import SimpleClient as SimpleClient2 - self.assertEqual(SimpleClient2.__name__, "SimpleClient") + assert SimpleClient2.__name__ == "SimpleClient" from kafka.codec import snappy_encode - self.assertEqual(snappy_encode.__name__, "snappy_encode") + assert snappy_encode.__name__ == "snappy_encode" diff --git a/test/testutil.py b/test/testutil.py index feb6f6d..6f6cafb 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -3,20 +3,19 @@ from __future__ import absolute_import import functools import operator import os -import socket import time import uuid import pytest from . import unittest -from kafka import SimpleClient, create_message +from kafka import SimpleClient from kafka.errors import ( LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError, NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError ) -from kafka.structs import OffsetRequestPayload, ProduceRequestPayload +from kafka.structs import OffsetRequestPayload from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order @@ -67,26 +66,6 @@ def kafka_versions(*versions): return real_kafka_versions -_MESSAGES = {} -def msg(message): - """Format, encode and deduplicate a message - """ - global _MESSAGES #pylint: disable=global-statement - if message not in _MESSAGES: - _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4())) - - return _MESSAGES[message].encode('utf-8') - -def send_messages(client, topic, partition, messages): - """Send messages to a topic's partition - """ - messages = [create_message(msg(str(m))) for m in messages] - produce = ProduceRequestPayload(topic, partition, messages=messages) - resp, = client.send_produce_request([produce]) - assert resp.error == 0 - - return [x.value for x in messages] - def current_offset(client, topic, partition, kafka_broker=None): """Get the current offset of a topic's partition """ @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None): else: return offsets.offsets[0] + class KafkaIntegrationTestCase(unittest.TestCase): create_client = True topic = None @@ -20,7 +20,6 @@ deps = xxhash crc32c py26: unittest2 - decorator commands = py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} setenv = |