summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-21 21:25:55 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 00:17:51 -0700
commitf2991be6143c6a1a79815ea20fe95bae8f5376ac (patch)
tree5fbb2f69358b5347ba70efdd66ee7bee586488fa
parent92f859d8da5c3f35ab3738ef2725fff05b6cf57f (diff)
downloadkafka-python-f2991be6143c6a1a79815ea20fe95bae8f5376ac.tar.gz
raise ValueError on protocol encode/decode errors
-rw-r--r--kafka/protocol/types.py47
1 files changed, 32 insertions, 15 deletions
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 01799bb..18aaca1 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -1,52 +1,63 @@
from __future__ import absolute_import
-from struct import pack, unpack
+from struct import pack, unpack, error
from .abstract import AbstractType
+def _pack(f, value):
+ try:
+ return pack(f, value)
+ except error:
+ raise ValueError(error)
+
+
+def _unpack(f, data):
+ try:
+ (value,) = unpack(f, data)
+ return value
+ except error:
+ raise ValueError(error)
+
+
class Int8(AbstractType):
@classmethod
def encode(cls, value):
- return pack('>b', value)
+ return _pack('>b', value)
@classmethod
def decode(cls, data):
- (value,) = unpack('>b', data.read(1))
- return value
+ return _unpack('>b', data.read(1))
class Int16(AbstractType):
@classmethod
def encode(cls, value):
- return pack('>h', value)
+ return _pack('>h', value)
@classmethod
def decode(cls, data):
- (value,) = unpack('>h', data.read(2))
- return value
+ return _unpack('>h', data.read(2))
class Int32(AbstractType):
@classmethod
def encode(cls, value):
- return pack('>i', value)
+ return _pack('>i', value)
@classmethod
def decode(cls, data):
- (value,) = unpack('>i', data.read(4))
- return value
+ return _unpack('>i', data.read(4))
class Int64(AbstractType):
@classmethod
def encode(cls, value):
- return pack('>q', value)
+ return _pack('>q', value)
@classmethod
def decode(cls, data):
- (value,) = unpack('>q', data.read(8))
- return value
+ return _unpack('>q', data.read(8))
class String(AbstractType):
@@ -63,7 +74,10 @@ class String(AbstractType):
length = Int16.decode(data)
if length < 0:
return None
- return data.read(length).decode(self.encoding)
+ value = data.read(length)
+ if len(value) != length:
+ raise ValueError('Buffer underrun decoding string')
+ return value.decode(self.encoding)
class Bytes(AbstractType):
@@ -79,7 +93,10 @@ class Bytes(AbstractType):
length = Int32.decode(data)
if length < 0:
return None
- return data.read(length)
+ value = data.read(length)
+ if len(value) != length:
+ raise ValueError('Buffer underrun decoding Bytes')
+ return value
class Schema(AbstractType):