summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 17:14:03 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 17:14:03 -0700
commit1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (patch)
tree0393a7b7768f449174216fb6b0702a87844a5a81
parent96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff)
downloadkafka-python-1d4251a9efa4c5466ba5095f3ba199bf082a72b5.tar.gz
Use standard LZ4 framing for v1 messages / kafka 0.10 (#695)
* LZ4 framing fixed in 0.10 / message v1 -- retain broken lz4 code for compatibility * lz4f does not support easy incremental decompression - raise RuntimeError * Update lz4 codec tests
-rw-r--r--kafka/codec.py30
-rw-r--r--kafka/errors.py4
-rw-r--r--kafka/producer/buffer.py9
-rw-r--r--kafka/protocol/message.py8
-rw-r--r--test/test_codec.py23
5 files changed, 62 insertions, 12 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index e94bc4c..9c31e9d 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -180,8 +180,27 @@ def snappy_decode(payload):
def lz4_encode(payload):
- data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member
- # Kafka's LZ4 code has a bug in its header checksum implementation
+ """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
+ # pylint: disable-msg=no-member
+ return lz4f.compressFrame(payload)
+
+
+def lz4_decode(payload):
+ """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
+ # pylint: disable-msg=no-member
+ ctx = lz4f.createDecompContext()
+ data = lz4f.decompressFrame(payload, ctx)
+
+ # lz4f python module does not expose how much of the payload was
+ # actually read if the decompression was only partial.
+ if data['next'] != 0:
+ raise RuntimeError('lz4f unable to decompress full payload')
+ return data['decomp']
+
+
+def lz4_encode_old_kafka(payload):
+ """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
+ data = lz4_encode(payload)
header_size = 7
if isinstance(data[4], int):
flg = data[4]
@@ -201,7 +220,7 @@ def lz4_encode(payload):
])
-def lz4_decode(payload):
+def lz4_decode_old_kafka(payload):
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
@@ -220,7 +239,4 @@ def lz4_decode(payload):
hc,
payload[header_size:]
])
-
- cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member
- data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member
- return data['decomp']
+ return lz4_decode(munged_payload)
diff --git a/kafka/errors.py b/kafka/errors.py
index a34ffef..6960810 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -81,8 +81,8 @@ class OffsetOutOfRangeError(BrokerResponseError):
class InvalidMessageError(BrokerResponseError):
errno = 2
message = 'INVALID_MESSAGE'
- description = ('This indicates that a message contents does not match its'
- ' CRC.')
+ description = ('This message has failed its CRC checksum, exceeds the'
+ ' valid size, or is otherwise corrupt.')
class UnknownTopicOrPartitionError(BrokerResponseError):
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index ba9b5db..5dc2e1f 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -6,7 +6,8 @@ import threading
import time
from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_encode, snappy_encode, lz4_encode)
+ gzip_encode, snappy_encode,
+ lz4_encode, lz4_encode_old_kafka)
from .. import errors as Errors
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -28,10 +29,16 @@ class MessageSetBuffer(object):
'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
+ 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None, message_version=0):
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
+
+ # Kafka 0.8/0.9 had a quirky lz4...
+ if compression_type == 'lz4' and message_version == 0:
+ compression_type = 'lz4-old-kafka'
+
checker, encoder, attributes = self._COMPRESSORS[compression_type]
assert checker(), 'Compression Libraries Not Found'
self._compressor = encoder
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 473ca56..78840fc 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -2,7 +2,8 @@ import io
import time
from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_decode, snappy_decode, lz4_decode)
+ gzip_decode, snappy_decode,
+ lz4_decode, lz4_decode_old_kafka)
from . import pickle
from .struct import Struct
from .types import (
@@ -116,7 +117,10 @@ class Message(Struct):
raw_bytes = snappy_decode(self.value)
elif codec == self.CODEC_LZ4:
assert has_lz4(), 'LZ4 decompression unsupported'
- raw_bytes = lz4_decode(self.value)
+ if self.magic == 0:
+ raw_bytes = lz4_decode_old_kafka(self.value)
+ else:
+ raw_bytes = lz4_decode(self.value)
else:
raise Exception('This should be impossible')
diff --git a/test/test_codec.py b/test/test_codec.py
index 07a74cd..906b53c 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -8,6 +8,7 @@ from kafka.codec import (
gzip_encode, gzip_decode,
snappy_encode, snappy_decode,
lz4_encode, lz4_decode,
+ lz4_encode_old_kafka, lz4_decode_old_kafka,
)
from test.testutil import random_string
@@ -84,4 +85,26 @@ def test_lz4():
for i in xrange(1000):
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode(lz4_encode(b1))
+ assert len(b1) == len(b2)
+ assert b1 == b2
+
+
+@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
+def test_lz4_old():
+ for i in xrange(1000):
+ b1 = random_string(100).encode('utf-8')
+ b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
+ assert len(b1) == len(b2)
+ assert b1 == b2
+
+
+@pytest.mark.xfail(reason="lz4tools library doesnt support incremental decompression")
+@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
+def test_lz4_incremental():
+ for i in xrange(1000):
+ # lz4 max single block size is 4MB
+ # make sure we test with multiple-blocks
+ b1 = random_string(100).encode('utf-8') * 50000
+ b2 = lz4_decode(lz4_encode(b1))
+ assert len(b1) == len(b2)
assert b1 == b2