summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-03-07 00:59:26 +0200
committerDana Powers <dana.powers@gmail.com>2017-03-06 14:59:26 -0800
commit9c19ea7cbe163b0c434ce9dd9c8c42471027cce5 (patch)
tree8802dfc07e053279d18be591af11a1a4edf4988c /test/test_consumer_integration.py
parentff6f7bf085b912090b436da1c99f6f8f4cf66f94 (diff)
downloadkafka-python-9c19ea7cbe163b0c434ce9dd9c8c42471027cce5.tar.gz
Added `max_bytes` option and FetchRequest_v3 usage. (#962)
* Added `max_bytes` option and FetchRequest_v3 usage. * Add checks for versions above 0.10 based on ApiVersionResponse
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py46
1 files changed, 46 insertions, 0 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 998045f..9473691 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -2,6 +2,7 @@ import logging
import os
from six.moves import xrange
+import six
from . import unittest
from kafka import (
@@ -572,3 +573,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_max_bytes_simple(self):
+ self.send_messages(0, range(100, 200))
+ self.send_messages(1, range(200, 300))
+
+ # Start a consumer
+ consumer = self.kafka_consumer(
+ auto_offset_reset='earliest', fetch_max_bytes=300)
+ fetched_size = 0
+ seen_partitions = set([])
+ for i in range(10):
+ poll_res = consumer.poll(timeout_ms=100)
+ for partition, msgs in six.iteritems(poll_res):
+ for msg in msgs:
+ fetched_size += len(msg.value)
+ seen_partitions.add(partition)
+
+ # Check that we fetched at least 1 message from both partitions
+ self.assertEqual(
+ seen_partitions, set([
+ TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
+ self.assertLess(fetched_size, 3000)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_max_bytes_one_msg(self):
+ # We send to only 1 partition so we don't have parallel requests to 2
+ # nodes for data.
+ self.send_messages(0, range(100, 200))
+
+ # Start a consumer. FetchResponse_v3 should always include at least 1
+ # full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
+ consumer = self.kafka_consumer(
+ auto_offset_reset='earliest', fetch_max_bytes=1)
+ fetched_msgs = []
+ # A bit hacky, but we need this in order for message count to be exact
+ consumer._coordinator.ensure_active_group()
+ for i in range(10):
+ poll_res = consumer.poll(timeout_ms=2000)
+ print(poll_res)
+ for partition, msgs in six.iteritems(poll_res):
+ for msg in msgs:
+ fetched_msgs.append(msg)
+
+ self.assertEqual(len(fetched_msgs), 10)