diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-21 21:43:44 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 00:17:51 -0700 |
commit | 7f4a9361ea168a0e1073801d0b86868de47d1de2 (patch) | |
tree | dba669f957850d3741b30f2273740a81ace595b0 | |
parent | f2991be6143c6a1a79815ea20fe95bae8f5376ac (diff) | |
download | kafka-python-7f4a9361ea168a0e1073801d0b86868de47d1de2.tar.gz |
Always pass encoded message bytes to MessageSet.encode()
-rw-r--r-- | kafka/protocol/legacy.py | 8 | ||||
-rw-r--r-- | kafka/protocol/message.py | 54 |
2 files changed, 22 insertions, 40 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index cd100d6..6ab2511 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -143,9 +143,11 @@ class KafkaProtocol(object): topic, [( partition, - [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key, - magic=msg.magic, - attributes=msg.attributes)) + [(0, + kafka.protocol.message.Message( + msg.value, key=msg.key, + magic=msg.magic, attributes=msg.attributes + ).encode()) for msg in payload.messages]) for partition, payload in topic_payloads.items()]) for topic, topic_payloads in group_by_topic_and_partition(payloads).items()]) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index ae261bf..8458ac5 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -90,8 +90,7 @@ class PartialMessage(bytes): class MessageSet(AbstractType): ITEM = Schema( ('offset', Int64), - ('message_size', Int32), - ('message', Message.SCHEMA) + ('message', Bytes) ) HEADER_SIZE = 12 # offset + message_size @@ -105,20 +104,13 @@ class MessageSet(AbstractType): return items.read(size + 4) encoded_values = [] - for (offset, message_size, message) in items: - if isinstance(message, Message): - encoded_message = message.encode() - else: - encoded_message = cls.ITEM.fields[2].encode(message) - if recalc_message_size: - message_size = len(encoded_message) - encoded_values.append(cls.ITEM.fields[0].encode(offset)) - encoded_values.append(cls.ITEM.fields[1].encode(message_size)) - encoded_values.append(encoded_message) + for (offset, message) in items: + encoded_values.append(Int64.encode(offset)) + encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) if not size: return encoded - return Int32.encode(len(encoded)) + encoded + return Bytes.encode(encoded) @classmethod def decode(cls, data, bytes_to_read=None): @@ -131,30 +123,18 @@ class MessageSet(AbstractType): bytes_to_read = Int32.decode(data) items = [] - # We need at least 8 + 4 + 14 bytes to read offset + message size + message - # (14 bytes is a message w/ null key and null value) - while bytes_to_read >= 26: - offset = Int64.decode(data) - bytes_to_read -= 8 - - message_size = Int32.decode(data) - bytes_to_read -= 4 - - # if FetchRequest max_bytes is smaller than the available message set - # the server returns partial data for the final message - if message_size > bytes_to_read: + # if FetchRequest max_bytes is smaller than the available message set + # the server returns partial data for the final message + while bytes_to_read: + try: + offset = Int64.decode(data) + msg_bytes = Bytes.decode(data) + bytes_to_read -= 8 + 4 + len(msg_bytes) + items.append((offset, len(msg_bytes), Message.decode(msg_bytes))) + except ValueError: + # PartialMessage to signal that max_bytes may be too small + items.append((None, None, PartialMessage())) break - - message = Message.decode(data) - bytes_to_read -= message_size - - items.append((offset, message_size, message)) - - # If any bytes are left over, clear them from the buffer - # and append a PartialMessage to signal that max_bytes may be too small - if bytes_to_read: - items.append((None, None, PartialMessage(data.read(bytes_to_read)))) - return items @classmethod @@ -164,4 +144,4 @@ class MessageSet(AbstractType): decoded = cls.decode(messages) messages.seek(offset) messages = decoded - return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']' + return str([cls.ITEM.repr(m) for m in messages]) |