summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 16:03:33 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 16:03:33 -0800
commiteea162eb0366ec15782568ae29e482814b06cc0e (patch)
treee0c6c0dbac8613087e32ef3c08f2adc15cb6dc2d /test
parente37049fb691cdab1d18becf044aaeaf58d46b5d2 (diff)
downloadkafka-python-eea162eb0366ec15782568ae29e482814b06cc0e.tar.gz
Update kafka.common imports to Payloads namedtuples in test_protocol
Diffstat (limited to 'test')
-rw-r--r--test/test_protocol.py46
1 files changed, 24 insertions, 22 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 9653ee3..c5086b1 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -7,10 +7,10 @@ from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
from kafka.common import (
- OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
- OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
- ProduceRequest, FetchRequest, Message, ChecksumError,
- ProduceResponse, FetchResponse, OffsetAndMessage,
+ OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest,
+ OffsetResponsePayload, OffsetCommitResponse, OffsetFetchResponse,
+ ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
+ ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
ProtocolError, ConsumerMetadataResponse
@@ -335,28 +335,30 @@ class TestProtocol(unittest.TestCase):
b"@1$%(Y!", # Random padding
])
- msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
+ msgs = MessageSet.decode(io.BytesIO(encoded))
self.assertEqual(len(msgs), 2)
msg1, msg2 = msgs
- returned_offset1, decoded_message1 = msg1
- returned_offset2, decoded_message2 = msg2
+ returned_offset1, msg_size1, decoded_message1 = msg1
+ returned_offset2, msg_size2, decoded_message2 = msg2
self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
+ self.assertEqual(decoded_message1.value, b"v1")
+ self.assertEqual(decoded_message1.key, b"k1")
self.assertEqual(returned_offset2, 1)
- self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
+ self.assertEqual(decoded_message2.value, b"v2")
+ self.assertEqual(decoded_message2.key, b"k2")
@unittest.skip('needs updating for new protocol classes')
def test_encode_produce_request(self):
requests = [
- ProduceRequest(b"topic1", 0, [
- create_message(b"a"),
- create_message(b"b")
+ ProduceRequestPayload("topic1", 0, [
+ kafka.protocol.message.Message(b"a"),
+ kafka.protocol.message.Message(b"b")
]),
- ProduceRequest(b"topic2", 1, [
- create_message(b"c")
+ ProduceRequestPayload("topic2", 1, [
+ kafka.protocol.message.Message(b"c")
])
]
@@ -480,16 +482,16 @@ class TestProtocol(unittest.TestCase):
responses = list(KafkaProtocol.decode_fetch_response(encoded))
def expand_messages(response):
- return FetchResponse(response.topic, response.partition,
- response.error, response.highwaterMark,
- list(response.messages))
+ return FetchResponsePayload(response.topic, response.partition,
+ response.error, response.highwaterMark,
+ list(response.messages))
expanded_responses = list(map(expand_messages, responses))
- expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
- OffsetAndMessage(0, msgs[1])]),
- FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
- FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
- OffsetAndMessage(0, msgs[4])])]
+ expect = [FetchResponsePayload(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
+ OffsetAndMessage(0, msgs[1])]),
+ FetchResponsePayload(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
+ FetchResponsePayload(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
+ OffsetAndMessage(0, msgs[4])])]
self.assertEqual(expanded_responses, expect)
@unittest.skip('needs updating for new protocol classes')