summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/protocol/message.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 78840fc..656c131 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -169,14 +169,17 @@ class MessageSet(AbstractType):
data = io.BytesIO(data)
if bytes_to_read is None:
bytes_to_read = Int32.decode(data)
- items = []
# if FetchRequest max_bytes is smaller than the available message set
# the server returns partial data for the final message
+ # So create an internal buffer to avoid over-reading
+ raw = io.BytesIO(data.read(bytes_to_read))
+
+ items = []
while bytes_to_read:
try:
- offset = Int64.decode(data)
- msg_bytes = Bytes.decode(data)
+ offset = Int64.decode(raw)
+ msg_bytes = Bytes.decode(raw)
bytes_to_read -= 8 + 4 + len(msg_bytes)
items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
except ValueError: