author    Paul Cavallaro <paulcavallaro@users.noreply.github.com>  2016-05-10 11:34:06 -0400
committer Dana Powers <dana.powers@gmail.com>  2016-05-10 08:34:06 -0700
commit    2c9930dea4a4537cf237ac7cc9db1f3970419b59 (patch)
tree      74956b9a2433cc214d87bcd593c3eaead593eb8b
parent    0684302d8fa1271ad0e913972b382b00ddeab717 (diff)
download  kafka-python-2c9930dea4a4537cf237ac7cc9db1f3970419b59.tar.gz
* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
* [Legacy Protocol] Update legacy protocol to handle compressed messages
-rw-r--r--  kafka/protocol/legacy.py           | 14 ++++++++++++--
-rw-r--r--  test/test_consumer_integration.py  | 19 ++++++++++++++++++-
2 files changed, 30 insertions(+), 3 deletions(-)
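
Background for the patch: in Kafka's legacy wire protocol, a compressed batch travels as a single wrapper Message whose value is a gzip- or snappy-encoded inner message set. Before this change, decode_fetch_response handed that wrapper straight to SimpleConsumer instead of the messages inside it. The toy below (a hypothetical StubMessage class standing in for kafka.protocol.message.Message, not the real kafka-python objects) sketches the flattening that the new decode_message_set generator performs:

    from collections import namedtuple

    OffsetAndMessage = namedtuple('OffsetAndMessage', ['offset', 'message'])

    class StubMessage(object):
        """Illustrative stand-in for kafka.protocol.message.Message."""
        def __init__(self, value, inner=None):
            self.value = value
            self._inner = inner or []

        def is_compressed(self):
            return bool(self._inner)

        def decompress(self):
            # The real method gunzips self.value and decodes the embedded
            # message set; the stub just returns its (offset, size, message)
            # tuples directly.
            return self._inner

    def decode_message_set(messages):
        # Same shape as the classmethod added in the diff below: expand
        # compressed wrappers in place, pass everything else through.
        for offset, _, message in messages:
            if message.is_compressed():
                for inner_offset, _msg_size, inner_msg in message.decompress():
                    yield OffsetAndMessage(inner_offset, inner_msg)
            else:
                yield OffsetAndMessage(offset, message)

    wrapper = StubMessage(b'<gzipped bytes>', inner=[
        (0, 0, StubMessage(b'msg 0')),
        (1, 0, StubMessage(b'msg 1')),
    ])
    flat = list(decode_message_set([(1, 0, wrapper)]))
    assert [m.message.value for m in flat] == [b'msg 0', b'msg 1']
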
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 08d2d01..cd100d6 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -204,13 +204,23 @@ class KafkaProtocol(object):
         return [
             kafka.structs.FetchResponsePayload(
                 topic, partition, error, highwater_offset, [
-                    kafka.structs.OffsetAndMessage(offset, message)
-                    for offset, _, message in messages])
+                    offset_and_msg
+                    for offset_and_msg in cls.decode_message_set(messages)])
             for topic, partitions in response.topics
             for partition, error, highwater_offset, messages in partitions
         ]
 
     @classmethod
+    def decode_message_set(cls, messages):
+        for offset, _, message in messages:
+            if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
+                inner_messages = message.decompress()
+                for (inner_offset, _msg_size, inner_msg) in inner_messages:
+                    yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
+            else:
+                yield kafka.structs.OffsetAndMessage(offset, message)
+
+    @classmethod
     def encode_offset_request(cls, payloads=()):
         return kafka.protocol.offset.OffsetRequest[0](
             replica_id=-1,
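
One detail worth noting in the hunk above: the generator yields each inner message with its own inner_offset and discards the wrapper's offset. Under the legacy (v0) message format the broker typically assigns the wrapper the offset of its last inner message, so reusing the wrapper offset would give every inner message the same offset and break SimpleConsumer's position tracking. Reusing the stub sketch from above:

    # The wrapper sits at offset 1 (its last inner message); consumers must
    # still see offsets 0 and 1, which the inner offsets provide.
    flat = list(decode_message_set([(1, 0, wrapper)]))
    assert [m.offset for m in flat] == [0, 1]
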
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4e081ce..9c27eee 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@ from six.moves import xrange
 
 from . import unittest
 from kafka import (
-    KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
+    KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message
 )
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
@@ -49,6 +49,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         return [ x.value for x in messages ]
 
+    def send_gzip_message(self, partition, messages):
+        message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages])
+        produce = ProduceRequestPayload(self.topic, partition, messages = [message])
+        resp, = self.client.send_produce_request([produce])
+        self.assertEqual(resp.error, 0)
+
     def assert_message_count(self, messages, num_messages):
         # Make sure we got them all
         self.assertEqual(len(messages), num_messages)
@@ -92,6 +98,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
 
         consumer.stop()
 
+    def test_simple_consumer_gzip(self):
+        self.send_gzip_message(0, range(0, 100))
+        self.send_gzip_message(1, range(100, 200))
+
+        # Start a consumer
+        consumer = self.consumer()
+
+        self.assert_message_count([ message for message in consumer ], 200)
+
+        consumer.stop()
+
     def test_simple_consumer_smallest_offset_reset(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
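
For context, here is the end-to-end flow the new test exercises, outside the integration-test harness. This is a hedged sketch assuming a broker on localhost:9092 and the legacy SimpleClient/SimpleConsumer API of this release; the topic, group, and connection details are illustrative, not taken from the patch:

    from kafka import SimpleClient, SimpleConsumer, create_gzip_message
    from kafka.structs import ProduceRequestPayload

    client = SimpleClient('localhost:9092')

    # Pack ten payloads into one gzip-compressed wrapper message.
    batch = create_gzip_message([(('msg %d' % i).encode('utf-8'), None)
                                 for i in range(10)])
    req = ProduceRequestPayload('my-topic', 0, messages=[batch])
    (resp,) = client.send_produce_request([req])
    assert resp.error == 0

    # With this patch, SimpleConsumer yields the ten inner messages instead
    # of failing on the compressed wrapper. iter_timeout=0 makes iteration
    # stop once the backlog is drained rather than blocking for new data.
    consumer = SimpleConsumer(client, 'my-group', 'my-topic', iter_timeout=0)
    for offset_and_message in consumer:
        print(offset_and_message.offset, offset_and_message.message.value)
    consumer.stop()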