diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:03 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:03 -0700 |
commit | 1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (patch) | |
tree | 0393a7b7768f449174216fb6b0702a87844a5a81 | |
parent | 96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff) | |
download | kafka-python-1d4251a9efa4c5466ba5095f3ba199bf082a72b5.tar.gz |
Use standard LZ4 framing for v1 messages / kafka 0.10 (#695)
* LZ4 framing fixed in 0.10 / message v1 -- retain broken lz4 code for compatibility
* lz4f does not support easy incremental decompression - raise RuntimeError
* Update lz4 codec tests
-rw-r--r-- | kafka/codec.py | 30 | ||||
-rw-r--r-- | kafka/errors.py | 4 | ||||
-rw-r--r-- | kafka/producer/buffer.py | 9 | ||||
-rw-r--r-- | kafka/protocol/message.py | 8 | ||||
-rw-r--r-- | test/test_codec.py | 23 |
5 files changed, 62 insertions, 12 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index e94bc4c..9c31e9d 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -180,8 +180,27 @@ def snappy_decode(payload): def lz4_encode(payload): - data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member - # Kafka's LZ4 code has a bug in its header checksum implementation + """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" + # pylint: disable-msg=no-member + return lz4f.compressFrame(payload) + + +def lz4_decode(payload): + """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" + # pylint: disable-msg=no-member + ctx = lz4f.createDecompContext() + data = lz4f.decompressFrame(payload, ctx) + + # lz4f python module does not expose how much of the payload was + # actually read if the decompression was only partial. + if data['next'] != 0: + raise RuntimeError('lz4f unable to decompress full payload') + return data['decomp'] + + +def lz4_encode_old_kafka(payload): + """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.""" + data = lz4_encode(payload) header_size = 7 if isinstance(data[4], int): flg = data[4] @@ -201,7 +220,7 @@ def lz4_encode(payload): ]) -def lz4_decode(payload): +def lz4_decode_old_kafka(payload): # Kafka's LZ4 code has a bug in its header checksum implementation header_size = 7 if isinstance(payload[4], int): @@ -220,7 +239,4 @@ def lz4_decode(payload): hc, payload[header_size:] ]) - - cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member - data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member - return data['decomp'] + return lz4_decode(munged_payload) diff --git a/kafka/errors.py b/kafka/errors.py index a34ffef..6960810 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -81,8 +81,8 @@ class OffsetOutOfRangeError(BrokerResponseError): class InvalidMessageError(BrokerResponseError): errno = 2 message = 'INVALID_MESSAGE' - description = ('This indicates that a message contents does not match its' - ' CRC.') + description = ('This message has failed its CRC checksum, exceeds the' + ' valid size, or is otherwise corrupt.') class UnknownTopicOrPartitionError(BrokerResponseError): diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index ba9b5db..5dc2e1f 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -6,7 +6,8 @@ import threading import time from ..codec import (has_gzip, has_snappy, has_lz4, - gzip_encode, snappy_encode, lz4_encode) + gzip_encode, snappy_encode, + lz4_encode, lz4_encode_old_kafka) from .. import errors as Errors from ..protocol.types import Int32, Int64 from ..protocol.message import MessageSet, Message @@ -28,10 +29,16 @@ class MessageSetBuffer(object): 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP), 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY), 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), + 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4), } def __init__(self, buf, batch_size, compression_type=None, message_version=0): if compression_type is not None: assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' + + # Kafka 0.8/0.9 had a quirky lz4... + if compression_type == 'lz4' and message_version == 0: + compression_type = 'lz4-old-kafka' + checker, encoder, attributes = self._COMPRESSORS[compression_type] assert checker(), 'Compression Libraries Not Found' self._compressor = encoder diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 473ca56..78840fc 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -2,7 +2,8 @@ import io import time from ..codec import (has_gzip, has_snappy, has_lz4, - gzip_decode, snappy_decode, lz4_decode) + gzip_decode, snappy_decode, + lz4_decode, lz4_decode_old_kafka) from . import pickle from .struct import Struct from .types import ( @@ -116,7 +117,10 @@ class Message(Struct): raw_bytes = snappy_decode(self.value) elif codec == self.CODEC_LZ4: assert has_lz4(), 'LZ4 decompression unsupported' - raw_bytes = lz4_decode(self.value) + if self.magic == 0: + raw_bytes = lz4_decode_old_kafka(self.value) + else: + raw_bytes = lz4_decode(self.value) else: raise Exception('This should be impossible') diff --git a/test/test_codec.py b/test/test_codec.py index 07a74cd..906b53c 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -8,6 +8,7 @@ from kafka.codec import ( gzip_encode, gzip_decode, snappy_encode, snappy_decode, lz4_encode, lz4_decode, + lz4_encode_old_kafka, lz4_decode_old_kafka, ) from test.testutil import random_string @@ -84,4 +85,26 @@ def test_lz4(): for i in xrange(1000): b1 = random_string(100).encode('utf-8') b2 = lz4_decode(lz4_encode(b1)) + assert len(b1) == len(b2) + assert b1 == b2 + + +@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") +def test_lz4_old(): + for i in xrange(1000): + b1 = random_string(100).encode('utf-8') + b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1)) + assert len(b1) == len(b2) + assert b1 == b2 + + +@pytest.mark.xfail(reason="lz4tools library doesnt support incremental decompression") +@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") +def test_lz4_incremental(): + for i in xrange(1000): + # lz4 max single block size is 4MB + # make sure we test with multiple-blocks + b1 = random_string(100).encode('utf-8') * 50000 + b2 = lz4_decode(lz4_encode(b1)) + assert len(b1) == len(b2) assert b1 == b2 |