diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-21 21:25:55 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 00:17:51 -0700 |
commit | f2991be6143c6a1a79815ea20fe95bae8f5376ac (patch) | |
tree | 5fbb2f69358b5347ba70efdd66ee7bee586488fa | |
parent | 92f859d8da5c3f35ab3738ef2725fff05b6cf57f (diff) | |
download | kafka-python-f2991be6143c6a1a79815ea20fe95bae8f5376ac.tar.gz |
raise ValueError on protocol encode/decode errors
-rw-r--r-- | kafka/protocol/types.py | 47 |
1 files changed, 32 insertions, 15 deletions
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py index 01799bb..18aaca1 100644 --- a/kafka/protocol/types.py +++ b/kafka/protocol/types.py @@ -1,52 +1,63 @@ from __future__ import absolute_import -from struct import pack, unpack +from struct import pack, unpack, error from .abstract import AbstractType +def _pack(f, value): + try: + return pack(f, value) + except error: + raise ValueError(error) + + +def _unpack(f, data): + try: + (value,) = unpack(f, data) + return value + except error: + raise ValueError(error) + + class Int8(AbstractType): @classmethod def encode(cls, value): - return pack('>b', value) + return _pack('>b', value) @classmethod def decode(cls, data): - (value,) = unpack('>b', data.read(1)) - return value + return _unpack('>b', data.read(1)) class Int16(AbstractType): @classmethod def encode(cls, value): - return pack('>h', value) + return _pack('>h', value) @classmethod def decode(cls, data): - (value,) = unpack('>h', data.read(2)) - return value + return _unpack('>h', data.read(2)) class Int32(AbstractType): @classmethod def encode(cls, value): - return pack('>i', value) + return _pack('>i', value) @classmethod def decode(cls, data): - (value,) = unpack('>i', data.read(4)) - return value + return _unpack('>i', data.read(4)) class Int64(AbstractType): @classmethod def encode(cls, value): - return pack('>q', value) + return _pack('>q', value) @classmethod def decode(cls, data): - (value,) = unpack('>q', data.read(8)) - return value + return _unpack('>q', data.read(8)) class String(AbstractType): @@ -63,7 +74,10 @@ class String(AbstractType): length = Int16.decode(data) if length < 0: return None - return data.read(length).decode(self.encoding) + value = data.read(length) + if len(value) != length: + raise ValueError('Buffer underrun decoding string') + return value.decode(self.encoding) class Bytes(AbstractType): @@ -79,7 +93,10 @@ class Bytes(AbstractType): length = Int32.decode(data) if length < 0: return None - return data.read(length) + value = data.read(length) + if len(value) != length: + raise ValueError('Buffer underrun decoding Bytes') + return value class Schema(AbstractType): |