author | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:18:08 -0800
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:23:17 -0800
commit | 50f6a25ceb0de0c1565092c40920429b9d42305e (patch)
tree | 6296f10f0bfefbedbed9c6dc483552de7c3aa5a4
parent | 9acb68901529a0158e37753c931ff00ccfaaaa7a (diff)
download | kafka-python-50f6a25ceb0de0c1565092c40920429b9d42305e.tar.gz
Update consumer integration tests to use new (group) KafkaConsumer
- Remove debug call to deprecated .offsets() method
- Manually assign TopicPartition to avoid group subscription overhead
- Use next(consumer), not consumer.next()
- consumer_timeout_ms now raises StopIteration, not ConsumerTimeout
- auto_commit_enable is now enable_auto_commit
- auto_offset_reset -> earliest, not smallest
- new consumer does not support auto_commit_interval_messages (see the migration sketch below)
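Taken together, the renames map onto the new consumer like this. A minimal sketch, not taken from this commit: the broker address, topic, and group id are placeholder values.

from kafka import KafkaConsumer

# New (group) KafkaConsumer with the option names used in this commit:
# enable_auto_commit replaces auto_commit_enable, auto_offset_reset takes
# 'earliest' rather than 'smallest', and there is no per-message commit
# interval -- only the millisecond-based auto_commit_interval_ms.
consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic
    bootstrap_servers='localhost:9092',  # placeholder broker
    group_id='my-group',                 # placeholder group id
    enable_auto_commit=True,
    auto_commit_interval_ms=100,
    auto_offset_reset='earliest',
    consumer_timeout_ms=500,
)

# Iterate with the builtin next(); when no message arrives within
# consumer_timeout_ms the iterator raises StopIteration, not the old
# ConsumerTimeout exception.
try:
    while True:
        msg = next(consumer)
        print(msg.partition, msg.offset)
except StopIteration:
    pass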
-rw-r--r-- | test/test_consumer_integration.py | 60
1 file changed, 31 insertions(+), 29 deletions(-)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index cd5af5e..1104916 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -7,8 +7,8 @@ from kafka import (
     KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
 )
 from kafka.common import (
-    ProduceRequestPayload, ConsumerFetchSizeTooSmall, ConsumerTimeout,
-    OffsetOutOfRangeError
+    ProduceRequestPayload, ConsumerFetchSizeTooSmall,
+    OffsetOutOfRangeError, TopicPartition
 )
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -475,11 +475,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.send_messages(1, range(100, 200))

         # Start a consumer
-        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+        consumer = self.kafka_consumer(auto_offset_reset='earliest',
                                        consumer_timeout_ms=5000)
         n = 0
         messages = {0: set(), 1: set()}
-        logging.debug("kafka consumer offsets: %s" % consumer.offsets())
         for m in consumer:
             logging.debug("Consumed message %s" % repr(m))
             n += 1
@@ -493,13 +492,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_kafka_consumer__blocking(self):
         TIMEOUT_MS = 500
-        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+        consumer = self.kafka_consumer(auto_offset_reset='earliest',
                                        consumer_timeout_ms=TIMEOUT_MS)

+        # Manual assignment avoids overhead of consumer group mgmt
+        consumer.unsubscribe()
+        consumer.assign([TopicPartition(self.topic, 0)])
+
         # Ask for 5 messages, nothing in queue, block 500ms
         with Timer() as t:
-            with self.assertRaises(ConsumerTimeout):
-                msg = consumer.next()
+            with self.assertRaises(StopIteration):
+                msg = next(consumer)
         self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )

         self.send_messages(0, range(0, 10))
@@ -508,7 +511,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         messages = set()
         with Timer() as t:
             for i in range(5):
-                msg = consumer.next()
+                msg = next(consumer)
                 messages.add((msg.partition, msg.offset))
         self.assertEqual(len(messages), 5)
         self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
@@ -516,9 +519,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         # Ask for 10 messages, get 5 back, block 500ms
         messages = set()
         with Timer() as t:
-            with self.assertRaises(ConsumerTimeout):
+            with self.assertRaises(StopIteration):
                 for i in range(10):
-                    msg = consumer.next()
+                    msg = next(consumer)
                     messages.add((msg.partition, msg.offset))
         self.assertEqual(len(messages), 5)
         self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
@@ -532,36 +535,35 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         # Start a consumer
         consumer1 = self.kafka_consumer(
-            group_id = GROUP_ID,
-            auto_commit_enable = True,
-            auto_commit_interval_ms = None,
-            auto_commit_interval_messages = 20,
-            auto_offset_reset='smallest',
+            group_id=GROUP_ID,
+            enable_auto_commit=True,
+            auto_commit_interval_ms=100,
+            auto_offset_reset='earliest',
+            consumer_timeout_ms=100
         )

-        # Grab the first 195 messages
+        # Grab the first 180 messages
         output_msgs1 = []
-        for _ in xrange(195):
-            m = consumer1.next()
+        for _ in xrange(180):
+            m = next(consumer1)
             output_msgs1.append(m)
-            consumer1.task_done(m)
-        self.assert_message_count(output_msgs1, 195)
+        self.assert_message_count(output_msgs1, 180)
+        consumer1.close()

         # The total offset across both partitions should be at 180
         consumer2 = self.kafka_consumer(
-            group_id = GROUP_ID,
-            auto_commit_enable = True,
-            auto_commit_interval_ms = None,
-            auto_commit_interval_messages = 20,
-            consumer_timeout_ms = 100,
-            auto_offset_reset='smallest',
+            group_id=GROUP_ID,
+            enable_auto_commit=True,
+            auto_commit_interval_ms=100,
+            auto_offset_reset='earliest',
+            consumer_timeout_ms=100
         )

         # 181-200
         output_msgs2 = []
-        with self.assertRaises(ConsumerTimeout):
+        with self.assertRaises(StopIteration):
             while True:
-                m = consumer2.next()
+                m = next(consumer2)
                 output_msgs2.append(m)
         self.assert_message_count(output_msgs2, 20)
-        self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
+        #self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
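The manual-assignment pattern added to test_kafka_consumer__blocking also works standalone. Again a sketch, with placeholder broker and topic names:

from kafka import KafkaConsumer
from kafka.common import TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',  # placeholder broker
    auto_offset_reset='earliest',
    consumer_timeout_ms=500,
)

# assign() pins the consumer to explicit partitions, skipping the group
# join/rebalance overhead that a topic subscription would incur.
consumer.assign([TopicPartition('my-topic', 0)])  # placeholder topic

# The loop ends (via StopIteration inside the iterator) once no message
# arrives within consumer_timeout_ms.
for msg in consumer:
    print(msg.offset, msg.value)

The test calls unsubscribe() before assign() only because its kafka_consumer() helper subscribes to the topic when constructing the consumer; a consumer that never subscribed can call assign() directly.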