summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-14 11:01:58 -0700
committerDana Powers <dana.powers@gmail.com>2017-03-14 13:30:35 -0700
commitfea10d9c169214af82303744069bdd6c66c4a2ef (patch)
treeb80f378d6b13e71c0e0e4d7a0dba91c5ee86cb23
parentfb023fe85d0bac4e088346765311794a574d13bf (diff)
downloadkafka-python-fea10d9c169214af82303744069bdd6c66c4a2ef.tar.gz
LZ4 support in kafka 0.8/0.9 does not accept a ContentSize header
-rw-r--r--kafka/codec.py20
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 29db48e..a527b42 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -229,13 +229,21 @@ def lz4_encode_old_kafka(payload):
assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
- if isinstance(data[4], int):
- flg = data[4]
- else:
- flg = ord(data[4])
+ flg = data[4]
+ if not isinstance(flg, int):
+ flg = ord(flg)
+
content_size_bit = ((flg >> 3) & 1)
if content_size_bit:
- header_size += 8
+ # Old kafka does not accept the content-size field
+ # so we need to discard it and reset the header flag
+ flg -= 8
+ data = bytearray(data)
+ data[4] = flg
+ data = bytes(data)
+ payload = data[header_size+8:]
+ else:
+ payload = data[header_size:]
# This is the incorrect hc
hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
@@ -243,7 +251,7 @@ def lz4_encode_old_kafka(payload):
return b''.join([
data[0:header_size-1],
hc,
- data[header_size:]
+ payload
])