summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-24 11:22:25 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-24 11:22:25 -0700
commit6564b3bb2f400aae5e411c88cd973519a0aafc50 (patch)
tree510960c488a14f474317d583010aa8eec7f66cf0
parente06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff)
downloadkafka-python-6564b3bb2f400aae5e411c88cd973519a0aafc50.tar.gz
Handle future bug when sending group coordinator request fails immediately
-rw-r--r--kafka/coordinator/base.py15
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 53b3e1d..6e07211 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -228,11 +228,16 @@ class BaseCoordinator(object):
self._find_coordinator_future = None
def lookup_coordinator(self):
- if self._find_coordinator_future is None:
- self._find_coordinator_future = self._send_group_coordinator_request()
-
- self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
- return self._find_coordinator_future
+ if self._find_coordinator_future is not None:
+ return self._find_coordinator_future
+
+ # If there is an error sending the group coordinator request
+ # then the errback will immediately fire and reset _find_coordinator_future = None
+ # To avoid the race, we capture the future in a local variable and return it
+ self._find_coordinator_future = self._send_group_coordinator_request()
+ future = self._find_coordinator_future
+ self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+ return future
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)