summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-03 16:18:08 -0800
committerDana Powers <dana.powers@rd.io>2016-01-03 16:23:17 -0800
commit50f6a25ceb0de0c1565092c40920429b9d42305e (patch)
tree6296f10f0bfefbedbed9c6dc483552de7c3aa5a4 /test/test_consumer_integration.py
parent9acb68901529a0158e37753c931ff00ccfaaaa7a (diff)
downloadkafka-python-50f6a25ceb0de0c1565092c40920429b9d42305e.tar.gz
Update consumer integration tests to use new (group) KafkaConsumer
- Remove debug call to deprecated .offsets() method - Manually assign TopicPartition to avoid group subscription overhead - Use next(consumer), not consumer.next() - consumer_timeout_ms now raises StopIteration, not ConsumerTimeout - auto_commit_enable is now enable_auto_commit - auto_offset_reset -> earliest, not smallest - new consumer does not support auto_commit_interval_messages
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py60
1 files changed, 31 insertions, 29 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index cd5af5e..1104916 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -7,8 +7,8 @@ from kafka import (
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
)
from kafka.common import (
- ProduceRequestPayload, ConsumerFetchSizeTooSmall, ConsumerTimeout,
- OffsetOutOfRangeError
+ ProduceRequestPayload, ConsumerFetchSizeTooSmall,
+ OffsetOutOfRangeError, TopicPartition
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -475,11 +475,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(1, range(100, 200))
# Start a consumer
- consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer = self.kafka_consumer(auto_offset_reset='earliest',
consumer_timeout_ms=5000)
n = 0
messages = {0: set(), 1: set()}
- logging.debug("kafka consumer offsets: %s" % consumer.offsets())
for m in consumer:
logging.debug("Consumed message %s" % repr(m))
n += 1
@@ -493,13 +492,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
- consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer = self.kafka_consumer(auto_offset_reset='earliest',
consumer_timeout_ms=TIMEOUT_MS)
+ # Manual assignment avoids overhead of consumer group mgmt
+ consumer.unsubscribe()
+ consumer.assign([TopicPartition(self.topic, 0)])
+
# Ask for 5 messages, nothing in queue, block 500ms
with Timer() as t:
- with self.assertRaises(ConsumerTimeout):
- msg = consumer.next()
+ with self.assertRaises(StopIteration):
+ msg = next(consumer)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
self.send_messages(0, range(0, 10))
@@ -508,7 +511,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = set()
with Timer() as t:
for i in range(5):
- msg = consumer.next()
+ msg = next(consumer)
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
@@ -516,9 +519,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Ask for 10 messages, get 5 back, block 500ms
messages = set()
with Timer() as t:
- with self.assertRaises(ConsumerTimeout):
+ with self.assertRaises(StopIteration):
for i in range(10):
- msg = consumer.next()
+ msg = next(consumer)
messages.add((msg.partition, msg.offset))
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
@@ -532,36 +535,35 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Start a consumer
consumer1 = self.kafka_consumer(
- group_id = GROUP_ID,
- auto_commit_enable = True,
- auto_commit_interval_ms = None,
- auto_commit_interval_messages = 20,
- auto_offset_reset='smallest',
+ group_id=GROUP_ID,
+ enable_auto_commit=True,
+ auto_commit_interval_ms=100,
+ auto_offset_reset='earliest',
+ consumer_timeout_ms=100
)
- # Grab the first 195 messages
+ # Grab the first 180 messages
output_msgs1 = []
- for _ in xrange(195):
- m = consumer1.next()
+ for _ in xrange(180):
+ m = next(consumer1)
output_msgs1.append(m)
- consumer1.task_done(m)
- self.assert_message_count(output_msgs1, 195)
+ self.assert_message_count(output_msgs1, 180)
+ consumer1.close()
# The total offset across both partitions should be at 180
consumer2 = self.kafka_consumer(
- group_id = GROUP_ID,
- auto_commit_enable = True,
- auto_commit_interval_ms = None,
- auto_commit_interval_messages = 20,
- consumer_timeout_ms = 100,
- auto_offset_reset='smallest',
+ group_id=GROUP_ID,
+ enable_auto_commit=True,
+ auto_commit_interval_ms=100,
+ auto_offset_reset='earliest',
+ consumer_timeout_ms=100
)
# 181-200
output_msgs2 = []
- with self.assertRaises(ConsumerTimeout):
+ with self.assertRaises(StopIteration):
while True:
- m = consumer2.next()
+ m = next(consumer2)
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
- self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
+ #self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)