diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-01 14:34:45 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-06-01 15:27:30 -0700 |
commit | 4ed0453b533b4b108a2a6aedf4b6e0f7bb49611f (patch) | |
tree | 94f9280177cddd233a63924d67b3624c6ba06c48 | |
parent | 3a971ea95e43341d105a5a7def6c0cb383be9e62 (diff) | |
download | kafka-python-4ed0453b533b4b108a2a6aedf4b6e0f7bb49611f.tar.gz |
Test decoding messagesets with partial messages
-rw-r--r-- | test/test_protocol.py | 102 |
1 files changed, 101 insertions, 1 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py index 247fcc3..2b52f48 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -1,4 +1,5 @@ #pylint: skip-file +import io import struct import pytest @@ -6,7 +7,9 @@ import six from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorRequest -from kafka.protocol.message import Message, MessageSet +from kafka.protocol.fetch import FetchResponse +from kafka.protocol.message import Message, MessageSet, PartialMessage +from kafka.protocol.types import Int16, Int32, Int64, String def test_create_message(): @@ -144,3 +147,100 @@ def test_encode_message_header(): req = GroupCoordinatorRequest[0]('foo') header = RequestHeader(req, correlation_id=4, client_id='client3') assert header.encode() == expect + + +def test_decode_message_set_partial(): + encoded = b''.join([ + struct.pack('>q', 0), # Msg Offset + struct.pack('>i', 18), # Msg Size + struct.pack('>i', 1474775406), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k1', # Key + struct.pack('>i', 2), # Length of value + b'v1', # Value + + struct.pack('>q', 1), # Msg Offset + struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size) + struct.pack('>i', -16383415), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k2', # Key + struct.pack('>i', 8), # Length of value + b'ar', # Value (truncated) + ]) + + msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded)) + assert len(msgs) == 2 + msg1, msg2 = msgs + + returned_offset1, message1_size, decoded_message1 = msg1 + returned_offset2, message2_size, decoded_message2 = msg2 + + assert returned_offset1 == 0 + message1 = Message(b'v1', key=b'k1') + message1.encode() + assert decoded_message1 == message1 + + assert returned_offset2 is None + assert message2_size is None + assert decoded_message2 == PartialMessage() + + +def test_decode_fetch_response_partial(): + encoded = b''.join([ + Int32.encode(1), # Num Topics (Array) + String('utf-8').encode('foobar'), + Int32.encode(2), # Num Partitions (Array) + Int32.encode(0), # Partition id + Int16.encode(0), # Error Code + Int64.encode(1234), # Highwater offset + Int32.encode(52), # MessageSet size + Int64.encode(0), # Msg Offset + Int32.encode(18), # Msg Size + struct.pack('>i', 1474775406), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k1', # Key + struct.pack('>i', 2), # Length of value + b'v1', # Value + + Int64.encode(1), # Msg Offset + struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size) + struct.pack('>i', -16383415), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k2', # Key + struct.pack('>i', 8), # Length of value + b'ar', # Value (truncated) + Int32.encode(1), + Int16.encode(0), + Int64.encode(2345), + Int32.encode(52), # MessageSet size + Int64.encode(0), # Msg Offset + Int32.encode(18), # Msg Size + struct.pack('>i', 1474775406), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k1', # Key + struct.pack('>i', 2), # Length of value + b'v1', # Value + + Int64.encode(1), # Msg Offset + struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size) + struct.pack('>i', -16383415), # CRC + struct.pack('>bb', 0, 0), # Magic, flags + struct.pack('>i', 2), # Length of key + b'k2', # Key + struct.pack('>i', 8), # Length of value + b'ar', # Value (truncated) + ]) + + resp = FetchResponse[0].decode(io.BytesIO(encoded)) + assert len(resp.topics) == 1 + topic, partitions = resp.topics[0] + assert topic == 'foobar' + assert len(partitions) == 2 + m1 = partitions[0][3] + assert len(m1) == 2 + assert m1[1] == (None, None, PartialMessage()) |