summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-24 16:05:50 -0700
committerGitHub <noreply@github.com>2017-10-24 16:05:50 -0700
commitd2001e4b69c2b03202a44899b687b05e735261a8 (patch)
tree5662b887c203a843d4b0484aed32fb2bb59423c1
parent8b05ee8da50b4c7b832676f4e38f9d92a86639cc (diff)
downloadkafka-python-d2001e4b69c2b03202a44899b687b05e735261a8.tar.gz
Handle lookup_coordinator send failures (#1279)
-rw-r--r--kafka/coordinator/base.py16
-rw-r--r--test/test_coordinator.py8
2 files changed, 19 insertions, 5 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 53b3e1d..a3055da 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -228,11 +228,17 @@ class BaseCoordinator(object):
self._find_coordinator_future = None
def lookup_coordinator(self):
- if self._find_coordinator_future is None:
- self._find_coordinator_future = self._send_group_coordinator_request()
-
- self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
- return self._find_coordinator_future
+ if self._find_coordinator_future is not None:
+ return self._find_coordinator_future
+
+ # If there is an error sending the group coordinator request
+ # then _reset_find_coordinator_future will immediately fire and
+ # set _find_coordinator_future = None
+ # To avoid returning None, we capture the future in a local variable
+ self._find_coordinator_future = self._send_group_coordinator_request()
+ future = self._find_coordinator_future
+ self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+ return future
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index aea2662..0e96110 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -590,3 +590,11 @@ def test_heartbeat(patched_coord):
patched_coord.heartbeat_task()
assert patched_coord._client.schedule.call_count == 1
assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
+
+
+def test_lookup_coordinator_failure(mocker, coordinator):
+
+ mocker.patch.object(coordinator, '_send_group_coordinator_request',
+ return_value=Future().failure(Exception('foobar')))
+ future = coordinator.lookup_coordinator()
+ assert future.failed()