summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-01 14:34:45 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-01 15:27:30 -0700
commit4ed0453b533b4b108a2a6aedf4b6e0f7bb49611f (patch)
tree94f9280177cddd233a63924d67b3624c6ba06c48
parent3a971ea95e43341d105a5a7def6c0cb383be9e62 (diff)
downloadkafka-python-4ed0453b533b4b108a2a6aedf4b6e0f7bb49611f.tar.gz
Test decoding messagesets with partial messages
-rw-r--r--test/test_protocol.py102
1 files changed, 101 insertions, 1 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 247fcc3..2b52f48 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -1,4 +1,5 @@
#pylint: skip-file
+import io
import struct
import pytest
@@ -6,7 +7,9 @@ import six
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorRequest
-from kafka.protocol.message import Message, MessageSet
+from kafka.protocol.fetch import FetchResponse
+from kafka.protocol.message import Message, MessageSet, PartialMessage
+from kafka.protocol.types import Int16, Int32, Int64, String
def test_create_message():
@@ -144,3 +147,100 @@ def test_encode_message_header():
req = GroupCoordinatorRequest[0]('foo')
header = RequestHeader(req, correlation_id=4, client_id='client3')
assert header.encode() == expect
+
+
+def test_decode_message_set_partial():
+ encoded = b''.join([
+ struct.pack('>q', 0), # Msg Offset
+ struct.pack('>i', 18), # Msg Size
+ struct.pack('>i', 1474775406), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k1', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v1', # Value
+
+ struct.pack('>q', 1), # Msg Offset
+ struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size)
+ struct.pack('>i', -16383415), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k2', # Key
+ struct.pack('>i', 8), # Length of value
+ b'ar', # Value (truncated)
+ ])
+
+ msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded))
+ assert len(msgs) == 2
+ msg1, msg2 = msgs
+
+ returned_offset1, message1_size, decoded_message1 = msg1
+ returned_offset2, message2_size, decoded_message2 = msg2
+
+ assert returned_offset1 == 0
+ message1 = Message(b'v1', key=b'k1')
+ message1.encode()
+ assert decoded_message1 == message1
+
+ assert returned_offset2 is None
+ assert message2_size is None
+ assert decoded_message2 == PartialMessage()
+
+
+def test_decode_fetch_response_partial():
+ encoded = b''.join([
+ Int32.encode(1), # Num Topics (Array)
+ String('utf-8').encode('foobar'),
+ Int32.encode(2), # Num Partitions (Array)
+ Int32.encode(0), # Partition id
+ Int16.encode(0), # Error Code
+ Int64.encode(1234), # Highwater offset
+ Int32.encode(52), # MessageSet size
+ Int64.encode(0), # Msg Offset
+ Int32.encode(18), # Msg Size
+ struct.pack('>i', 1474775406), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k1', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v1', # Value
+
+ Int64.encode(1), # Msg Offset
+ struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size)
+ struct.pack('>i', -16383415), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k2', # Key
+ struct.pack('>i', 8), # Length of value
+ b'ar', # Value (truncated)
+ Int32.encode(1),
+ Int16.encode(0),
+ Int64.encode(2345),
+ Int32.encode(52), # MessageSet size
+ Int64.encode(0), # Msg Offset
+ Int32.encode(18), # Msg Size
+ struct.pack('>i', 1474775406), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k1', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v1', # Value
+
+ Int64.encode(1), # Msg Offset
+ struct.pack('>i', 24), # Msg Size (larger than remaining MsgSet size)
+ struct.pack('>i', -16383415), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k2', # Key
+ struct.pack('>i', 8), # Length of value
+ b'ar', # Value (truncated)
+ ])
+
+ resp = FetchResponse[0].decode(io.BytesIO(encoded))
+ assert len(resp.topics) == 1
+ topic, partitions = resp.topics[0]
+ assert topic == 'foobar'
+ assert len(partitions) == 2
+ m1 = partitions[0][3]
+ assert len(m1) == 2
+ assert m1[1] == (None, None, PartialMessage())