From ed85c58726f867d956f705194765b18ed3ac3855 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 1 Jun 2016 15:43:02 -0700 Subject: Fix bug in MessageSet decoding that crashed multi-partition FetchResponse --- kafka/protocol/message.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'kafka/protocol') diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 78840fc..656c131 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -169,14 +169,17 @@ class MessageSet(AbstractType): data = io.BytesIO(data) if bytes_to_read is None: bytes_to_read = Int32.decode(data) - items = [] # if FetchRequest max_bytes is smaller than the available message set # the server returns partial data for the final message + # So create an internal buffer to avoid over-reading + raw = io.BytesIO(data.read(bytes_to_read)) + + items = [] while bytes_to_read: try: - offset = Int64.decode(data) - msg_bytes = Bytes.decode(data) + offset = Int64.decode(raw) + msg_bytes = Bytes.decode(raw) bytes_to_read -= 8 + 4 + len(msg_bytes) items.append((offset, len(msg_bytes), Message.decode(msg_bytes))) except ValueError: -- cgit v1.2.1