From 90c729438a2e3f1b194e58231e41bd16bd7b7172 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 Apr 2016 22:54:28 -0700 Subject: Use version-indexed lists for request/response protocol structs --- kafka/protocol/legacy.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'kafka/protocol/legacy.py') diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index e4745f1..2eddf3b 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -136,7 +136,7 @@ class KafkaProtocol(object): if acks not in (1, 0, -1): raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) - return kafka.protocol.produce.ProduceRequest( + return kafka.protocol.produce.ProduceRequest[0]( required_acks=acks, timeout=timeout, topics=[( @@ -180,7 +180,7 @@ class KafkaProtocol(object): Return: FetchRequest """ - return kafka.protocol.fetch.FetchRequest( + return kafka.protocol.fetch.FetchRequest[0]( replica_id=-1, max_wait_time=max_wait_time, min_bytes=min_bytes, @@ -212,7 +212,7 @@ class KafkaProtocol(object): @classmethod def encode_offset_request(cls, payloads=()): - return kafka.protocol.offset.OffsetRequest( + return kafka.protocol.offset.OffsetRequest[0]( replica_id=-1, topics=[( topic, @@ -250,7 +250,7 @@ class KafkaProtocol(object): if payloads is not None: topics = payloads - return kafka.protocol.metadata.MetadataRequest(topics) + return kafka.protocol.metadata.MetadataRequest[0](topics) @classmethod def decode_metadata_response(cls, response): @@ -297,7 +297,7 @@ class KafkaProtocol(object): group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequestPayload """ - return kafka.protocol.commit.OffsetCommitRequest_v0( + return kafka.protocol.commit.OffsetCommitRequest[0]( consumer_group=group, topics=[( topic, @@ -337,11 +337,11 @@ class KafkaProtocol(object): from_kafka: bool, default False, set True for Kafka-committed offsets """ if from_kafka: - request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + version = 1 else: - request_class = kafka.protocol.commit.OffsetFetchRequest_v0 + version = 0 - return request_class( + return kafka.protocol.commit.OffsetFetchRequest[version]( consumer_group=group, topics=[( topic, -- cgit v1.2.1