summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-13 14:22:57 -0700
committerDana Powers <dana.powers@gmail.com>2017-03-14 13:30:35 -0700
commitfb023fe85d0bac4e088346765311794a574d13bf (patch)
tree2e45168596eafae6827bf88e1b33430787813c67
parent3b899decb8b5159c1086a5211eda315c090c6d59 (diff)
downloadkafka-python-fb023fe85d0bac4e088346765311794a574d13bf.tar.gz
Prefer python-lz4 over lz4f if available
-rw-r--r--docs/index.rst5
-rw-r--r--docs/install.rst6
-rw-r--r--kafka/codec.py39
-rw-r--r--tox.ini2
4 files changed, 37 insertions, 15 deletions
diff --git a/docs/index.rst b/docs/index.rst
index 2cef7fe..21cb3b9 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -113,9 +113,8 @@ Compression
***********
kafka-python supports gzip compression/decompression natively. To produce or
-consume lz4 compressed messages, you must install lz4tools and xxhash (modules
-may not work on python2.6). To enable snappy, install python-snappy (also
-requires snappy library).
+consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
+To enable snappy, install python-snappy (also requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.
diff --git a/docs/install.rst b/docs/install.rst
index 9720d65..cc0e82d 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -26,12 +26,10 @@ Bleeding-Edge
Optional LZ4 install
********************
-To enable LZ4 compression/decompression, install lz4tools and xxhash:
+To enable LZ4 compression/decompression, install python-lz4:
->>> pip install lz4tools
->>> pip install xxhash
+>>> pip install lz4
-*Note*: these modules do not support python2.6
Optional Snappy install
***********************
diff --git a/kafka/codec.py b/kafka/codec.py
index 4deec49..29db48e 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -17,11 +17,20 @@ except ImportError:
snappy = None
try:
+ import lz4.frame as lz4
+except ImportError:
+ lz4 = None
+
+try:
import lz4f
- import xxhash
except ImportError:
lz4f = None
+try:
+ import xxhash
+except ImportError:
+ xxhash = None
+
PYPY = bool(platform.python_implementation() == 'PyPy')
def has_gzip():
@@ -33,7 +42,11 @@ def has_snappy():
def has_lz4():
- return lz4f is not None
+ if lz4 is not None:
+ return True
+ if lz4f is not None:
+ return True
+ return False
def gzip_encode(payload, compresslevel=None):
@@ -181,13 +194,15 @@ def snappy_decode(payload):
return snappy.decompress(payload)
-def lz4_encode(payload):
- """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
- # pylint: disable-msg=no-member
- return lz4f.compressFrame(payload)
+if lz4:
+ lz4_encode = lz4.compress # pylint: disable-msg=no-member
+elif lz4f:
+ lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
+else:
+ lz4_encode = None
-def lz4_decode(payload):
+def lz4f_decode(payload):
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
# pylint: disable-msg=no-member
ctx = lz4f.createDecompContext()
@@ -201,8 +216,17 @@ def lz4_decode(payload):
return data['decomp']
+if lz4:
+ lz4_decode = lz4.decompress # pylint: disable-msg=no-member
+elif lz4f:
+ lz4_decode = lz4f_decode
+else:
+ lz4_decode = None
+
+
def lz4_encode_old_kafka(payload):
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
+ assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
if isinstance(data[4], int):
@@ -224,6 +248,7 @@ def lz4_encode_old_kafka(payload):
def lz4_decode_old_kafka(payload):
+ assert xxhash is not None
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
diff --git a/tox.ini b/tox.ini
index 23ca385..03a6893 100644
--- a/tox.ini
+++ b/tox.ini
@@ -17,7 +17,7 @@ deps =
pytest-mock
mock
python-snappy
- lz4tools
+ lz4
xxhash
py26: unittest2
commands =