summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-27 10:54:57 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-27 10:54:57 -0800
commit0914b81248de612ae7007d284419c40ccb8b1065 (patch)
tree6698c6b28681d35edbfe5ca9ff385c8664f33ee1
parent54a735bed082feecd68f15f63453e7f6ca58d547 (diff)
parentf08ec792ee93fd059e81ee1e30f5651c15f69e85 (diff)
downloadkafka-python-0914b81248de612ae7007d284419c40ccb8b1065.tar.gz
Merge pull request #522 from dpkp/lz4_fixup
Handle broken LZ4 framing
-rw-r--r--README.rst5
-rw-r--r--docs/index.rst5
-rw-r--r--docs/install.rst10
-rw-r--r--kafka/codec.py58
-rw-r--r--test/test_producer.py15
-rw-r--r--tox.ini4
6 files changed, 77 insertions, 20 deletions
diff --git a/README.rst b/README.rst
index 61b737f..782aba0 100644
--- a/README.rst
+++ b/README.rst
@@ -102,8 +102,9 @@ Compression
***********
kafka-python supports gzip compression/decompression natively. To produce or
-consume snappy and lz4 compressed messages, you must install `lz4` (`lz4-cffi`
-if using pypy) and/or `python-snappy` (also requires snappy library).
+consume lz4 compressed messages, you must install lz4tools and xxhash (modules
+may not work on python2.6). To enable snappy compression/decompression install
+python-snappy (also requires snappy library).
See `Installation <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install>`_
for more information.
diff --git a/docs/index.rst b/docs/index.rst
index 2f54b09..fd13a46 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -101,8 +101,9 @@ Compression
***********
kafka-python supports gzip compression/decompression natively. To produce or
-consume snappy and lz4 compressed messages, you must install lz4 (lz4-cffi
-if using pypy) and/or python-snappy (also requires snappy library).
+consume lz4 compressed messages, you must install lz4tools and xxhash (modules
+may not work on python2.6). To enable snappy, install python-snappy (also
+requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.
diff --git a/docs/install.rst b/docs/install.rst
index aba5019..4dca5d0 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -40,14 +40,12 @@ Using `setup.py` directly:
Optional LZ4 install
********************
-To enable LZ4 compression/decompression, install `lz4`:
+To enable LZ4 compression/decompression, install lz4tools and xxhash:
->>> pip install lz4
-
-Or `lz4-cffi` if using pypy:
-
->>> pip install lz4-cffi
+>>> pip install lz4tools
+>>> pip install xxhash
+*Note*: these modules do not support python2.6
Optional Snappy install
***********************
diff --git a/kafka/codec.py b/kafka/codec.py
index 11d5a99..e94bc4c 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -15,13 +15,10 @@ except ImportError:
snappy = None
try:
- import lz4
- from lz4 import compress as lz4_encode
- from lz4 import decompress as lz4_decode
+ import lz4f
+ import xxhash
except ImportError:
- lz4 = None
- lz4_encode = None
- lz4_decode = None
+ lz4f = None
PYPY = bool(platform.python_implementation() == 'PyPy')
@@ -34,7 +31,7 @@ def has_snappy():
def has_lz4():
- return lz4 is not None
+ return lz4f is not None
def gzip_encode(payload, compresslevel=None):
@@ -180,3 +177,50 @@ def snappy_decode(payload):
return out.read()
else:
return snappy.decompress(payload)
+
+
def lz4_encode(payload):
    """Compress ``payload`` into an LZ4 frame carrying Kafka's header checksum.

    The frame is built with ``lz4f.compressFrame`` and then its header
    checksum byte (the last byte of the frame header) is overwritten.
    Kafka's LZ4 code has a bug in its header checksum implementation, so the
    replacement byte is deliberately the "incorrect" value: it is derived
    from an xxh32 hash over the header starting at offset 0 of the frame
    rather than the value the compressor wrote.

    Arguments:
        payload: the raw data handed to ``lz4f.compressFrame``.

    Returns:
        The compressed frame with the header checksum byte replaced.
    """
    frame = lz4f.compressFrame(payload)  # pylint: disable-msg=no-member

    # Indexing yields an int on python3 bytes and a 1-char str on python2.
    flg_byte = frame[4]
    flg = flg_byte if isinstance(flg_byte, int) else ord(flg_byte)

    # The checksum is the last header byte: offset 6 for the minimal 7-byte
    # header, or offset 14 when bit 3 of FLG signals 8 extra header bytes
    # (the optional content-size field, per the LZ4 frame format).
    hc_offset = 14 if (flg >> 3) & 1 else 6

    # This is the incorrect hc: hashed from offset 0 of the frame.
    header_prefix = frame[:hc_offset]
    digest = xxhash.xxh32(header_prefix).digest()  # pylint: disable-msg=no-member
    kafka_hc = digest[-2:-1]

    return header_prefix + kafka_hc + frame[hc_offset + 1:]
+
+
def lz4_decode(payload):
    """Decompress an LZ4 frame that carries Kafka's header checksum.

    Kafka's LZ4 code has a bug in its header checksum implementation, so the
    header checksum byte of ``payload`` is first replaced with the value
    computed here (second-to-last byte of the xxh32 digest of the header
    bytes from offset 4 up to, but excluding, the checksum byte). The
    repaired frame is then handed to ``lz4f.decompressFrame``.

    Arguments:
        payload: a compressed LZ4 frame; must be long enough to contain the
            frame header (indexing a shorter payload raises ``IndexError``).

    Returns:
        The decompressed data (the ``'decomp'`` entry of the lz4f result).

    Raises:
        RuntimeError: if lz4f reports that the frame was only partially
            decompressed.
    """
    # Kafka's LZ4 code has a bug in its header checksum implementation
    header_size = 7

    # Indexing yields an int on python3 bytes and a 1-char str on python2.
    flg_byte = payload[4]
    flg = flg_byte if isinstance(flg_byte, int) else ord(flg_byte)

    # Bit 3 of FLG signals 8 extra header bytes (the optional content-size
    # field, per the LZ4 frame format).
    content_size_bit = (flg >> 3) & 1
    if content_size_bit:
        header_size += 8

    # This should be the correct hc
    hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1]  # pylint: disable-msg=no-member

    munged_payload = b''.join([
        payload[0:header_size - 1],
        hc,
        payload[header_size:]
    ])

    # Decompression needs a *decompression* context; a compression context
    # is a different native object and must not be passed here.
    ctx = lz4f.createDecompContext()  # pylint: disable-msg=no-member
    try:
        data = lz4f.decompressFrame(munged_payload, ctx)  # pylint: disable-msg=no-member
    finally:
        # Always release the native context, even if decompression fails.
        lz4f.freeDecompContext(ctx)  # pylint: disable-msg=no-member

    # lz4f does not expose how much of the payload was consumed; a non-zero
    # 'next' hint means the frame was not fully decoded.
    if data['next'] != 0:
        raise RuntimeError('lz4f unable to decompress full payload')
    return data['decomp']
