summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-11-10 12:48:33 -0800
committerDana Powers <dana.powers@gmail.com>2018-11-10 12:48:33 -0800
commitbb5bc1fcfc09c9c9994edbbae0af2ff6802c353d (patch)
treeacce192f70b0eeafd9dd68f80d5b2f6739247b42
parentcd47701ba63fc77309066e27b73f50d0150e3e1b (diff)
downloadkafka-python-bb5bc1fcfc09c9c9994edbbae0af2ff6802c353d.tar.gz
Migrate from `Unittest` to `pytest` (#1620)
-rw-r--r--test/conftest.py2
-rw-r--r--test/test_consumer.py16
-rw-r--r--test/test_consumer_group.py20
-rw-r--r--test/test_consumer_integration.py19
-rw-r--r--test/test_package.py23
-rw-r--r--test/testutil.py26
-rw-r--r--tox.ini1
7 files changed, 34 insertions, 73 deletions
diff --git a/test/conftest.py b/test/conftest.py
index a751d95..ffaae03 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,7 +1,5 @@
from __future__ import absolute_import
-import inspect
-
import pytest
from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 013529f..4ea01c8 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -2,6 +2,7 @@ import sys
from mock import MagicMock, patch
from . import unittest
+import pytest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.errors import (
@@ -11,17 +12,13 @@ from kafka.structs import (
FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
-class TestKafkaConsumer(unittest.TestCase):
- def test_non_integer_partitions(self):
- with self.assertRaises(AssertionError):
- SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
-
+class TestKafkaConsumer:
def test_session_timeout_larger_than_request_timeout_raises(self):
- with self.assertRaises(KafkaConfigurationError):
+ with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
- with self.assertRaises(KafkaConfigurationError):
+ with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
def test_subscription_copy(self):
@@ -43,7 +40,12 @@ class TestMultiProcessConsumer(unittest.TestCase):
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
+
class TestSimpleConsumer(unittest.TestCase):
+ def test_non_integer_partitions(self):
+ with self.assertRaises(AssertionError):
+ SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
+
def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 01eb39e..5b468dc 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -6,7 +6,6 @@ import time
import pytest
from kafka.vendor import six
-from kafka import SimpleClient
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState, Generation
@@ -20,25 +19,10 @@ def get_connect_str(kafka_broker):
return kafka_broker.host + ':' + str(kafka_broker.port)
-@pytest.fixture
-def simple_client(kafka_broker):
- return SimpleClient(get_connect_str(kafka_broker))
-
-
-@pytest.fixture
-def topic(simple_client):
- topic = random_string(5)
- simple_client.ensure_topic_exists(topic)
- return topic
-
-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_consumer(kafka_broker, version):
-
+def test_consumer(kafka_broker, topic, version):
+ # The `topic` fixture is included because
# 0.8.2 brokers need a topic to function well
- if version >= (0, 8, 2) and version < (0, 9):
- topic(simple_client(kafka_broker))
-
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
consumer.poll(500)
assert len(consumer._client._conns) > 0
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9a7790e..9f76f7f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -25,20 +25,21 @@ from kafka.structs import (
from test.conftest import version
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
-from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, Timer,
- send_messages
-)
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
- """Test KafkaConsumer
- """
+def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
+ """Test KafkaConsumer"""
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
- send_messages(simple_client, topic, 0, range(0, 100))
- send_messages(simple_client, topic, 1, range(100, 200))
+ # TODO replace this with a `send_messages()` pytest fixture
+ # as we will likely need this elsewhere
+ for i in range(0, 100):
+ kafka_producer.send(topic, partition=0, value=str(i).encode())
+ for i in range(100, 200):
+ kafka_producer.send(topic, partition=1, value=str(i).encode())
+ kafka_producer.flush()
cnt = 0
messages = {0: set(), 1: set()}
diff --git a/test/test_package.py b/test/test_package.py
index eb53027..e520f3f 100644
--- a/test/test_package.py
+++ b/test/test_package.py
@@ -1,28 +1,25 @@
-from . import unittest
-
-
-class TestPackage(unittest.TestCase):
+class TestPackage:
def test_top_level_namespace(self):
import kafka as kafka1
- self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer")
- self.assertEqual(kafka1.consumer.__name__, "kafka.consumer")
- self.assertEqual(kafka1.codec.__name__, "kafka.codec")
+ assert kafka1.KafkaConsumer.__name__ == "KafkaConsumer"
+ assert kafka1.consumer.__name__ == "kafka.consumer"
+ assert kafka1.codec.__name__ == "kafka.codec"
def test_submodule_namespace(self):
import kafka.client as client1
- self.assertEqual(client1.__name__, "kafka.client")
+ assert client1.__name__ == "kafka.client"
from kafka import client as client2
- self.assertEqual(client2.__name__, "kafka.client")
+ assert client2.__name__ == "kafka.client"
from kafka.client import SimpleClient as SimpleClient1
- self.assertEqual(SimpleClient1.__name__, "SimpleClient")
+ assert SimpleClient1.__name__ == "SimpleClient"
from kafka.codec import gzip_encode as gzip_encode1
- self.assertEqual(gzip_encode1.__name__, "gzip_encode")
+ assert gzip_encode1.__name__ == "gzip_encode"
from kafka import SimpleClient as SimpleClient2
- self.assertEqual(SimpleClient2.__name__, "SimpleClient")
+ assert SimpleClient2.__name__ == "SimpleClient"
from kafka.codec import snappy_encode
- self.assertEqual(snappy_encode.__name__, "snappy_encode")
+ assert snappy_encode.__name__ == "snappy_encode"
diff --git a/test/testutil.py b/test/testutil.py
index feb6f6d..6f6cafb 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -3,20 +3,19 @@ from __future__ import absolute_import
import functools
import operator
import os
-import socket
import time
import uuid
import pytest
from . import unittest
-from kafka import SimpleClient, create_message
+from kafka import SimpleClient
from kafka.errors import (
LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
FailedPayloadsError
)
-from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from kafka.structs import OffsetRequestPayload
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
@@ -67,26 +66,6 @@ def kafka_versions(*versions):
return real_kafka_versions
-_MESSAGES = {}
-def msg(message):
- """Format, encode and deduplicate a message
- """
- global _MESSAGES #pylint: disable=global-statement
- if message not in _MESSAGES:
- _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
-
- return _MESSAGES[message].encode('utf-8')
-
-def send_messages(client, topic, partition, messages):
- """Send messages to a topic's partition
- """
- messages = [create_message(msg(str(m))) for m in messages]
- produce = ProduceRequestPayload(topic, partition, messages=messages)
- resp, = client.send_produce_request([produce])
- assert resp.error == 0
-
- return [x.value for x in messages]
-
def current_offset(client, topic, partition, kafka_broker=None):
"""Get the current offset of a topic's partition
"""
@@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None):
else:
return offsets.offsets[0]
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
diff --git a/tox.ini b/tox.ini
index 1760aff..599a534 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,7 +20,6 @@ deps =
xxhash
crc32c
py26: unittest2
- decorator
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =