diff options
author | Paul Cavallaro <paulcavallaro@users.noreply.github.com> | 2016-05-10 11:34:06 -0400 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-10 08:34:06 -0700 |
commit | 2c9930dea4a4537cf237ac7cc9db1f3970419b59 (patch) | |
tree | 74956b9a2433cc214d87bcd593c3eaead593eb8b | |
parent | 0684302d8fa1271ad0e913972b382b00ddeab717 (diff) | |
download | kafka-python-2c9930dea4a4537cf237ac7cc9db1f3970419b59.tar.gz |
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
* [Legacy Protocol] Update legacy protocol to handle compressed messages
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
-rw-r--r-- | kafka/protocol/legacy.py | 14 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 19 |
2 files changed, 30 insertions, 3 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 08d2d01..cd100d6 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -204,13 +204,23 @@ class KafkaProtocol(object): return [ kafka.structs.FetchResponsePayload( topic, partition, error, highwater_offset, [ - kafka.structs.OffsetAndMessage(offset, message) - for offset, _, message in messages]) + offset_and_msg + for offset_and_msg in cls.decode_message_set(messages)]) for topic, partitions in response.topics for partition, error, highwater_offset, messages in partitions ] @classmethod + def decode_message_set(cls, messages): + for offset, _, message in messages: + if isinstance(message, kafka.protocol.message.Message) and message.is_compressed(): + inner_messages = message.decompress() + for (inner_offset, _msg_size, inner_msg) in inner_messages: + yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg) + else: + yield kafka.structs.OffsetAndMessage(offset, message) + + @classmethod def encode_offset_request(cls, payloads=()): return kafka.protocol.offset.OffsetRequest[0]( replica_id=-1, diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 4e081ce..9c27eee 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -5,7 +5,7 @@ from six.moves import xrange from . 
import unittest from kafka import ( - KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message + KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError @@ -49,6 +49,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): return [ x.value for x in messages ] + def send_gzip_message(self, partition, messages): + message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages]) + produce = ProduceRequestPayload(self.topic, partition, messages = [message]) + resp, = self.client.send_produce_request([produce]) + self.assertEqual(resp.error, 0) + def assert_message_count(self, messages, num_messages): # Make sure we got them all self.assertEqual(len(messages), num_messages) @@ -92,6 +98,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + def test_simple_consumer_gzip(self): + self.send_gzip_message(0, range(0, 100)) + self.send_gzip_message(1, range(100, 200)) + + # Start a consumer + consumer = self.consumer() + + self.assert_message_count([ message for message in consumer ], 200) + + consumer.stop() + def test_simple_consumer_smallest_offset_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) |