author     Jeff Widman <jeff@jeffwidman.com>  2019-08-22 21:14:37 -0700
committer  GitHub <noreply@github.com>  2019-08-22 21:14:37 -0700
commit     61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa (patch)
tree       7e9ade5bae3b4b3172b39771e08eb6b7108092c8
parent     6e6d0cca5dbdf0a9ae3a032b6de08f9bbbf9606a (diff)
download   kafka-python-61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa.tar.gz
Convert remaining `KafkaConsumer` tests to `pytest` (#1886)
This makes it so the only remaining use of `unittest` is in the old tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are migrated to `pytest`. I also had to bump the test iterations up on one of the tests; I think there was a race condition there that was hit more commonly under pytest, and I plan to clean that up in a followup PR. See https://github.com/dpkp/kafka-python/pull/1886#discussion_r316860737 for details.
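The conversion follows the same mechanical pattern throughout the diff below: `unittest.TestCase` methods become plain module-level functions that take pytest fixtures as arguments, `self.assert*` calls become bare `assert` statements, and `self.assertRaises` becomes `pytest.raises`. For example, excerpted from the `test_kafka_consumer__blocking` hunk further down:

    # before (unittest style)
    with self.assertRaises(StopIteration):
        msg = next(consumer)
    self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0)

    # after (pytest style)
    with pytest.raises(StopIteration):
        msg = next(consumer)
    assert t.interval >= (TIMEOUT_MS / 1000.0)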
-rw-r--r--  test/conftest.py                    26
-rw-r--r--  test/test_consumer_group.py          2
-rw-r--r--  test/test_consumer_integration.py  501
-rw-r--r--  test/testutil.py                    11
4 files changed, 284 insertions, 256 deletions
diff --git a/test/conftest.py b/test/conftest.py
index 5015cc7..267ac6a 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+import uuid
+
import pytest
from test.testutil import env_kafka_version, random_string
@@ -137,3 +139,27 @@ def conn(mocker):
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn
+
+
+@pytest.fixture()
+def send_messages(topic, kafka_producer, request):
+ """A factory that returns a send_messages function with a pre-populated
+ topic / producer."""
+
+ def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
+ """
+ number_range is typically `range(0, 100)`
+ partition is an int
+ """
+ messages_and_futures = [] # [(message, produce_future),]
+ for i in number_range:
+ # request.node.name provides the test name (including parametrized values)
+ encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
+ future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
+ messages_and_futures.append((encoded_msg, future))
+ kafka_producer.flush()
+ for (msg, f) in messages_and_futures:
+ assert f.succeeded()
+ return [msg for (msg, f) in messages_and_futures]
+
+ return _send_messages
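A minimal sketch of how a converted test uses the new `send_messages` fixture (the test name here is hypothetical, and this assumes `kafka_consumer_factory` returns a consumer already subscribed to the `topic` fixture, as the converted tests below do):

    def test_round_trip_example(kafka_consumer_factory, send_messages):
        sent = send_messages(range(0, 10), partition=0)  # returns the encoded message values
        consumer = kafka_consumer_factory(auto_offset_reset='earliest')
        received = [next(consumer).value for _ in range(10)]
        # every produced value should come back exactly once
        assert sorted(received) == sorted(sent)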
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 3367617..58dc7eb 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -29,6 +29,7 @@ def test_consumer(kafka_broker, topic):
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()
+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
@@ -38,6 +39,7 @@ def test_consumer_topics(kafka_broker, topic):
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()
+
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index cb05242..c7e2ebf 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -4,7 +4,6 @@ import time
from mock import patch
import pytest
-from kafka.vendor import six
from kafka.vendor.six.moves import range
from . import unittest
@@ -23,34 +22,26 @@ from kafka.structs import (
)
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string
+from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
+def test_kafka_consumer(kafka_consumer_factory, send_messages):
"""Test KafkaConsumer"""
- kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
-
- # TODO replace this with a `send_messages()` pytest fixture
- # as we will likely need this elsewhere
- for i in range(0, 100):
- kafka_producer.send(topic, partition=0, value=str(i).encode())
- for i in range(100, 200):
- kafka_producer.send(topic, partition=1, value=str(i).encode())
- kafka_producer.flush()
-
+ consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+ send_messages(range(0, 100), partition=0)
+ send_messages(range(100, 200), partition=1)
cnt = 0
- messages = {0: set(), 1: set()}
- for message in kafka_consumer:
+ messages = {0: [], 1: []}
+ for message in consumer:
logging.debug("Consumed message %s", repr(message))
cnt += 1
- messages[message.partition].add(message.offset)
+ messages[message.partition].append(message)
if cnt >= 200:
break
- assert len(messages[0]) == 100
- assert len(messages[1]) == 100
- kafka_consumer.close()
+ assert_message_count(messages[0], 100)
+ assert_message_count(messages[1], 100)
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@@ -547,242 +538,240 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_kafka_consumer__blocking(self):
- TIMEOUT_MS = 500
- consumer = self.kafka_consumer(auto_offset_reset='earliest',
- enable_auto_commit=False,
- consumer_timeout_ms=TIMEOUT_MS)
-
- # Manual assignment avoids overhead of consumer group mgmt
- consumer.unsubscribe()
- consumer.assign([TopicPartition(self.topic, 0)])
- # Ask for 5 messages, nothing in queue, block 500ms
- with Timer() as t:
- with self.assertRaises(StopIteration):
- msg = next(consumer)
- self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
-
- self.send_messages(0, range(0, 10))
-
- # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
- messages = set()
- with Timer() as t:
- for i in range(5):
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
+ TIMEOUT_MS = 500
+ consumer = kafka_consumer_factory(auto_offset_reset='earliest',
+ enable_auto_commit=False,
+ consumer_timeout_ms=TIMEOUT_MS)
+
+ # Manual assignment avoids overhead of consumer group mgmt
+ consumer.unsubscribe()
+ consumer.assign([TopicPartition(topic, 0)])
+
+ # Ask for 5 messages, nothing in queue, block 500ms
+ with Timer() as t:
+ with pytest.raises(StopIteration):
+ msg = next(consumer)
+ assert t.interval >= (TIMEOUT_MS / 1000.0)
+
+ send_messages(range(0, 10))
+
+ # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+ messages = []
+ with Timer() as t:
+ for i in range(5):
+ msg = next(consumer)
+ messages.append(msg)
+ assert_message_count(messages, 5)
+ assert t.interval < (TIMEOUT_MS / 1000.0)
+
+ # Ask for 10 messages, get 5 back, block 500ms
+ messages = []
+ with Timer() as t:
+ with pytest.raises(StopIteration):
+ for i in range(10):
msg = next(consumer)
- messages.add((msg.partition, msg.offset))
- self.assertEqual(len(messages), 5)
- self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
-
- # Ask for 10 messages, get 5 back, block 500ms
- messages = set()
- with Timer() as t:
- with self.assertRaises(StopIteration):
- for i in range(10):
- msg = next(consumer)
- messages.add((msg.partition, msg.offset))
- self.assertEqual(len(messages), 5)
- self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
- consumer.close()
-
- @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1")
- def test_kafka_consumer__offset_commit_resume(self):
- GROUP_ID = random_string(10)
-
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer1 = self.kafka_consumer(
- group_id=GROUP_ID,
- enable_auto_commit=True,
- auto_commit_interval_ms=100,
- auto_offset_reset='earliest',
- )
-
- # Grab the first 180 messages
- output_msgs1 = []
- for _ in range(180):
- m = next(consumer1)
- output_msgs1.append((m.key, m.value))
- self.assert_message_count(output_msgs1, 180)
- consumer1.close()
-
- # The total offset across both partitions should be at 180
- consumer2 = self.kafka_consumer(
- group_id=GROUP_ID,
- enable_auto_commit=True,
- auto_commit_interval_ms=100,
- auto_offset_reset='earliest',
- )
-
- # 181-200
- output_msgs2 = []
- for _ in range(20):
- m = next(consumer2)
- output_msgs2.append((m.key, m.value))
- self.assert_message_count(output_msgs2, 20)
- self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
- consumer2.close()
-
- @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
- def test_kafka_consumer_max_bytes_simple(self):
- self.send_messages(0, range(100, 200))
- self.send_messages(1, range(200, 300))
-
- # Start a consumer
- consumer = self.kafka_consumer(
- auto_offset_reset='earliest', fetch_max_bytes=300)
- seen_partitions = set([])
- for i in range(10):
- poll_res = consumer.poll(timeout_ms=100)
- for partition, msgs in six.iteritems(poll_res):
- for msg in msgs:
- seen_partitions.add(partition)
-
- # Check that we fetched at least 1 message from both partitions
- self.assertEqual(
- seen_partitions, set([
- TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
- consumer.close()
-
- @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
- def test_kafka_consumer_max_bytes_one_msg(self):
- # We send to only 1 partition so we don't have parallel requests to 2
- # nodes for data.
- self.send_messages(0, range(100, 200))
-
- # Start a consumer. FetchResponse_v3 should always include at least 1
- # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
- # But 0.11.0.0 returns 1 MessageSet at a time when the messages are
- # stored in the new v2 format by the broker.
- #
- # DP Note: This is a strange test. The consumer shouldn't care
- # how many messages are included in a FetchResponse, as long as it is
- # non-zero. I would not mind if we deleted this test. It caused
- # a minor headache when testing 0.11.0.0.
- group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
- consumer = self.kafka_consumer(
- group_id=group,
- auto_offset_reset='earliest',
- consumer_timeout_ms=5000,
- fetch_max_bytes=1)
-
- fetched_msgs = [next(consumer) for i in range(10)]
- self.assertEqual(len(fetched_msgs), 10)
- consumer.close()
-
- @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
- def test_kafka_consumer_offsets_for_time(self):
- late_time = int(time.time()) * 1000
- middle_time = late_time - 1000
- early_time = late_time - 2000
- tp = TopicPartition(self.topic, 0)
-
- timeout = 10
- kafka_producer = self.kafka_producer()
- early_msg = kafka_producer.send(
- self.topic, partition=0, value=b"first",
- timestamp_ms=early_time).get(timeout)
- late_msg = kafka_producer.send(
- self.topic, partition=0, value=b"last",
- timestamp_ms=late_time).get(timeout)
-
- consumer = self.kafka_consumer()
- offsets = consumer.offsets_for_times({tp: early_time})
- self.assertEqual(len(offsets), 1)
- self.assertEqual(offsets[tp].offset, early_msg.offset)
- self.assertEqual(offsets[tp].timestamp, early_time)
-
- offsets = consumer.offsets_for_times({tp: middle_time})
- self.assertEqual(offsets[tp].offset, late_msg.offset)
- self.assertEqual(offsets[tp].timestamp, late_time)
-
- offsets = consumer.offsets_for_times({tp: late_time})
- self.assertEqual(offsets[tp].offset, late_msg.offset)
- self.assertEqual(offsets[tp].timestamp, late_time)
-
- offsets = consumer.offsets_for_times({})
- self.assertEqual(offsets, {})
-
- # Out of bound timestamps check
-
- offsets = consumer.offsets_for_times({tp: 0})
- self.assertEqual(offsets[tp].offset, early_msg.offset)
- self.assertEqual(offsets[tp].timestamp, early_time)
-
- offsets = consumer.offsets_for_times({tp: 9999999999999})
- self.assertEqual(offsets[tp], None)
-
- # Beginning/End offsets
-
- offsets = consumer.beginning_offsets([tp])
- self.assertEqual(offsets, {
- tp: early_msg.offset,
- })
- offsets = consumer.end_offsets([tp])
- self.assertEqual(offsets, {
- tp: late_msg.offset + 1
- })
- consumer.close()
-
- @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
- def test_kafka_consumer_offsets_search_many_partitions(self):
- tp0 = TopicPartition(self.topic, 0)
- tp1 = TopicPartition(self.topic, 1)
-
- kafka_producer = self.kafka_producer()
- send_time = int(time.time() * 1000)
- timeout = 10
- p0msg = kafka_producer.send(
- self.topic, partition=0, value=b"XXX",
- timestamp_ms=send_time).get(timeout)
- p1msg = kafka_producer.send(
- self.topic, partition=1, value=b"XXX",
- timestamp_ms=send_time).get(timeout)
-
- consumer = self.kafka_consumer()
- offsets = consumer.offsets_for_times({
- tp0: send_time,
- tp1: send_time
- })
-
- self.assertEqual(offsets, {
- tp0: OffsetAndTimestamp(p0msg.offset, send_time),
- tp1: OffsetAndTimestamp(p1msg.offset, send_time)
- })
-
- offsets = consumer.beginning_offsets([tp0, tp1])
- self.assertEqual(offsets, {
- tp0: p0msg.offset,
- tp1: p1msg.offset
- })
-
- offsets = consumer.end_offsets([tp0, tp1])
- self.assertEqual(offsets, {
- tp0: p0msg.offset + 1,
- tp1: p1msg.offset + 1
- })
- consumer.close()
-
- @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1")
- def test_kafka_consumer_offsets_for_time_old(self):
- consumer = self.kafka_consumer()
- tp = TopicPartition(self.topic, 0)
-
- with self.assertRaises(UnsupportedVersionError):
- consumer.offsets_for_times({tp: int(time.time())})
-
- @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
- def test_kafka_consumer_offsets_for_times_errors(self):
- consumer = self.kafka_consumer(fetch_max_wait_ms=200,
- request_timeout_ms=500)
- tp = TopicPartition(self.topic, 0)
- bad_tp = TopicPartition(self.topic, 100)
-
- with self.assertRaises(ValueError):
- consumer.offsets_for_times({tp: -1})
-
- with self.assertRaises(KafkaTimeoutError):
- consumer.offsets_for_times({bad_tp: 0})
+ messages.append(msg)
+ assert_message_count(messages, 5)
+ assert t.interval >= (TIMEOUT_MS / 1000.0)
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1")
+def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messages):
+ GROUP_ID = random_string(10)
+
+ send_messages(range(0, 100), partition=0)
+ send_messages(range(100, 200), partition=1)
+
+ # Start a consumer and grab the first 180 messages
+ consumer1 = kafka_consumer_factory(
+ group_id=GROUP_ID,
+ enable_auto_commit=True,
+ auto_commit_interval_ms=100,
+ auto_offset_reset='earliest',
+ )
+ output_msgs1 = []
+ for _ in range(180):
+ m = next(consumer1)
+ output_msgs1.append(m)
+ assert_message_count(output_msgs1, 180)
+
+ # Normally we let the pytest fixture `kafka_consumer_factory` handle
+ # closing as part of its teardown. Here we manually call close() to force
+ # auto-commit to occur before the second consumer starts. That way the
+ # second consumer only consumes previously unconsumed messages.
+ consumer1.close()
+
+ # Start a second consumer to grab 181-200
+ consumer2 = kafka_consumer_factory(
+ group_id=GROUP_ID,
+ enable_auto_commit=True,
+ auto_commit_interval_ms=100,
+ auto_offset_reset='earliest',
+ )
+ output_msgs2 = []
+ for _ in range(20):
+ m = next(consumer2)
+ output_msgs2.append(m)
+ assert_message_count(output_msgs2, 20)
+
+ # Verify the second consumer wasn't reconsuming messages that the first
+ # consumer already saw
+ assert_message_count(output_msgs1 + output_msgs2, 200)
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
+def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
+ send_messages(range(100, 200), partition=0)
+ send_messages(range(200, 300), partition=1)
+
+ # Start a consumer
+ consumer = kafka_consumer_factory(
+ auto_offset_reset='earliest', fetch_max_bytes=300)
+ seen_partitions = set()
+ for i in range(90):
+ poll_res = consumer.poll(timeout_ms=100)
+ for partition, msgs in poll_res.items():
+ for msg in msgs:
+ seen_partitions.add(partition)
+
+ # Check that we fetched at least 1 message from both partitions
+ assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
+def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
+ # We send to only 1 partition so we don't have parallel requests to 2
+ # nodes for data.
+ send_messages(range(100, 200))
+
+ # Start a consumer. FetchResponse_v3 should always include at least 1
+ # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
+ # But 0.11.0.0 returns 1 MessageSet at a time when the messages are
+ # stored in the new v2 format by the broker.
+ #
+ # DP Note: This is a strange test. The consumer shouldn't care
+ # how many messages are included in a FetchResponse, as long as it is
+ # non-zero. I would not mind if we deleted this test. It caused
+ # a minor headache when testing 0.11.0.0.
+ group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
+ consumer = kafka_consumer_factory(
+ group_id=group,
+ auto_offset_reset='earliest',
+ consumer_timeout_ms=5000,
+ fetch_max_bytes=1)
+
+ fetched_msgs = [next(consumer) for i in range(10)]
+ assert_message_count(fetched_msgs, 10)
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
+def test_kafka_consumer_offsets_for_time(topic, kafka_consumer, kafka_producer):
+ late_time = int(time.time()) * 1000
+ middle_time = late_time - 1000
+ early_time = late_time - 2000
+ tp = TopicPartition(topic, 0)
+
+ timeout = 10
+ early_msg = kafka_producer.send(
+ topic, partition=0, value=b"first",
+ timestamp_ms=early_time).get(timeout)
+ late_msg = kafka_producer.send(
+ topic, partition=0, value=b"last",
+ timestamp_ms=late_time).get(timeout)
+
+ consumer = kafka_consumer
+ offsets = consumer.offsets_for_times({tp: early_time})
+ assert len(offsets) == 1
+ assert offsets[tp].offset == early_msg.offset
+ assert offsets[tp].timestamp == early_time
+
+ offsets = consumer.offsets_for_times({tp: middle_time})
+ assert offsets[tp].offset == late_msg.offset
+ assert offsets[tp].timestamp == late_time
+
+ offsets = consumer.offsets_for_times({tp: late_time})
+ assert offsets[tp].offset == late_msg.offset
+ assert offsets[tp].timestamp == late_time
+
+ offsets = consumer.offsets_for_times({})
+ assert offsets == {}
+
+ # Out of bound timestamps check
+
+ offsets = consumer.offsets_for_times({tp: 0})
+ assert offsets[tp].offset == early_msg.offset
+ assert offsets[tp].timestamp == early_time
+
+ offsets = consumer.offsets_for_times({tp: 9999999999999})
+ assert offsets[tp] is None
+
+ # Beginning/End offsets
+
+ offsets = consumer.beginning_offsets([tp])
+ assert offsets == {tp: early_msg.offset}
+ offsets = consumer.end_offsets([tp])
+ assert offsets == {tp: late_msg.offset + 1}
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
+def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_producer, topic):
+ tp0 = TopicPartition(topic, 0)
+ tp1 = TopicPartition(topic, 1)
+
+ send_time = int(time.time() * 1000)
+ timeout = 10
+ p0msg = kafka_producer.send(
+ topic, partition=0, value=b"XXX",
+ timestamp_ms=send_time).get(timeout)
+ p1msg = kafka_producer.send(
+ topic, partition=1, value=b"XXX",
+ timestamp_ms=send_time).get(timeout)
+
+ consumer = kafka_consumer
+ offsets = consumer.offsets_for_times({
+ tp0: send_time,
+ tp1: send_time
+ })
+
+ assert offsets == {
+ tp0: OffsetAndTimestamp(p0msg.offset, send_time),
+ tp1: OffsetAndTimestamp(p1msg.offset, send_time)
+ }
+
+ offsets = consumer.beginning_offsets([tp0, tp1])
+ assert offsets == {
+ tp0: p0msg.offset,
+ tp1: p1msg.offset
+ }
+
+ offsets = consumer.end_offsets([tp0, tp1])
+ assert offsets == {
+ tp0: p0msg.offset + 1,
+ tp1: p1msg.offset + 1
+ }
+
+
+@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1")
+def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic):
+ consumer = kafka_consumer
+ tp = TopicPartition(topic, 0)
+
+ with pytest.raises(UnsupportedVersionError):
+ consumer.offsets_for_times({tp: int(time.time())})
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
+def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic):
+ consumer = kafka_consumer_factory(fetch_max_wait_ms=200,
+ request_timeout_ms=500)
+ tp = TopicPartition(topic, 0)
+ bad_tp = TopicPartition(topic, 100)
+
+ with pytest.raises(ValueError):
+ consumer.offsets_for_times({tp: -1})
+
+ with pytest.raises(KafkaTimeoutError):
+ consumer.offsets_for_times({bad_tp: 0})
diff --git a/test/testutil.py b/test/testutil.py
index 3272262..650f9bf 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -47,6 +47,17 @@ def current_offset(client, topic, partition, kafka_broker=None):
return offsets.offsets[0]
+def assert_message_count(messages, num_messages):
+ """Check that we received the expected number of messages with no duplicates."""
+ # Make sure we got them all
+ assert len(messages) == num_messages
+ # Make sure there are no duplicates
+ # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers,
+ # timestamp, etc. are ignored... this could be changed if necessary, but checking those would make the check more tolerant of dupes.
+ unique_messages = {(m.key, m.value) for m in messages}
+ assert len(unique_messages) == num_messages
+
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
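For illustration, a quick sketch of what `assert_message_count` catches (the `Record` namedtuple here is a hypothetical stand-in exposing the `key`/`value` attributes the helper inspects):

    from collections import namedtuple

    Record = namedtuple('Record', ['key', 'value'])
    msgs = [Record(None, b'0'), Record(None, b'1'), Record(None, b'0')]
    assert_message_count(msgs, 3)  # raises AssertionError: only 2 unique (key, value) pairs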