summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorPaul Cavallaro <paulcavallaro@users.noreply.github.com>2016-05-10 11:34:06 -0400
committerDana Powers <dana.powers@gmail.com>2016-05-10 08:34:06 -0700
commit2c9930dea4a4537cf237ac7cc9db1f3970419b59 (patch)
tree74956b9a2433cc214d87bcd593c3eaead593eb8b /test
parent0684302d8fa1271ad0e913972b382b00ddeab717 (diff)
downloadkafka-python-2c9930dea4a4537cf237ac7cc9db1f3970419b59.tar.gz
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages* [Legacy Protocol] Update legacy protocol to handle compressed messages
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
Diffstat (limited to 'test')
-rw-r--r--test/test_consumer_integration.py19
1 files changed, 18 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4e081ce..9c27eee 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@ from six.moves import xrange
from . import unittest
from kafka import (
- KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
+ KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
@@ -49,6 +49,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
return [ x.value for x in messages ]
+ def send_gzip_message(self, partition, messages):
+ message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages])
+ produce = ProduceRequestPayload(self.topic, partition, messages = [message])
+ resp, = self.client.send_produce_request([produce])
+ self.assertEqual(resp.error, 0)
+
def assert_message_count(self, messages, num_messages):
# Make sure we got them all
self.assertEqual(len(messages), num_messages)
@@ -92,6 +98,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ def test_simple_consumer_gzip(self):
+ self.send_gzip_message(0, range(0, 100))
+ self.send_gzip_message(1, range(100, 200))
+
+ # Start a consumer
+ consumer = self.consumer()
+
+ self.assert_message_count([ message for message in consumer ], 200)
+
+ consumer.stop()
+
def test_simple_consumer_smallest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))