diff options
| author | Dana Powers <dana.powers@gmail.com> | 2016-03-11 16:41:42 -0800 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2016-03-11 16:41:42 -0800 |
| commit | f1b9b8490a1341def9cabd948357e6c0afd0e9d3 (patch) | |
| tree | 7006115c062fa3a937e9a6df09f9362bdb2b94f8 | |
| parent | af2cd5e91aee77a3fd292da00ee04358026557f0 (diff) | |
| download | kafka-python-f1b9b8490a1341def9cabd948357e6c0afd0e9d3.tar.gz | |
HeartbeatTask should reschedule heartbeat on coordinator_unknown()
| -rw-r--r-- | kafka/coordinator/base.py | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c49c38b..a2c47a4 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -594,14 +594,18 @@ class HeartbeatTask(object): def __call__(self): if (self._coordinator.generation < 0 or - self._coordinator.need_rejoin() or - self._coordinator.coordinator_unknown()): + self._coordinator.need_rejoin()): # no need to send the heartbeat we're not using auto-assignment # or if we are awaiting a rebalance log.debug("Skipping heartbeat: no auto-assignment" " or waiting on rebalance") return + if self._coordinator.coordinator_unknown(): + log.warning("Coordinator unknown during heartbeat -- will retry") + self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError()) + return + if self._heartbeat.session_expired(): # we haven't received a successful heartbeat in one session interval # so mark the coordinator dead |
