diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-24 11:22:25 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-24 11:22:25 -0700 |
commit | 6564b3bb2f400aae5e411c88cd973519a0aafc50 (patch) | |
tree | 510960c488a14f474317d583010aa8eec7f66cf0 | |
parent | e06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff) | |
download | kafka-python-6564b3bb2f400aae5e411c88cd973519a0aafc50.tar.gz |
Handle future bug when sending group coordinator request fails immediately
-rw-r--r-- | kafka/coordinator/base.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 53b3e1d..6e07211 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -228,11 +228,16 @@ class BaseCoordinator(object): self._find_coordinator_future = None def lookup_coordinator(self): - if self._find_coordinator_future is None: - self._find_coordinator_future = self._send_group_coordinator_request() - - self._find_coordinator_future.add_both(self._reset_find_coordinator_future) - return self._find_coordinator_future + if self._find_coordinator_future is not None: + return self._find_coordinator_future + + # If there is an error sending the group coordinator request + # then the errback will immediately fire and reset _find_coordinator_future = None + # To avoid the race, we capture the future in a local variable and return it + self._find_coordinator_future = self._send_group_coordinator_request() + future = self._find_coordinator_future + self._find_coordinator_future.add_both(self._reset_find_coordinator_future) + return future def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) |