From eea162eb0366ec15782568ae29e482814b06cc0e Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Wed, 9 Dec 2015 16:03:33 -0800
Subject: Update kafka.common imports to Payloads namedtuples in test_protocol

---
 test/test_protocol.py | 46 ++++++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 22 deletions(-)

(limited to 'test/test_protocol.py')

diff --git a/test/test_protocol.py b/test/test_protocol.py
index 9653ee3..c5086b1 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -7,10 +7,10 @@ from . import unittest
 from kafka.codec import has_snappy, gzip_decode, snappy_decode
 from kafka.common import (
-    OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
-    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
-    ProduceRequest, FetchRequest, Message, ChecksumError,
-    ProduceResponse, FetchResponse, OffsetAndMessage,
+    OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest,
+    OffsetResponsePayload, OffsetCommitResponse, OffsetFetchResponse,
+    ProduceRequestPayload, FetchRequestPayload, Message, ChecksumError,
+    ProduceResponsePayload, FetchResponsePayload, OffsetAndMessage,
     BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
     KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
     ProtocolError, ConsumerMetadataResponse
@@ -335,28 +335,30 @@ class TestProtocol(unittest.TestCase):
             b"@1$%(Y!",  # Random padding
         ])

-        msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
+        msgs = MessageSet.decode(io.BytesIO(encoded))
         self.assertEqual(len(msgs), 2)
         msg1, msg2 = msgs

-        returned_offset1, decoded_message1 = msg1
-        returned_offset2, decoded_message2 = msg2
+        returned_offset1, msg_size1, decoded_message1 = msg1
+        returned_offset2, msg_size2, decoded_message2 = msg2

         self.assertEqual(returned_offset1, 0)
-        self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
+        self.assertEqual(decoded_message1.value, b"v1")
+        self.assertEqual(decoded_message1.key, b"k1")

         self.assertEqual(returned_offset2, 1)
-        self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
+        self.assertEqual(decoded_message2.value, b"v2")
+        self.assertEqual(decoded_message2.key, b"k2")

     @unittest.skip('needs updating for new protocol classes')
     def test_encode_produce_request(self):
         requests = [
-            ProduceRequest(b"topic1", 0, [
-                create_message(b"a"),
-                create_message(b"b")
+            ProduceRequestPayload("topic1", 0, [
+                kafka.protocol.message.Message(b"a"),
+                kafka.protocol.message.Message(b"b")
             ]),
-            ProduceRequest(b"topic2", 1, [
-                create_message(b"c")
+            ProduceRequestPayload("topic2", 1, [
+                kafka.protocol.message.Message(b"c")
             ])
         ]

@@ -480,16 +482,16 @@ class TestProtocol(unittest.TestCase):
         responses = list(KafkaProtocol.decode_fetch_response(encoded))

         def expand_messages(response):
-            return FetchResponse(response.topic, response.partition,
-                                 response.error, response.highwaterMark,
-                                 list(response.messages))
+            return FetchResponsePayload(response.topic, response.partition,
+                                        response.error, response.highwaterMark,
+                                        list(response.messages))

         expanded_responses = list(map(expand_messages, responses))
-        expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
-                                               OffsetAndMessage(0, msgs[1])]),
-                  FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
-                  FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
-                                               OffsetAndMessage(0, msgs[4])])]
+        expect = [FetchResponsePayload(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
+                                                      OffsetAndMessage(0, msgs[1])]),
+                  FetchResponsePayload(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
+                  FetchResponsePayload(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
+                                                      OffsetAndMessage(0, msgs[4])])]
         self.assertEqual(expanded_responses, expect)

     @unittest.skip('needs updating for new protocol classes')
-- 
cgit v1.2.1
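
Note (not part of the patch): a minimal sketch of the renamed Payload namedtuple and the kafka.protocol.message.Message class as the updated test uses them. It assumes the kafka.common and kafka.protocol.message modules imported in the hunks above; the field names (topic, partition, messages) are inferred from the attribute access shown in the diff, not taken from it verbatim.

    # Sketch only -- exercises the post-rename API used in the diff above.
    from kafka.common import ProduceRequestPayload
    from kafka.protocol.message import Message

    payload = ProduceRequestPayload("topic1", 0, [Message(b"a"), Message(b"b")])

    assert payload.topic == "topic1"          # topic is a plain str, no longer bytes
    assert payload.partition == 0
    assert payload.messages[0].value == b"a"  # decoded/constructed Messages expose .value and .key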