summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-01 15:43:02 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-01 15:43:02 -0700
commited85c58726f867d956f705194765b18ed3ac3855 (patch)
tree6c49137d367dbaca597385935c59215b7b76c683
parent4ed0453b533b4b108a2a6aedf4b6e0f7bb49611f (diff)
downloadkafka-python-ed85c58726f867d956f705194765b18ed3ac3855.tar.gz
Fix bug in MessageSet decoding that crashed multi-partition FetchResponsepartial_messages
-rw-r--r--kafka/protocol/message.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 78840fc..656c131 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -169,14 +169,17 @@ class MessageSet(AbstractType):
data = io.BytesIO(data)
if bytes_to_read is None:
bytes_to_read = Int32.decode(data)
- items = []
# if FetchRequest max_bytes is smaller than the available message set
# the server returns partial data for the final message
+ # So create an internal buffer to avoid over-reading
+ raw = io.BytesIO(data.read(bytes_to_read))
+
+ items = []
while bytes_to_read:
try:
- offset = Int64.decode(data)
- msg_bytes = Bytes.decode(data)
+ offset = Int64.decode(raw)
+ msg_bytes = Bytes.decode(raw)
bytes_to_read -= 8 + 4 + len(msg_bytes)
items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
except ValueError: