From 90c729438a2e3f1b194e58231e41bd16bd7b7172 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 5 Apr 2016 22:54:28 -0700 Subject: Use version-indexed lists for request/response protocol structs --- kafka/coordinator/consumer.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) (limited to 'kafka/coordinator/consumer.py') diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3ce7570..cd3d48a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,9 +14,7 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ConsumerProtocol from .. import errors as Errors from ..future import Future -from ..protocol.commit import ( - OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, - OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest from ..structs import OffsetAndMetadata, TopicPartition from ..util import WeakMethod @@ -430,11 +428,11 @@ class ConsumerCoordinator(BaseCoordinator): offset_data[tp.topic][tp.partition] = offset if self.config['api_version'] >= (0, 9): - request = OffsetCommitRequest_v2( + request = OffsetCommitRequest[2]( self.group_id, self.generation, self.member_id, - OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME, + OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, [( topic, [( partition, @@ -444,7 +442,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 2): - request = OffsetCommitRequest_v1( + request = OffsetCommitRequest[1]( self.group_id, -1, '', [( topic, [( @@ -456,7 +454,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 1): - request = OffsetCommitRequest_v0( + request = OffsetCommitRequest[0]( self.group_id, [( topic, [( @@ -593,12 +591,12 @@ class ConsumerCoordinator(BaseCoordinator): topic_partitions[tp.topic].add(tp.partition) if self.config['api_version'] >= (0, 8, 2): - request = OffsetFetchRequest_v1( + request = OffsetFetchRequest[1]( self.group_id, list(topic_partitions.items()) ) else: - request = OffsetFetchRequest_v0( + request = OffsetFetchRequest[0]( self.group_id, list(topic_partitions.items()) ) -- cgit v1.2.1