summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-03-23 05:56:11 -0700
committerGitHub <noreply@github.com>2018-03-23 05:56:11 -0700
commit204388b0928c02a339eb84b376c74851eb074e69 (patch)
tree01e21b29552f161a0a32eda00119e617bb8234b6
parente8cb888629210b3c26748a5e2e61ab5df7b95933 (diff)
downloadkafka-python-204388b0928c02a339eb84b376c74851eb074e69.tar.gz
Check for immediate failure when looking up coordinator in heartbeat thread (#1457)
-rw-r--r--kafka/coordinator/base.py6
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index bff6286..9f67d6b 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -945,7 +945,11 @@ class HeartbeatThread(threading.Thread):
self.coordinator._client.poll(timeout_ms=0)
if self.coordinator.coordinator_unknown():
- if not self.coordinator.lookup_coordinator().is_done:
+ future = self.coordinator.lookup_coordinator()
+ if not future.is_done or future.failed():
+ # the immediate future check ensures that we backoff
+ # properly in the case that no brokers are available
+ # to connect to (and the future is automatically failed).
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
elif self.coordinator.heartbeat.session_timeout_expired():