author    Dana Powers <dana.powers@rd.io>  2016-01-24 22:01:09 -0800
committer Dana Powers <dana.powers@rd.io>  2016-01-25 10:09:15 -0800
commit    0d5899020a75e22fec14d3e3d7aec8f043d60a31
tree      e227d12865560026a3170196b331d4d843a9e7fd
parent    2c7b7452a8ca761672e70ee56b3779e4a96c1997
download  kafka-python-0d5899020a75e22fec14d3e3d7aec8f043d60a31.tar.gz
Add support for LZ4 compressed messages using python-lz4 module
 README.rst                           |  8
 docs/install.rst                     | 11
 kafka/codec.py                       | 13
 kafka/producer/buffer.py             |  5
 kafka/producer/kafka.py              |  2
 kafka/producer/record_accumulator.py |  2
 kafka/protocol/message.py            | 15
 7 files changed, 49 insertions(+), 7 deletions(-)
diff --git a/README.rst b/README.rst
index 1d04e0b..2bcc150 100644
--- a/README.rst
+++ b/README.rst
@@ -79,6 +79,14 @@ for more details.
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
+Compression
+***********
+
+kafka-python supports gzip compression/decompression natively. To produce or
+consume LZ4 and Snappy compressed messages, you must install `lz4` (`lz4-cffi`
+if using PyPy) and/or `python-snappy` (which also requires the snappy C library).
+See `Installation <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install>`_
+for more information.
Protocol
********
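
The Compression section added above maps directly onto the producer API. A minimal
sketch, assuming a broker on localhost and reusing the README's `foobar` topic
(both illustrative, not part of this commit):

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092',
...                          compression_type='lz4')
>>> producer.send('foobar', b'this batch will be lz4-compressed')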
diff --git a/docs/install.rst b/docs/install.rst
index bf49c3f..aba5019 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -37,6 +37,17 @@ Using `setup.py` directly:
>>> cd kafka-python
>>> python setup.py install
+Optional LZ4 install
+********************
+
+To enable LZ4 compression/decompression, install `lz4`:
+
+>>> pip install lz4
+
+Or `lz4-cffi` if using pypy:
+
+>>> pip install lz4-cffi
+
Optional Snappy install
***********************
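
A quick way to verify the optional install is a round trip through the same
`compress`/`decompress` functions this commit imports from the `lz4` module
(a sketch; the module's API surface varies across lz4 releases):

>>> import lz4
>>> lz4.decompress(lz4.compress(b'sanity check')) == b'sanity check'
True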
diff --git a/kafka/codec.py b/kafka/codec.py
index c27d89b..c8195ee 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -13,6 +13,15 @@ try:
except ImportError:
    _HAS_SNAPPY = False
+try:
+ import lz4
+ from lz4 import compress as lz4_encode
+ from lz4 import decompress as lz4_decode
+except ImportError:
+ lz4 = None
+ lz4_encode = None
+ lz4_decode = None
+
def has_gzip():
    return True
@@ -22,6 +31,10 @@ def has_snappy():
    return _HAS_SNAPPY
+def has_lz4():
+    return lz4 is not None
+
+
def gzip_encode(payload, compresslevel=None):
    if not compresslevel:
        compresslevel = 9
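
Because the import is guarded, callers are expected to check has_lz4() before
touching the codec. A minimal sketch against the kafka.codec module as shown in
this diff:

    from kafka.codec import has_lz4, lz4_encode, lz4_decode

    payload = b'some message bytes'
    if has_lz4():
        # optional codec available: round-trip the payload
        assert lz4_decode(lz4_encode(payload)) == payload
    else:
        # lz4 module not installed; gzip remains available natively
        print('lz4 support disabled')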
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 4e05ec9..1a2dd71 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -5,8 +5,8 @@ import io
import threading
import time
-from ..codec import (has_gzip, has_snappy,
-                     gzip_encode, snappy_encode)
+from ..codec import (has_gzip, has_snappy, has_lz4,
+                     gzip_encode, snappy_encode, lz4_encode)
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -27,6 +27,7 @@ class MessageSetBuffer(object):
    _COMPRESSORS = {
        'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
        'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
+        'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
    }

    def __init__(self, buf, batch_size, compression_type=None):
        assert batch_size > 0, 'batch_size must be > 0'
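
Each _COMPRESSORS entry pairs an availability checker, an encoder, and the codec
bit to stamp into the wrapper message's attributes. A hypothetical lookup,
mirroring how MessageSetBuffer consumes the table (the variable names here are
illustrative):

    checker, encoder, attrs = MessageSetBuffer._COMPRESSORS['lz4']
    assert checker(), 'lz4 requested but the lz4 module is not installed'
    compressed = encoder(b'serialized message-set bytes')
    # attrs == Message.CODEC_LZ4, i.e. the attributes bits for this codec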
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 220528f..2443265 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -111,7 +111,7 @@ class KafkaProducer(object):
            remains alive. This is the strongest available guarantee.
            If unset, defaults to acks=1.
        compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
            Compression is of full batches of data, so the efficacy of batching
            will also impact the compression ratio (more batching means better
            compression). Default: None.
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 17cfa5e..6a762eb 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -114,7 +114,7 @@ class RecordAccumulator(object):
            In the current implementation, this setting is an approximation.
            Default: 33554432 (32MB)
        compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
            Compression is of full batches of data, so the efficacy of batching
            will also impact the compression ratio (more batching means better
            compression). Default: None.
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index fb54049..ae261bf 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,6 +1,7 @@
import io
-from ..codec import gzip_decode, snappy_decode
+from ..codec import (has_gzip, has_snappy, has_lz4,
+                     gzip_decode, snappy_decode, lz4_decode)
from . import pickle
from .struct import Struct
from .types import (
@@ -20,6 +21,7 @@ class Message(Struct):
    CODEC_MASK = 0x03
    CODEC_GZIP = 0x01
    CODEC_SNAPPY = 0x02
+    CODEC_LZ4 = 0x03
    HEADER_SIZE = 14  # crc(4), magic(1), attributes(1), key+value size(4*2)

    def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
@@ -61,11 +63,18 @@ class Message(Struct):
    def decompress(self):
        codec = self.attributes & self.CODEC_MASK
-        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY)
+        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
        if codec == self.CODEC_GZIP:
+            assert has_gzip(), 'Gzip decompression unsupported'
            raw_bytes = gzip_decode(self.value)
-        else:
+        elif codec == self.CODEC_SNAPPY:
+            assert has_snappy(), 'Snappy decompression unsupported'
            raw_bytes = snappy_decode(self.value)
+        elif codec == self.CODEC_LZ4:
+            assert has_lz4(), 'LZ4 decompression unsupported'
+            raw_bytes = lz4_decode(self.value)
+        else:
+            raise Exception('This should be impossible')
        return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))
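
To make the bit arithmetic in decompress() concrete: the codec lives in the low
two bits of the attributes byte, so CODEC_LZ4 = 0x03 fills the whole CODEC_MASK.
A standalone sketch using the constants from this diff:

    CODEC_MASK = 0x03
    CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4 = 0x01, 0x02, 0x03

    attributes = 0b00000011           # wrapper message flagged as LZ4
    codec = attributes & CODEC_MASK   # mask off the codec bits
    assert codec == CODEC_LZ4

One consequence of this layout: with LZ4 occupying both codec bits, no further
codec can be added without widening the mask.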