summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 00:17:40 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 09:51:37 -0700
commitaa5bde6ac382966395f8f1466c46d55cf28c2cce (patch)
treef3b75dcea569e28f1685500af53bff34514374b9
parent54eb2641ec9aac1249be00dda92c07db39801400 (diff)
downloadkafka-python-message_format_v1.tar.gz
Add some simple message protocol testsmessage_format_v1
-rw-r--r--test/test_protocol.py146
1 files changed, 146 insertions, 0 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
new file mode 100644
index 0000000..247fcc3
--- /dev/null
+++ b/test/test_protocol.py
@@ -0,0 +1,146 @@
+#pylint: skip-file
+import struct
+
+import pytest
+import six
+
+from kafka.protocol.api import RequestHeader
+from kafka.protocol.commit import GroupCoordinatorRequest
+from kafka.protocol.message import Message, MessageSet
+
+
+def test_create_message():
+ payload = b'test'
+ key = b'key'
+ msg = Message(payload, key=key)
+ assert msg.magic == 0
+ assert msg.attributes == 0
+ assert msg.key == key
+ assert msg.value == payload
+
+
+def test_encode_message_v0():
+ message = Message(b'test', key=b'key')
+ encoded = message.encode()
+ expect = b''.join([
+ struct.pack('>i', -1427009701), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 3), # Length of key
+ b'key', # key
+ struct.pack('>i', 4), # Length of value
+ b'test', # value
+ ])
+ assert encoded == expect
+
+
+def test_encode_message_v1():
+ message = Message(b'test', key=b'key', magic=1, timestamp=1234)
+ encoded = message.encode()
+ expect = b''.join([
+ struct.pack('>i', 1331087195), # CRC
+ struct.pack('>bb', 1, 0), # Magic, flags
+ struct.pack('>q', 1234), # Timestamp
+ struct.pack('>i', 3), # Length of key
+ b'key', # key
+ struct.pack('>i', 4), # Length of value
+ b'test', # value
+ ])
+ assert encoded == expect
+
+
+def test_decode_message():
+ encoded = b''.join([
+ struct.pack('>i', -1427009701), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 3), # Length of key
+ b'key', # key
+ struct.pack('>i', 4), # Length of value
+ b'test', # value
+ ])
+ decoded_message = Message.decode(encoded)
+ msg = Message(b'test', key=b'key')
+ msg.encode() # crc is recalculated during encoding
+ assert decoded_message == msg
+
+
+def test_encode_message_set():
+ messages = [
+ Message(b'v1', key=b'k1'),
+ Message(b'v2', key=b'k2')
+ ]
+ encoded = MessageSet.encode([(0, msg.encode())
+ for msg in messages],
+ size=False)
+ expect = b''.join([
+ struct.pack('>q', 0), # MsgSet Offset
+ struct.pack('>i', 18), # Msg Size
+ struct.pack('>i', 1474775406), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k1', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v1', # Value
+
+ struct.pack('>q', 0), # MsgSet Offset
+ struct.pack('>i', 18), # Msg Size
+ struct.pack('>i', -16383415), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k2', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v2', # Value
+ ])
+ assert encoded == expect
+
+
+def test_decode_message_set():
+ encoded = b''.join([
+ struct.pack('>q', 0), # MsgSet Offset
+ struct.pack('>i', 18), # Msg Size
+ struct.pack('>i', 1474775406), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k1', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v1', # Value
+
+ struct.pack('>q', 1), # MsgSet Offset
+ struct.pack('>i', 18), # Msg Size
+ struct.pack('>i', -16383415), # CRC
+ struct.pack('>bb', 0, 0), # Magic, flags
+ struct.pack('>i', 2), # Length of key
+ b'k2', # Key
+ struct.pack('>i', 2), # Length of value
+ b'v2', # Value
+ ])
+
+ msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded))
+ assert len(msgs) == 2
+ msg1, msg2 = msgs
+
+ returned_offset1, message1_size, decoded_message1 = msg1
+ returned_offset2, message2_size, decoded_message2 = msg2
+
+ assert returned_offset1 == 0
+ message1 = Message(b'v1', key=b'k1')
+ message1.encode()
+ assert decoded_message1 == message1
+
+ assert returned_offset2 == 1
+ message2 = Message(b'v2', key=b'k2')
+ message2.encode()
+ assert decoded_message2 == message2
+
+
+def test_encode_message_header():
+ expect = b''.join([
+ struct.pack('>h', 10), # API Key
+ struct.pack('>h', 0), # API Version
+ struct.pack('>i', 4), # Correlation Id
+ struct.pack('>h', len('client3')), # Length of clientId
+ b'client3', # ClientId
+ ])
+
+ req = GroupCoordinatorRequest[0]('foo')
+ header = RequestHeader(req, correlation_id=4, client_id='client3')
+ assert header.encode() == expect