summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2016-05-21 21:43:44 -0700
committer: Dana Powers <dana.powers@gmail.com> 2016-05-22 00:17:51 -0700
commit: 7f4a9361ea168a0e1073801d0b86868de47d1de2 (patch)
tree: dba669f957850d3741b30f2273740a81ace595b0
parent: f2991be6143c6a1a79815ea20fe95bae8f5376ac (diff)
download: kafka-python-7f4a9361ea168a0e1073801d0b86868de47d1de2.tar.gz
Always pass encoded message bytes to MessageSet.encode()
-rw-r--r-- kafka/protocol/legacy.py 8
-rw-r--r-- kafka/protocol/message.py 54
2 files changed, 22 insertions, 40 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index cd100d6..6ab2511 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -143,9 +143,11 @@ class KafkaProtocol(object):
topic,
[(
partition,
- [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
- magic=msg.magic,
- attributes=msg.attributes))
+ [(0,
+ kafka.protocol.message.Message(
+ msg.value, key=msg.key,
+ magic=msg.magic, attributes=msg.attributes
+ ).encode())
for msg in payload.messages])
for partition, payload in topic_payloads.items()])
for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index ae261bf..8458ac5 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -90,8 +90,7 @@ class PartialMessage(bytes):
class MessageSet(AbstractType):
ITEM = Schema(
('offset', Int64),
- ('message_size', Int32),
- ('message', Message.SCHEMA)
+ ('message', Bytes)
)
HEADER_SIZE = 12 # offset + message_size
@@ -105,20 +104,13 @@ class MessageSet(AbstractType):
return items.read(size + 4)
encoded_values = []
- for (offset, message_size, message) in items:
- if isinstance(message, Message):
- encoded_message = message.encode()
- else:
- encoded_message = cls.ITEM.fields[2].encode(message)
- if recalc_message_size:
- message_size = len(encoded_message)
- encoded_values.append(cls.ITEM.fields[0].encode(offset))
- encoded_values.append(cls.ITEM.fields[1].encode(message_size))
- encoded_values.append(encoded_message)
+ for (offset, message) in items:
+ encoded_values.append(Int64.encode(offset))
+ encoded_values.append(Bytes.encode(message))
encoded = b''.join(encoded_values)
if not size:
return encoded
- return Int32.encode(len(encoded)) + encoded
+ return Bytes.encode(encoded)
@classmethod
def decode(cls, data, bytes_to_read=None):
@@ -131,30 +123,18 @@ class MessageSet(AbstractType):
bytes_to_read = Int32.decode(data)
items = []
- # We need at least 8 + 4 + 14 bytes to read offset + message size + message
- # (14 bytes is a message w/ null key and null value)
- while bytes_to_read >= 26:
- offset = Int64.decode(data)
- bytes_to_read -= 8
-
- message_size = Int32.decode(data)
- bytes_to_read -= 4
-
- # if FetchRequest max_bytes is smaller than the available message set
- # the server returns partial data for the final message
- if message_size > bytes_to_read:
+ # if FetchRequest max_bytes is smaller than the available message set
+ # the server returns partial data for the final message
+ while bytes_to_read:
+ try:
+ offset = Int64.decode(data)
+ msg_bytes = Bytes.decode(data)
+ bytes_to_read -= 8 + 4 + len(msg_bytes)
+ items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
+ except ValueError:
+ # PartialMessage to signal that max_bytes may be too small
+ items.append((None, None, PartialMessage()))
break
-
- message = Message.decode(data)
- bytes_to_read -= message_size
-
- items.append((offset, message_size, message))
-
- # If any bytes are left over, clear them from the buffer
- # and append a PartialMessage to signal that max_bytes may be too small
- if bytes_to_read:
- items.append((None, None, PartialMessage(data.read(bytes_to_read))))
-
return items
@classmethod
@@ -164,4 +144,4 @@ class MessageSet(AbstractType):
decoded = cls.decode(messages)
messages.seek(offset)
messages = decoded
- return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'
+ return str([cls.ITEM.repr(m) for m in messages])