summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-23 15:25:05 -0800
committerDana Powers <dana.powers@rd.io>2016-01-24 13:26:42 -0800
commitda787214e1d196992aecf269c9d0105e4c934a4d (patch)
tree8d4db039b00b69b9f17e91870586c3670ca90506
parentee19cbfa4ae92e5fbe41d7ac6e9c199b49c39a88 (diff)
downloadkafka-python-da787214e1d196992aecf269c9d0105e4c934a4d.tar.gz
Support encode and repr on raw BytesIO MessageSets (used in new producer)
-rw-r--r--kafka/protocol/message.py12
1 files changed, 12 insertions, 0 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index dffb1bb..fb54049 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -88,6 +88,13 @@ class MessageSet(AbstractType):
@classmethod
def encode(cls, items, size=True, recalc_message_size=True):
+ # RecordAccumulator encodes messagesets internally
+ if isinstance(items, io.BytesIO):
+ size = Int32.decode(items)
+ # rewind and return all the bytes
+ items.seek(-4, 1)
+ return items.read(size + 4)
+
encoded_values = []
for (offset, message_size, message) in items:
if isinstance(message, Message):
@@ -143,4 +150,9 @@ class MessageSet(AbstractType):
@classmethod
def repr(cls, messages):
+ if isinstance(messages, io.BytesIO):
+ offset = messages.tell()
+ decoded = cls.decode(messages)
+ messages.seek(offset)
+ messages = decoded
return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'