author    | Taras <voyn1991@gmail.com> | 2018-03-18 15:56:47 +0200
committer | Dana Powers <dana.powers@gmail.com> | 2018-04-18 13:51:07 -0700
commit    | 908ac8f8d253b20d70e36ce4bae1aefb51769221 (patch)
tree      | 844aca7543b3b57e17b12d5f55f6cffa0e89f73b /kafka
parent    | d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (diff)
download  | kafka-python-908ac8f8d253b20d70e36ce4bae1aefb51769221.tar.gz
Add codec validators to record parser and builder for all formats (#1447)
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/record/default_records.py | 22
-rw-r--r-- | kafka/record/legacy_records.py  | 18
2 files changed, 34 insertions, 6 deletions
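
The patch makes both record formats fail fast when a compression codec is requested but its library is missing: `DefaultRecordBase` and `LegacyRecordBase` each gain an `_assert_has_codec` helper that runs before any compression or decompression and raises `UnsupportedCodecError` instead of crashing later on a missing import. A minimal sketch of the resulting producer-side behavior (not part of the patch; it assumes python-snappy is *not* installed in the environment):

```python
# Sketch: building a snappy-compressed batch without python-snappy
# installed should now raise UnsupportedCodecError up front.
from kafka.errors import UnsupportedCodecError
from kafka.record.default_records import DefaultRecordBatchBuilder

builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=2,  # 2 == CODEC_SNAPPY
    is_transactional=False, producer_id=-1, producer_epoch=-1,
    base_sequence=-1, batch_size=16384)
builder.append(0, timestamp=None, key=None, value=b"msg", headers=[])
try:
    builder.build()  # _maybe_compress() runs the new validator first
except UnsupportedCodecError as exc:
    print(exc)  # Libraries for snappy compression codec not found
```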
```diff
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 840868a..955e3ee 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -54,17 +54,18 @@
 # * Timestamp Type (3)
 # * Compression Type (0-2)
 
-import io
 import struct
 import time
 from kafka.record.abc import ABCRecord, ABCRecordBatch, ABCRecordBatchBuilder
-from kafka.record.util import decode_varint, encode_varint, calc_crc32c, size_of_varint
-
-from kafka.errors import CorruptRecordException
+from kafka.record.util import (
+    decode_varint, encode_varint, calc_crc32c, size_of_varint
+)
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode,
     gzip_decode, snappy_decode, lz4_decode
 )
+import kafka.codec as codecs
 
 
 class DefaultRecordBase(object):
@@ -101,6 +102,17 @@ class DefaultRecordBase(object):
     LOG_APPEND_TIME = 1
     CREATE_TIME = 0
 
+    def _assert_has_codec(self, compression_type):
+        if compression_type == self.CODEC_GZIP:
+            checker, name = codecs.has_gzip, "gzip"
+        elif compression_type == self.CODEC_SNAPPY:
+            checker, name = codecs.has_snappy, "snappy"
+        elif compression_type == self.CODEC_LZ4:
+            checker, name = codecs.has_lz4, "lz4"
+        if not checker():
+            raise UnsupportedCodecError(
+                "Libraries for {} compression codec not found".format(name))
+
 
 class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
@@ -156,6 +168,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
         if not self._decompressed:
             compression_type = self.compression_type
             if compression_type != self.CODEC_NONE:
+                self._assert_has_codec(compression_type)
                 data = memoryview(self._buffer)[self._pos:]
                 if compression_type == self.CODEC_GZIP:
                     uncompressed = gzip_decode(data)
@@ -481,6 +494,7 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
 
     def _maybe_compress(self):
         if self._compression_type != self.CODEC_NONE:
+            self._assert_has_codec(self._compression_type)
             header_size = self.HEADER_STRUCT.size
             data = bytes(self._buffer[header_size:])
             if self._compression_type == self.CODEC_GZIP:
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 036e6c4..1bdba81 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -49,9 +49,10 @@ from kafka.record.util import calc_crc32
 
 from kafka.codec import (
     gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
-    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
+    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka,
 )
-from kafka.errors import CorruptRecordException
+import kafka.codec as codecs
+from kafka.errors import CorruptRecordException, UnsupportedCodecError
 
 
 class LegacyRecordBase(object):
@@ -112,6 +113,17 @@ class LegacyRecordBase(object):
 
     NO_TIMESTAMP = -1
 
+    def _assert_has_codec(self, compression_type):
+        if compression_type == self.CODEC_GZIP:
+            checker, name = codecs.has_gzip, "gzip"
+        elif compression_type == self.CODEC_SNAPPY:
+            checker, name = codecs.has_snappy, "snappy"
+        elif compression_type == self.CODEC_LZ4:
+            checker, name = codecs.has_lz4, "lz4"
+        if not checker():
+            raise UnsupportedCodecError(
+                "Libraries for {} compression codec not found".format(name))
+
 
 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
@@ -166,6 +178,7 @@ class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
             data = self._buffer[pos:pos + value_size]
 
         compression_type = self.compression_type
+        self._assert_has_codec(compression_type)
         if compression_type == self.CODEC_GZIP:
             uncompressed = gzip_decode(data)
         elif compression_type == self.CODEC_SNAPPY:
@@ -419,6 +432,7 @@ class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
 
     def _maybe_compress(self):
         if self._compression_type:
+            self._assert_has_codec(self._compression_type)
             data = bytes(self._buffer)
             if self._compression_type == self.CODEC_GZIP:
                 compressed = gzip_encode(data)
```
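
The validators lean on the feature probes that `kafka.codec` already exposes: `has_gzip()` is effectively always true (gzip rides on the stdlib `zlib`), while `has_snappy()` and `has_lz4()` report whether the optional third-party packages are importable. A quick way to see what the current environment supports:

```python
# Probe the same checkers that _assert_has_codec dispatches on.
import kafka.codec as codecs

for name, checker in [("gzip", codecs.has_gzip),
                      ("snappy", codecs.has_snappy),
                      ("lz4", codecs.has_lz4)]:
    print("{}: {}".format(name, "available" if checker() else "missing"))
```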