summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-09 14:26:09 -0800
committerGitHub <noreply@github.com>2017-03-09 14:26:09 -0800
commitbb709f4c141dacee07248eb111fa48c3992cf2f9 (patch)
treeb10f37d62e1e7146b1c29517e8457d707ebd7d0f
parent6ef7675ba0757fafc136c6b18db8351ddc5a70b8 (diff)
downloadkafka-python-bb709f4c141dacee07248eb111fa48c3992cf2f9.tar.gz
Short-circuit group coordinator requests when NodeNotReady (#995)
-rw-r--r--kafka/coordinator/base.py23
1 files changed, 23 insertions, 0 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 68b1bda..ab259dd 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -286,6 +286,10 @@ class BaseCoordinator(object):
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)
+ elif not self._client.ready(self.coordinator_id, metadata_priority=False):
+ e = Errors.NodeNotReadyError(self.coordinator_id)
+ return Future().failure(e)
+
# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest[0](
@@ -416,6 +420,13 @@ class BaseCoordinator(object):
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)
+
+ # We assume that coordinator is ready if we're sending SyncGroup
+ # as it typically follows a successful JoinGroup
+ # Also note that if client.ready() enforces a metadata priority policy,
+ # we can get into an infinite loop if the leader assignment process
+ # itself requests a metadata update
+
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_sync_group_response, future, time.time())
@@ -467,6 +478,10 @@ class BaseCoordinator(object):
if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
+ elif not self._client.ready(node_id, metadata_priority=False):
+ e = Errors.NodeNotReadyError(node_id)
+ return Future().failure(e)
+
log.debug("Sending group coordinator request for group %s to broker %s",
self.group_id, node_id)
request = GroupCoordinatorRequest[0](self.group_id)
@@ -553,6 +568,14 @@ class BaseCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
+ if self.coordinator_unknown():
+ e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
+ return Future().failure(e)
+
+ elif not self._client.ready(self.coordinator_id, metadata_priority=False):
+ e = Errors.NodeNotReadyError(self.coordinator_id)
+ return Future().failure(e)
+
request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()