summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-11 16:41:42 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-11 16:41:42 -0800
commitf1b9b8490a1341def9cabd948357e6c0afd0e9d3 (patch)
tree7006115c062fa3a937e9a6df09f9362bdb2b94f8
parentaf2cd5e91aee77a3fd292da00ee04358026557f0 (diff)
downloadkafka-python-f1b9b8490a1341def9cabd948357e6c0afd0e9d3.tar.gz
HeartbeatTask should reschedule heartbeat on coordinator_unknown()
-rw-r--r--kafka/coordinator/base.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index c49c38b..a2c47a4 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -594,14 +594,18 @@ class HeartbeatTask(object):
def __call__(self):
if (self._coordinator.generation < 0 or
- self._coordinator.need_rejoin() or
- self._coordinator.coordinator_unknown()):
+ self._coordinator.need_rejoin()):
# no need to send the heartbeat we're not using auto-assignment
# or if we are awaiting a rebalance
log.debug("Skipping heartbeat: no auto-assignment"
" or waiting on rebalance")
return
+ if self._coordinator.coordinator_unknown():
+ log.warning("Coordinator unknown during heartbeat -- will retry")
+ self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
+ return
+
if self._heartbeat.session_expired():
# we haven't received a successful heartbeat in one session interval
# so mark the coordinator dead