diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-03-07 00:59:26 +0200 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-03-06 14:59:26 -0800 |
commit | 9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (patch) | |
tree | 8802dfc07e053279d18be591af11a1a4edf4988c /test/test_consumer_integration.py | |
parent | ff6f7bf085b912090b436da1c99f6f8f4cf66f94 (diff) | |
download | kafka-python-9c19ea7cbe163b0c434ce9dd9c8c42471027cce5.tar.gz |
Added `max_bytes` option and FetchRequest_v3 usage. (#962)
* Added `max_bytes` option and FetchRequest_v3 usage.
* Add checks for versions above 0.10 based on ApiVersionResponse
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 46 |
1 file changed, 46 insertions, 0 deletions
@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_simple(self):
    """fetch_max_bytes should cap the bytes returned per fetch while the
    consumer still makes progress on every partition that has data."""
    self.send_messages(0, range(100, 200))
    self.send_messages(1, range(200, 300))

    # Start a consumer with a small global fetch budget (300 bytes).
    consumer = self.kafka_consumer(
        auto_offset_reset='earliest', fetch_max_bytes=300)
    fetched_size = 0
    seen_partitions = set()
    for _ in range(10):
        poll_res = consumer.poll(timeout_ms=100)
        for partition, msgs in six.iteritems(poll_res):
            for msg in msgs:
                fetched_size += len(msg.value)
                seen_partitions.add(partition)

    # Check that we fetched at least 1 message from both partitions
    self.assertEqual(
        seen_partitions, set([
            TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
    # 10 polls at <= 300 bytes each must stay strictly under 10 * 300.
    self.assertLess(fetched_size, 3000)

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_one_msg(self):
    """With fetch_max_bytes=1, the broker must still return at least one
    full message per fetch, so each poll yields exactly one message."""
    # We send to only 1 partition so we don't have parallel requests to 2
    # nodes for data.
    self.send_messages(0, range(100, 200))

    # Start a consumer. FetchResponse_v3 should always include at least 1
    # full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
    consumer = self.kafka_consumer(
        auto_offset_reset='earliest', fetch_max_bytes=1)
    fetched_msgs = []
    # A bit hacky, but we need this in order for message count to be exact
    consumer._coordinator.ensure_active_group()
    for _ in range(10):
        poll_res = consumer.poll(timeout_ms=2000)
        for partition, msgs in six.iteritems(poll_res):
            for msg in msgs:
                fetched_msgs.append(msg)

    self.assertEqual(len(fetched_msgs), 10)