author     Tincu Gabriel <gabi@aiven.io>      2020-09-08 01:11:18 +0200
committer  GitHub <noreply@github.com>        2020-09-07 16:11:18 -0700
commit     a27ab881726ed1a2d952867a1fa266573165d6aa (patch)
tree       918d904583960e760dfaeaf89d813e789782cdb2
parent     08ea21167e3d6e9577d16715eadc9829bd8c1a80 (diff)
Add support for `zstd` compression (#2021)
-rw-r--r--  .travis.yml                       1
-rw-r--r--  docs/index.rst                    9
-rw-r--r--  kafka/codec.py                   25
-rw-r--r--  kafka/producer/kafka.py           8
-rw-r--r--  kafka/protocol/message.py        10
-rw-r--r--  kafka/record/default_records.py  11
-rw-r--r--  kafka/record/memory_records.py    2
-rw-r--r--  test/test_codec.py               11
-rw-r--r--  test/test_producer.py            20
-rw-r--r--  tox.ini                           1
10 files changed, 75 insertions(+), 23 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index d660271..e837924 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,6 +22,7 @@ addons:
apt:
packages:
- libsnappy-dev
+ - libzstd-dev
- openjdk-8-jdk
cache:
diff --git a/docs/index.rst b/docs/index.rst
index fa6f93c..9c46e33 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -122,11 +122,12 @@ multiprocessing is recommended.
Compression
***********
-kafka-python supports gzip compression/decompression natively. To produce or
-consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
-To enable snappy, install python-snappy (also requires snappy library).
-See `Installation <install.html#optional-snappy-install>`_ for more information.
+kafka-python supports multiple compression types:
+ - gzip : supported natively
+ - lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
+ - snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
+ - zstd : requires the `python-zstandard <https://github.com/indygreg/python-zstandard>`_ package installed
Protocol
********
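As a quick sanity check after installing the optional packages listed above, the availability helpers in kafka/codec.py (including the new has_zstd) can be probed directly; a minimal sketch:

    # Sketch: report which compression codecs are usable in this environment.
    # has_gzip/has_snappy/has_lz4/has_zstd are the feature checks in kafka/codec.py.
    from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd

    for name, available in [("gzip", has_gzip()), ("snappy", has_snappy()),
                            ("lz4", has_lz4()), ("zstd", has_zstd())]:
        print("%s: %s" % (name, "available" if available else "missing"))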
diff --git a/kafka/codec.py b/kafka/codec.py
index aa9fc82..917400e 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -10,6 +10,7 @@ from kafka.vendor.six.moves import range
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
try:
import snappy
@@ -17,6 +18,11 @@ except ImportError:
snappy = None
try:
+ import zstandard as zstd
+except ImportError:
+ zstd = None
+
+try:
import lz4.frame as lz4
def _lz4_compress(payload, **kwargs):
@@ -58,6 +64,10 @@ def has_snappy():
return snappy is not None
+def has_zstd():
+ return zstd is not None
+
+
def has_lz4():
if lz4 is not None:
return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
payload[header_size:]
])
return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+ if not zstd:
+ raise NotImplementedError("Zstd codec is not available")
+ return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+ if not zstd:
+ raise NotImplementedError("Zstd codec is not available")
+ try:
+ return zstd.ZstdDecompressor().decompress(payload)
+ except zstd.ZstdError:
+ return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
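The new helpers round-trip bytes through the `zstandard` package; the try/except in zstd_decode falls back to an explicit max_output_size because ZstdDecompressor.decompress() raises ZstdError for frames that do not embed their decompressed size. A minimal round-trip sketch (assumes `zstandard` is installed):

    # Sketch: round-trip a payload through the helpers added in kafka/codec.py.
    from kafka.codec import zstd_encode, zstd_decode

    payload = b"kafka zstd payload " * 64
    compressed = zstd_encode(payload)
    assert zstd_decode(compressed) == payload   # lossless round trip
    assert len(compressed) < len(payload)       # repetitive data compresses well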
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 9509ab9..dba1801 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,7 +12,7 @@ from kafka.vendor import six
import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
available guarantee.
If unset, defaults to acks=1.
compression_type (str): The compression type for all data generated by
- the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+ the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+ 'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
}
@@ -388,6 +389,9 @@ class KafkaProducer(object):
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
+ if self.config['compression_type'] == 'zstd':
+ assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
# Check compression_type for library support
ct = self.config['compression_type']
if ct not in self._COMPRESSORS:
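End to end, the new codec is selected the same way as the existing ones, via compression_type; a hypothetical usage sketch (broker address and topic are placeholders, and brokers must be Kafka 2.1.0+ per the assert above):

    # Hypothetical sketch: produce zstd-compressed batches with the new option.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",  # placeholder broker address
        compression_type="zstd",             # requires the `zstandard` package
        api_version=(2, 1, 0),               # zstd needs Kafka 2.1.0+ brokers
    )
    producer.send("my-topic", b"zstd-compressed payload")
    producer.flush()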
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 31527bf..4c5c031 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -3,8 +3,8 @@ from __future__ import absolute_import
import io
import time
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
- gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+ gzip_decode, snappy_decode, zstd_decode,
lz4_decode, lz4_decode_old_kafka)
from kafka.protocol.frame import KafkaBytes
from kafka.protocol.struct import Struct
@@ -35,6 +35,7 @@ class Message(Struct):
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
+ CODEC_ZSTD = 0x04
TIMESTAMP_TYPE_MASK = 0x08
HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
@@ -119,7 +120,7 @@ class Message(Struct):
def decompress(self):
codec = self.attributes & self.CODEC_MASK
- assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
if codec == self.CODEC_GZIP:
assert has_gzip(), 'Gzip decompression unsupported'
raw_bytes = gzip_decode(self.value)
@@ -132,6 +133,9 @@ class Message(Struct):
raw_bytes = lz4_decode_old_kafka(self.value)
else:
raw_bytes = lz4_decode(self.value)
+ elif codec == self.CODEC_ZSTD:
+ assert has_zstd(), "ZSTD decompression unsupported"
+ raw_bytes = zstd_decode(self.value)
else:
raise Exception('This should be impossible')
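The codec is carried in the low bits of each message's attributes byte; a small sketch of how the new constant is picked out (assuming the existing CODEC_MASK covers the low three bits, i.e. 0x07):

    # Sketch: extracting the codec from a message attributes byte.
    CODEC_MASK = 0x07           # assumed low-three-bit codec mask used by Message
    CODEC_ZSTD = 0x04
    TIMESTAMP_TYPE_MASK = 0x08

    attributes = TIMESTAMP_TYPE_MASK | CODEC_ZSTD  # example attributes value
    assert attributes & CODEC_MASK == CODEC_ZSTD   # decompress() dispatches on this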
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 07368bb..a098c42 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -62,8 +62,8 @@ from kafka.record.util import (
)
from kafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
- gzip_encode, snappy_encode, lz4_encode,
- gzip_decode, snappy_decode, lz4_decode
+ gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+ gzip_decode, snappy_decode, lz4_decode, zstd_decode
)
import kafka.codec as codecs
@@ -97,6 +97,7 @@ class DefaultRecordBase(object):
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
+ CODEC_ZSTD = 0x04
TIMESTAMP_TYPE_MASK = 0x08
TRANSACTIONAL_MASK = 0x10
CONTROL_MASK = 0x20
@@ -111,6 +112,8 @@ class DefaultRecordBase(object):
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
+ elif compression_type == self.CODEC_ZSTD:
+ checker, name = codecs.has_zstd, "zstd"
if not checker():
raise UnsupportedCodecError(
"Libraries for {} compression codec not found".format(name))
@@ -185,6 +188,8 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
uncompressed = snappy_decode(data.tobytes())
if compression_type == self.CODEC_LZ4:
uncompressed = lz4_decode(data.tobytes())
+ if compression_type == self.CODEC_ZSTD:
+ uncompressed = zstd_decode(data.tobytes())
self._buffer = bytearray(uncompressed)
self._pos = 0
self._decompressed = True
@@ -517,6 +522,8 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
compressed = snappy_encode(data)
elif self._compression_type == self.CODEC_LZ4:
compressed = lz4_encode(data)
+ elif self._compression_type == self.CODEC_ZSTD:
+ compressed = zstd_encode(data)
compressed_size = len(compressed)
if len(data) <= compressed_size:
# We did not get any benefit from compression, lets send
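The builder only ships the compressed buffer when it is actually smaller than the raw batch; a minimal sketch of that size check (using the same zstd_encode helper):

    # Sketch: keep the compressed batch only when compression actually helped.
    from kafka.codec import zstd_encode

    data = b"x" * 1024                   # a highly compressible raw batch
    compressed = zstd_encode(data)
    if len(data) <= len(compressed):
        pass                             # no benefit; send the batch uncompressed
    else:
        data = compressed                # send the smaller zstd-compressed batch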
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index a6c4b51..fc2ef2d 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object):
def __init__(self, magic, compression_type, batch_size):
assert magic in [0, 1, 2], "Not supported magic"
- assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+ assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
if magic >= 2:
self._builder = DefaultRecordBatchBuilder(
magic=magic, compression_type=compression_type,
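The integer compression types accepted by this assert mirror the CODEC_* attribute constants used by the record batch builders; for reference:

    # Reference sketch: numeric compression_type values accepted by the builder.
    COMPRESSION_TYPES = {
        0: "none",    # CODEC_NONE
        1: "gzip",    # CODEC_GZIP
        2: "snappy",  # CODEC_SNAPPY
        3: "lz4",     # CODEC_LZ4
        4: "zstd",    # CODEC_ZSTD, newly accepted by this change
    }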
diff --git a/test/test_codec.py b/test/test_codec.py
index 9eff888..e057074 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -7,11 +7,12 @@ import pytest
from kafka.vendor.six.moves import range
from kafka.codec import (
- has_snappy, has_lz4,
+ has_snappy, has_lz4, has_zstd,
gzip_encode, gzip_decode,
snappy_encode, snappy_decode,
lz4_encode, lz4_decode,
lz4_encode_old_kafka, lz4_decode_old_kafka,
+ zstd_encode, zstd_decode,
)
from test.testutil import random_string
@@ -113,3 +114,11 @@ def test_lz4_incremental():
b2 = lz4_decode(lz4_encode(b1))
assert len(b1) == len(b2)
assert b1 == b2
+
+
+@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
+def test_zstd():
+ for _ in range(1000):
+ b1 = random_string(100).encode('utf-8')
+ b2 = zstd_decode(zstd_encode(b1))
+ assert b1 == b2
diff --git a/test/test_producer.py b/test/test_producer.py
index 9605adf..7263130 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -23,16 +23,16 @@ def test_buffer_pool():
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
-
if compression == 'lz4':
- # LZ4 requires 0.8.2
if env_kafka_version() < (0, 8, 2):
- return
- # python-lz4 crashes on older versions of pypy
+ pytest.skip('LZ4 requires 0.8.2')
elif platform.python_implementation() == 'PyPy':
- return
+ pytest.skip('python-lz4 crashes on older versions of pypy')
+
+ if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+ pytest.skip('zstd requires kafka 2.1.0 or newer')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
@@ -81,8 +81,10 @@ def test_kafka_producer_gc_cleanup():
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+ if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+ pytest.skip('zstd requires 2.1.0 or more')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
@@ -124,10 +126,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if headers:
assert record.serialized_header_size == 22
- # generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
- return
-
+ pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
send_time = time.time() * 1000
future = producer.send(
topic,
diff --git a/tox.ini b/tox.ini
index 8dfe2c5..10e9911 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,6 +15,7 @@ deps =
pytest-mock
mock
python-snappy
+ zstandard
lz4
xxhash
crc32c