summaryrefslogtreecommitdiff
path: root/kafka/coordinator
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 22:54:28 -0700
commit90c729438a2e3f1b194e58231e41bd16bd7b7172 (patch)
treeb22cef6b10fd167fb22b8318e1294f6137427f3b /kafka/coordinator
parent452e7c2190b83f320f58e7f650302696dde458ed (diff)
downloadkafka-python-90c729438a2e3f1b194e58231e41bd16bd7b7172.tar.gz
Use version-indexed lists for request/response protocol structsprotocol_versions
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--kafka/coordinator/base.py29
-rw-r--r--kafka/coordinator/consumer.py16
2 files changed, 21 insertions, 24 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 3c7ea21..7ff7a04 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -8,8 +8,7 @@ import six
import kafka.errors as Errors
from kafka.future import Future
-from kafka.protocol.commit import (GroupCoordinatorRequest,
- OffsetCommitRequest_v2 as OffsetCommitRequest)
+from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
LeaveGroupRequest, SyncGroupRequest)
from .heartbeat import Heartbeat
@@ -79,8 +78,8 @@ class BaseCoordinator(object):
self.config[key] = configs[key]
self._client = client
- self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
+ self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.group_id = self.config['group_id']
self.coordinator_id = None
self.rejoin_needed = True
@@ -269,7 +268,7 @@ class BaseCoordinator(object):
# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
- request = JoinGroupRequest(
+ request = JoinGroupRequest[0](
self.group_id,
self.config['session_timeout_ms'],
self.member_id,
@@ -324,7 +323,7 @@ class BaseCoordinator(object):
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
error = error_type(self.member_id)
- self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
log.debug("Attempt to join group %s failed due to unknown member id",
self.group_id)
future.failure(error)
@@ -354,7 +353,7 @@ class BaseCoordinator(object):
def _on_join_follower(self):
# send follower's sync group with an empty assignment
- request = SyncGroupRequest(
+ request = SyncGroupRequest[0](
self.group_id,
self.generation,
self.member_id,
@@ -381,7 +380,7 @@ class BaseCoordinator(object):
except Exception as e:
return Future().failure(e)
- request = SyncGroupRequest(
+ request = SyncGroupRequest[0](
self.group_id,
self.generation,
self.member_id,
@@ -425,7 +424,7 @@ class BaseCoordinator(object):
Errors.IllegalGenerationError):
error = error_type()
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
- self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
@@ -450,7 +449,7 @@ class BaseCoordinator(object):
log.debug("Sending group coordinator request for group %s to broker %s",
self.group_id, node_id)
- request = GroupCoordinatorRequest(self.group_id)
+ request = GroupCoordinatorRequest[0](self.group_id)
future = Future()
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_group_coordinator_response, future)
@@ -514,14 +513,14 @@ class BaseCoordinator(object):
if not self.coordinator_unknown() and self.generation > 0:
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
- request = LeaveGroupRequest(self.group_id, self.member_id)
+ request = LeaveGroupRequest[0](self.group_id, self.member_id)
future = self._client.send(self.coordinator_id, request)
future.add_callback(self._handle_leave_group_response)
future.add_errback(log.error, "LeaveGroup request failed: %s")
self._client.poll(future=future)
- self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
- self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID
+ self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.rejoin_needed = True
def _handle_leave_group_response(self, response):
@@ -533,7 +532,7 @@ class BaseCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
- request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
+ request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
@@ -569,7 +568,7 @@ class BaseCoordinator(object):
elif error_type is Errors.UnknownMemberIdError:
log.warning("Heartbeat: local member_id was not recognized;"
" this consumer needs to re-join")
- self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.rejoin_needed = True
future.failure(error_type)
elif error_type is Errors.GroupAuthorizationFailedError:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 3ce7570..cd3d48a 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -14,9 +14,7 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
-from ..protocol.commit import (
- OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
- OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
@@ -430,11 +428,11 @@ class ConsumerCoordinator(BaseCoordinator):
offset_data[tp.topic][tp.partition] = offset
if self.config['api_version'] >= (0, 9):
- request = OffsetCommitRequest_v2(
+ request = OffsetCommitRequest[2](
self.group_id,
self.generation,
self.member_id,
- OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
+ OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
[(
topic, [(
partition,
@@ -444,7 +442,7 @@ class ConsumerCoordinator(BaseCoordinator):
) for topic, partitions in six.iteritems(offset_data)]
)
elif self.config['api_version'] >= (0, 8, 2):
- request = OffsetCommitRequest_v1(
+ request = OffsetCommitRequest[1](
self.group_id, -1, '',
[(
topic, [(
@@ -456,7 +454,7 @@ class ConsumerCoordinator(BaseCoordinator):
) for topic, partitions in six.iteritems(offset_data)]
)
elif self.config['api_version'] >= (0, 8, 1):
- request = OffsetCommitRequest_v0(
+ request = OffsetCommitRequest[0](
self.group_id,
[(
topic, [(
@@ -593,12 +591,12 @@ class ConsumerCoordinator(BaseCoordinator):
topic_partitions[tp.topic].add(tp.partition)
if self.config['api_version'] >= (0, 8, 2):
- request = OffsetFetchRequest_v1(
+ request = OffsetFetchRequest[1](
self.group_id,
list(topic_partitions.items())
)
else:
- request = OffsetFetchRequest_v0(
+ request = OffsetFetchRequest[0](
self.group_id,
list(topic_partitions.items())
)