author     Taras <voyn1991@gmail.com>           2018-03-18 15:56:47 +0200
committer  Dana Powers <dana.powers@gmail.com>  2018-04-18 13:51:07 -0700
commit     908ac8f8d253b20d70e36ce4bae1aefb51769221 (patch)
tree       844aca7543b3b57e17b12d5f55f6cffa0e89f73b /kafka
parent     d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (diff)
Add codec validators to record parser and builder for all formats (#1447)
Diffstat (limited to 'kafka')

 -rw-r--r--  kafka/record/default_records.py  22
 -rw-r--r--  kafka/record/legacy_records.py   18

 2 files changed, 34 insertions(+), 6 deletions(-)
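The net effect: building or parsing a compressed record batch now fails fast
with kafka.errors.UnsupportedCodecError when the required compression library
is missing, rather than erroring later inside kafka.codec. A minimal sketch of
the builder-side behavior (hypothetical usage; assumes python-snappy is NOT
installed):

    from kafka.errors import UnsupportedCodecError
    from kafka.record.default_records import DefaultRecordBatchBuilder

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=2,  # 2 == CODEC_SNAPPY
        is_transactional=False, producer_id=-1, producer_epoch=-1,
        base_sequence=-1, batch_size=16384)
    builder.append(0, timestamp=None, key=None, value=b"msg", headers=[])
    try:
        builder.build()  # build() -> _maybe_compress() -> _assert_has_codec()
    except UnsupportedCodecError as exc:
        print("snappy unavailable:", exc)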
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 840868a..955e3ee 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -54,17 +54,18 @@
# * Timestamp Type (3)
# * Compression Type (0-2)
-import io
import struct
import time
from kafka.record.abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
-from kafka.record.util import decode_varint, encode_varint, calc_crc32c, size_of_varint
-
-from kafka.errors import CorruptRecordException
+from kafka.record.util import (
+ decode_varint, encode_varint, calc_crc32c, size_of_varint
+)
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode
)
+import kafka.codec as codecs
class DefaultRecordBase(object):
@@ -101,6 +102,17 @@ class DefaultRecordBase(object):
LOG_APPEND_TIME = 1
CREATE_TIME = 0
+ def _assert_has_codec(self, compression_type):
+ if compression_type == self.CODEC_GZIP:
+ checker, name = codecs.has_gzip, "gzip"
+ elif compression_type == self.CODEC_SNAPPY:
+ checker, name = codecs.has_snappy, "snappy"
+ elif compression_type == self.CODEC_LZ4:
+ checker, name = codecs.has_lz4, "lz4"
+ if not checker():
+ raise UnsupportedCodecError(
+ "Libraries for {} compression codec not found".format(name))
+
class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
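Note that _assert_has_codec assumes compression_type is one of the three known
codecs: if an unrecognized codec value ever reached it, `checker` would be
unbound and the final `if not checker()` would raise UnboundLocalError instead
of a descriptive error. A defensive variant (a sketch, not part of this
commit) would add an explicit else branch:

    def _assert_has_codec(self, compression_type):
        if compression_type == self.CODEC_GZIP:
            checker, name = codecs.has_gzip, "gzip"
        elif compression_type == self.CODEC_SNAPPY:
            checker, name = codecs.has_snappy, "snappy"
        elif compression_type == self.CODEC_LZ4:
            checker, name = codecs.has_lz4, "lz4"
        else:
            # Unknown codec bits: fail loudly, not with UnboundLocalError.
            raise UnsupportedCodecError(
                "Unknown compression codec 0x{:02x}".format(compression_type))
        if not checker():
            raise UnsupportedCodecError(
                "Libraries for {} compression codec not found".format(name))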
@@ -156,6 +168,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
if not self._decompressed:
compression_type = self.compression_type
if compression_type != self.CODEC_NONE:
+ self._assert_has_codec(compression_type)
data = memoryview(self._buffer)[self._pos:]
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
@@ -481,6 +494,7 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
def _maybe_compress(self):
if self._compression_type != self.CODEC_NONE:
+ self._assert_has_codec(self._compression_type)
header_size = self.HEADER_STRUCT.size
data = bytes(self._buffer[header_size:])
if self._compression_type == self.CODEC_GZIP:
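On the read path the same guard runs in _maybe_uncompress() before the decode
dispatch, so consuming a batch compressed with an unavailable codec raises
UnsupportedCodecError up front. Callers can probe availability themselves with
the same helpers the validator uses (usage sketch; the has_* functions live in
kafka.codec):

    import kafka.codec as codecs

    if not codecs.has_lz4():
        print("install the lz4 package to consume lz4-compressed topics")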
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 036e6c4..1bdba81 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -49,9 +49,10 @@ from kafka.record.util import calc_crc32
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
- gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+ gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
)
-from kafka.errors import CorruptRecordException
+import kafka.codec as codecs
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
class LegacyRecordBase(object):
@@ -112,6 +113,17 @@ class LegacyRecordBase(object):
NO_TIMESTAMP = -1
+ def _assert_has_codec(self, compression_type):
+ if compression_type == self.CODEC_GZIP:
+ checker, name = codecs.has_gzip, "gzip"
+ elif compression_type == self.CODEC_SNAPPY:
+ checker, name = codecs.has_snappy, "snappy"
+ elif compression_type == self.CODEC_LZ4:
+ checker, name = codecs.has_lz4, "lz4"
+ if not checker():
+ raise UnsupportedCodecError(
+ "Libraries for {} compression codec not found".format(name))
+
class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
@@ -166,6 +178,7 @@ class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
data = self._buffer[pos:pos + value_size]
compression_type = self.compression_type
+ self._assert_has_codec(compression_type)
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
elif compression_type == self.CODEC_SNAPPY:
@@ -419,6 +432,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
def _maybe_compress(self):
if self._compression_type:
+ self._assert_has_codec(self._compression_type)
data = bytes(self._buffer)
if self._compression_type == self.CODEC_GZIP:
compressed = gzip_encode(data)
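The legacy path mirrors the v2 format: _maybe_compress() validates before
encoding and _decompress() validates before decoding. The has_* helpers simply
report whether the optional dependency imported successfully (illustrative):

    import kafka.codec as codecs

    codecs.has_gzip()    # always True: gzip ships with the Python stdlib
    codecs.has_snappy()  # True only if python-snappy is installed
    codecs.has_lz4()     # True only if a supported lz4 binding is installed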