diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-03-13 14:22:57 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-03-14 13:30:35 -0700 |
commit | fb023fe85d0bac4e088346765311794a574d13bf (patch) | |
tree | 2e45168596eafae6827bf88e1b33430787813c67 | |
parent | 3b899decb8b5159c1086a5211eda315c090c6d59 (diff) | |
download | kafka-python-fb023fe85d0bac4e088346765311794a574d13bf.tar.gz |
Prefer python-lz4 over lz4f if available
-rw-r--r-- | docs/index.rst | 5 | ||||
-rw-r--r-- | docs/install.rst | 6 | ||||
-rw-r--r-- | kafka/codec.py | 39 | ||||
-rw-r--r-- | tox.ini | 2 |
4 files changed, 37 insertions, 15 deletions
diff --git a/docs/index.rst b/docs/index.rst index 2cef7fe..21cb3b9 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -113,9 +113,8 @@ Compression *********** kafka-python supports gzip compression/decompression natively. To produce or -consume lz4 compressed messages, you must install lz4tools and xxhash (modules -may not work on python2.6). To enable snappy, install python-snappy (also -requires snappy library). +consume lz4 compressed messages, you should install python-lz4 (pip install lz4). +To enable snappy, install python-snappy (also requires snappy library). See `Installation <install.html#optional-snappy-install>`_ for more information. diff --git a/docs/install.rst b/docs/install.rst index 9720d65..cc0e82d 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -26,12 +26,10 @@ Bleeding-Edge Optional LZ4 install ******************** -To enable LZ4 compression/decompression, install lz4tools and xxhash: +To enable LZ4 compression/decompression, install python-lz4: ->>> pip install lz4tools ->>> pip install xxhash +>>> pip install lz4 -*Note*: these modules do not support python2.6 Optional Snappy install *********************** diff --git a/kafka/codec.py b/kafka/codec.py index 4deec49..29db48e 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -17,11 +17,20 @@ except ImportError: snappy = None try: + import lz4.frame as lz4 +except ImportError: + lz4 = None + +try: import lz4f - import xxhash except ImportError: lz4f = None +try: + import xxhash +except ImportError: + xxhash = None + PYPY = bool(platform.python_implementation() == 'PyPy') def has_gzip(): @@ -33,7 +42,11 @@ def has_snappy(): def has_lz4(): - return lz4f is not None + if lz4 is not None: + return True + if lz4f is not None: + return True + return False def gzip_encode(payload, compresslevel=None): @@ -181,13 +194,15 @@ def snappy_decode(payload): return snappy.decompress(payload) -def lz4_encode(payload): - """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" - # pylint: disable-msg=no-member - return lz4f.compressFrame(payload) +if lz4: + lz4_encode = lz4.compress # pylint: disable-msg=no-member +elif lz4f: + lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member +else: + lz4_encode = None -def lz4_decode(payload): +def lz4f_decode(payload): """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" # pylint: disable-msg=no-member ctx = lz4f.createDecompContext() @@ -201,8 +216,17 @@ def lz4_decode(payload): return data['decomp'] +if lz4: + lz4_decode = lz4.decompress # pylint: disable-msg=no-member +elif lz4f: + lz4_decode = lz4f_decode +else: + lz4_decode = None + + def lz4_encode_old_kafka(payload): """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.""" + assert xxhash is not None data = lz4_encode(payload) header_size = 7 if isinstance(data[4], int): @@ -224,6 +248,7 @@ def lz4_encode_old_kafka(payload): def lz4_decode_old_kafka(payload): + assert xxhash is not None # Kafka's LZ4 code has a bug in its header checksum implementation header_size = 7 if isinstance(payload[4], int): @@ -17,7 +17,7 @@ deps = pytest-mock mock python-snappy - lz4tools + lz4 xxhash py26: unittest2 commands = |