diff --git a/test/test_producer.py b/test/test_producer.py
index 263df11..36da68d 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,3 +1,5 @@
+import sys
+
import pytest
from kafka import KafkaConsumer, KafkaProducer
@@ -6,10 +8,21 @@ from test.testutil import random_string
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_end_to_end(kafka_broker):
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+def test_end_to_end(kafka_broker, compression):
+
+ if compression == 'lz4':
+ # LZ4 requires 0.8.2
+ if version() < (0, 8, 2):
+ return
+ # LZ4 python libs dont work on python2.6
+ elif sys.version_info < (2, 7):
+ return
+
connect_str = 'localhost:' + str(kafka_broker.port)
producer = KafkaProducer(bootstrap_servers=connect_str,
max_block_ms=10000,
+ compression_type=compression,
value_serializer=str.encode)
consumer = KafkaConsumer(bootstrap_servers=connect_str,
group_id=None,
diff --git a/tox.ini b/tox.ini
index 4ead9e3..ce7feee 100644
--- a/tox.ini
+++ b/tox.ini
@@ -16,8 +16,8 @@ deps =
pytest-mock
mock
python-snappy
- py{26,27,33,34,35}: lz4
- pypy: lz4-cffi
+ lz4tools
+ xxhash
py{26,27}: six
py26: unittest2
commands =