diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 22:54:28 -0700 |
commit | 90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch) | |
tree | b22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/coordinator | |
parent | 452e7c2190b83f320f58e7f650302696dde458ed (diff) | |
download | kafka-python-90c729438a2e3f1b194e58231e41bd16bd7b7172.tar.gz |
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/coordinator')
-rw-r--r-- | kafka/coordinator/base.py | 29 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 16 |
2 files changed, 21 insertions, 24 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 3c7ea21..7ff7a04 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -8,8 +8,7 @@ import six import kafka.errors as Errors from kafka.future import Future -from kafka.protocol.commit import (GroupCoordinatorRequest, - OffsetCommitRequest_v2 as OffsetCommitRequest) +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) from .heartbeat import Heartbeat @@ -79,8 +78,8 @@ class BaseCoordinator(object): self.config[key] = configs[key] self._client = client - self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.group_id = self.config['group_id'] self.coordinator_id = None self.rejoin_needed = True @@ -269,7 +268,7 @@ class BaseCoordinator(object): # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) - request = JoinGroupRequest( + request = JoinGroupRequest[0]( self.group_id, self.config['session_timeout_ms'], self.member_id, @@ -324,7 +323,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self.member_id) - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID log.debug("Attempt to join group %s failed due to unknown member id", self.group_id) future.failure(error) @@ -354,7 +353,7 @@ class BaseCoordinator(object): def _on_join_follower(self): # send follower's sync group with an empty assignment - request = SyncGroupRequest( + request = SyncGroupRequest[0]( self.group_id, self.generation, self.member_id, @@ -381,7 +380,7 @@ class BaseCoordinator(object): except Exception as e: return Future().failure(e) - request = SyncGroupRequest( + request = SyncGroupRequest[0]( self.group_id, self.generation, self.member_id, @@ -425,7 +424,7 @@ class BaseCoordinator(object): Errors.IllegalGenerationError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID future.failure(error) elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): @@ -450,7 +449,7 @@ class BaseCoordinator(object): log.debug("Sending group coordinator request for group %s to broker %s", self.group_id, node_id) - request = GroupCoordinatorRequest(self.group_id) + request = GroupCoordinatorRequest[0](self.group_id) future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_group_coordinator_response, future) @@ -514,14 +513,14 @@ class BaseCoordinator(object): if not self.coordinator_unknown() and self.generation > 0: # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. - request = LeaveGroupRequest(self.group_id, self.member_id) + request = LeaveGroupRequest[0](self.group_id, self.member_id) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") self._client.poll(future=future) - self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.rejoin_needed = True def _handle_leave_group_response(self, response): @@ -533,7 +532,7 @@ class BaseCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" - request = HeartbeatRequest(self.group_id, self.generation, self.member_id) + request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) @@ -569,7 +568,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.rejoin_needed = True future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3ce7570..cd3d48a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,9 +14,7 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ConsumerProtocol from .. import errors as Errors from ..future import Future -from ..protocol.commit import ( - OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, - OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest from ..structs import OffsetAndMetadata, TopicPartition from ..util import WeakMethod @@ -430,11 +428,11 @@ class ConsumerCoordinator(BaseCoordinator): offset_data[tp.topic][tp.partition] = offset if self.config['api_version'] >= (0, 9): - request = OffsetCommitRequest_v2( + request = OffsetCommitRequest[2]( self.group_id, self.generation, self.member_id, - OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME, + OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, [( topic, [( partition, @@ -444,7 +442,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 2): - request = OffsetCommitRequest_v1( + request = OffsetCommitRequest[1]( self.group_id, -1, '', [( topic, [( @@ -456,7 +454,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 1): - request = OffsetCommitRequest_v0( + request = OffsetCommitRequest[0]( self.group_id, [( topic, [( @@ -593,12 +591,12 @@ class ConsumerCoordinator(BaseCoordinator): topic_partitions[tp.topic].add(tp.partition) if self.config['api_version'] >= (0, 8, 2): - request = OffsetFetchRequest_v1( + request = OffsetFetchRequest[1]( self.group_id, list(topic_partitions.items()) ) else: - request = OffsetFetchRequest_v0( + request = OffsetFetchRequest[0]( self.group_id, list(topic_partitions.items()) ) |