summaryrefslogtreecommitdiff
path: root/kafka/protocol/types.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/types.py')
-rw-r--r--kafka/protocol/types.py153
1 files changed, 153 insertions, 0 deletions
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 2fde24f..0e3685d 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -210,3 +210,156 @@ class Array(AbstractType):
if list_of_items is None:
return 'NULL'
return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
+
+
+class UnsignedVarInt32(AbstractType):
+ @classmethod
+ def decode(cls, data):
+ value, i = 0, 0
+ while True:
+ b, = struct.unpack('B', data.read(1))
+ if not (b & 0x80):
+ break
+ value |= (b & 0x7f) << i
+ i += 7
+ if i > 28:
+ raise ValueError('Invalid value {}'.format(value))
+ value |= b << i
+ return value
+
+ @classmethod
+ def encode(cls, value):
+ value &= 0xffffffff
+ ret = b''
+ while (value & 0xffffff80) != 0:
+ b = (value & 0x7f) | 0x80
+ ret += struct.pack('B', b)
+ value >>= 7
+ ret += struct.pack('B', value)
+ return ret
+
+
+class VarInt32(AbstractType):
+ @classmethod
+ def decode(cls, data):
+ value = UnsignedVarInt32.decode(data)
+ return (value >> 1) ^ -(value & 1)
+
+ @classmethod
+ def encode(cls, value):
+ # bring it in line with the java binary repr
+ value &= 0xffffffff
+ return UnsignedVarInt32.encode((value << 1) ^ (value >> 31))
+
+
+class VarInt64(AbstractType):
+ @classmethod
+ def decode(cls, data):
+ value, i = 0, 0
+ while True:
+ b = data.read(1)
+ if not (b & 0x80):
+ break
+ value |= (b & 0x7f) << i
+ i += 7
+ if i > 63:
+ raise ValueError('Invalid value {}'.format(value))
+ value |= b << i
+ return (value >> 1) ^ -(value & 1)
+
+ @classmethod
+ def encode(cls, value):
+ # bring it in line with the java binary repr
+ value &= 0xffffffffffffffff
+ v = (value << 1) ^ (value >> 63)
+ ret = b''
+ while (v & 0xffffffffffffff80) != 0:
+ b = (value & 0x7f) | 0x80
+ ret += struct.pack('B', b)
+ v >>= 7
+ ret += struct.pack('B', v)
+ return ret
+
+
+class CompactString(String):
+ def decode(self, data):
+ length = UnsignedVarInt32.decode(data) - 1
+ if length < 0:
+ return None
+ value = data.read(length)
+ if len(value) != length:
+ raise ValueError('Buffer underrun decoding string')
+ return value.decode(self.encoding)
+
+ def encode(self, value):
+ if value is None:
+ return UnsignedVarInt32.encode(0)
+ value = str(value).encode(self.encoding)
+ return UnsignedVarInt32.encode(len(value) + 1) + value
+
+
+class TaggedFields(AbstractType):
+ @classmethod
+ def decode(cls, data):
+ num_fields = UnsignedVarInt32.decode(data)
+ ret = {}
+ if not num_fields:
+ return ret
+ prev_tag = -1
+ for i in range(num_fields):
+ tag = UnsignedVarInt32.decode(data)
+ if tag <= prev_tag:
+ raise ValueError('Invalid or out-of-order tag {}'.format(tag))
+ prev_tag = tag
+ size = UnsignedVarInt32.decode(data)
+ val = data.read(size)
+ ret[tag] = val
+ return ret
+
+ @classmethod
+ def encode(cls, value):
+ ret = UnsignedVarInt32.encode(len(value))
+ for k, v in value.items():
+ # do we allow for other data types ?? It could get complicated really fast
+ assert isinstance(v, bytes), 'Value {} is not a byte array'.format(v)
+ assert isinstance(k, int) and k > 0, 'Key {} is not a positive integer'.format(k)
+ ret += UnsignedVarInt32.encode(k)
+ ret += v
+ return ret
+
+
+class CompactBytes(AbstractType):
+ @classmethod
+ def decode(cls, data):
+ length = UnsignedVarInt32.decode(data) - 1
+ if length < 0:
+ return None
+ value = data.read(length)
+ if len(value) != length:
+ raise ValueError('Buffer underrun decoding Bytes')
+ return value
+
+ @classmethod
+ def encode(cls, value):
+ if value is None:
+ return UnsignedVarInt32.encode(0)
+ else:
+ return UnsignedVarInt32.encode(len(value) + 1) + value
+
+
+class CompactArray(Array):
+
+ def encode(self, items):
+ if items is None:
+ return UnsignedVarInt32.encode(0)
+ return b''.join(
+ [UnsignedVarInt32.encode(len(items) + 1)] +
+ [self.array_of.encode(item) for item in items]
+ )
+
+ def decode(self, data):
+ length = UnsignedVarInt32.decode(data) - 1
+ if length == -1:
+ return None
+ return [self.array_of.decode(data) for _ in range(length)]
+