author    Dana Powers <dana.powers@gmail.com>    2016-06-20 09:23:07 -0700
committer Dana Powers <dana.powers@gmail.com>    2016-06-20 09:56:07 -0700
commit    06abc63522a32ddf15a9de9c168132d85326cc0d (patch)
tree      85e21b8e2796d61f9b1c62ae829aad1926706c88
parent    461ccbd9ecf06722c9ff73f6ed439be4b8391672 (diff)
download  kafka-python-metadata_refresh_backoff.tar.gz

Avoid busy poll during metadata refresh failure with retry_backoff_ms (branch: metadata_refresh_backoff)

-rw-r--r--  kafka/client_async.py       73
-rw-r--r--  kafka/cluster.py             4
-rw-r--r--  test/test_client_async.py  102
3 files changed, 147 insertions(+), 32 deletions(-)
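This commit fixes a busy poll: previously _maybe_refresh_metadata returned 0 whenever metadata needed a refresh but no node was available, so poll() looped without waiting. The patch records when no node was last available and folds retry_backoff_ms into the poll timeout. A minimal sketch of the new timeout selection, with the logic pulled out of KafkaClient into a free function purely for illustration (names follow the diff below):

    import time

    def metadata_refresh_timeout_ms(ttl, last_no_node_available_ms,
                                    refresh_backoff_ms, refresh_in_progress):
        # ttl: ms until cached metadata expires (0 = refresh wanted)
        # last_no_node_available_ms: wall-clock ms when a refresh last found no node
        # refresh_backoff_ms: retry_backoff_ms from the client config
        # refresh_in_progress: True while a metadata request is in flight
        next_reconnect_ms = last_no_node_available_ms + refresh_backoff_ms
        next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
        wait_for_in_progress_ms = 9999999999 if refresh_in_progress else 0
        # Only a result of 0 triggers an immediate refresh attempt;
        # anything larger is how long poll() may sleep first.
        return max(ttl, next_reconnect_ms, wait_for_in_progress_ms)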
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8916a3e..25952be 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -126,6 +126,7 @@ class KafkaClient(object):
         self.cluster = ClusterMetadata(**self.config)
         self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
+        self._last_no_node_available_ms = 0
         self._selector = selectors.DefaultSelector()
         self._conns = {}
         self._connecting = set()
@@ -600,38 +601,50 @@ class KafkaClient(object):
             int: milliseconds until next refresh
         """
         ttl = self.cluster.ttl()
-        if ttl > 0:
-            return ttl
+        next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
+        next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
+        wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
+        timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
+
+        if timeout == 0:
+            node_id = self.least_loaded_node()
+            if node_id is None:
+                log.debug("Give up sending metadata request since no node is available")
+                # mark the timestamp for no node available to connect
+                self._last_no_node_available_ms = time.time() * 1000
+                return timeout
+
+            topics = list(self._topics)
+            if self.cluster.need_all_topic_metadata:
+                topics = []
 
-        if self._metadata_refresh_in_progress:
-            return 9999999999
-
-        node_id = self.least_loaded_node()
-        if node_id is None:
-            return 0
-
-        topics = list(self._topics)
-        if self.cluster.need_all_topic_metadata:
-            topics = []
-
-        if self._can_send_request(node_id):
-            request = MetadataRequest[0](topics)
-            log.debug("Sending metadata request %s to node %s", request, node_id)
-            future = self.send(node_id, request)
-            future.add_callback(self.cluster.update_metadata)
-            future.add_errback(self.cluster.failed_update)
-
-            self._metadata_refresh_in_progress = True
-            def refresh_done(val_or_error):
-                self._metadata_refresh_in_progress = False
-            future.add_callback(refresh_done)
-            future.add_errback(refresh_done)
-
-        elif self._can_connect(node_id):
-            log.debug("Initializing connection to node %s for metadata request", node_id)
-            self._maybe_connect(node_id)
+            if self._can_send_request(node_id):
+                request = MetadataRequest[0](topics)
+                log.debug("Sending metadata request %s to node %s", request, node_id)
+                future = self.send(node_id, request)
+                future.add_callback(self.cluster.update_metadata)
+                future.add_errback(self.cluster.failed_update)
+
+                self._metadata_refresh_in_progress = True
+                def refresh_done(val_or_error):
+                    self._metadata_refresh_in_progress = False
+                future.add_callback(refresh_done)
+                future.add_errback(refresh_done)
+
+            elif self._can_connect(node_id):
+                log.debug("Initializing connection to node %s for metadata request", node_id)
+                self._maybe_connect(node_id)
+                # If initiateConnect failed immediately, this node will be put into blackout and we
+                # should allow immediately retrying in case there is another candidate node. If it
+                # is still connecting, the worst case is that we end up setting a longer timeout
+                # on the next round and then wait for the response.
+            else:
+                # connected, but can't send more OR connecting
+                # In either case, we just need to wait for a network event to let us know the selected
+                # connection might be usable again.
+                self._last_no_node_available_ms = time.time() * 1000
 
-        return 0
+        return timeout
 
     def schedule(self, task, at):
         """Schedule a new task to be executed at the given time.
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 3309d1f..9aabec1 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -131,6 +131,10 @@ class ClusterMetadata(object):
         return max(ttl, next_retry, 0)
 
+    def refresh_backoff(self):
+        """Return milliseconds to wait before attempting to retry after failure"""
+        return self.config['retry_backoff_ms']
+
     def request_update(self):
         """Flags metadata for update, return Future()
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 5870501..06c2bf5 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -293,8 +293,106 @@ def test_set_topics():
     pass
 
 
-def test_maybe_refresh_metadata():
-    pass
+def test_maybe_refresh_metadata_ttl(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 1234
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(1.234, sleep=True)
+
+
+def test_maybe_refresh_metadata_backoff(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    now = time.time()
+    t = mocker.patch('time.time')
+    t.return_value = now
+    cli._last_no_node_available_ms = now * 1000
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(2.222, sleep=True)
+
+
+def test_maybe_refresh_metadata_in_progress(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    cli._metadata_refresh_in_progress = True
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(9999.999, sleep=True)
+
+
+def test_maybe_refresh_metadata_update(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+    mocker.patch.object(cli, '_can_send_request', return_value=True)
+    send = mocker.patch.object(cli, 'send')
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(0, sleep=True)
+    assert cli._metadata_refresh_in_progress
+    request = MetadataRequest[0]([])
+    send.assert_called_with('foobar', request)
+
+
+def test_maybe_refresh_metadata_failure(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+
+    now = time.time()
+    t = mocker.patch('time.time')
+    t.return_value = now
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(0, sleep=True)
+    assert cli._last_no_node_available_ms == now * 1000
+    assert not cli._metadata_refresh_in_progress
 
 
 def test_schedule():
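Note that the tests above leave the elif self._can_connect(node_id) branch uncovered. A hypothetical follow-on test in the same mocker style might look like this (the name and mocks are illustrative, not part of this commit):

    def test_maybe_refresh_metadata_cant_send(mocker):
        mocker.patch.object(KafkaClient, '_bootstrap')
        _poll = mocker.patch.object(KafkaClient, '_poll')

        cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)

        tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
        tasks.return_value = 9999999

        ttl = mocker.patch.object(cli.cluster, 'ttl')
        ttl.return_value = 0

        mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
        mocker.patch.object(cli, '_can_send_request', return_value=False)
        mocker.patch.object(cli, '_can_connect', return_value=True)
        connect = mocker.patch.object(cli, '_maybe_connect')

        cli.poll(timeout_ms=9999999, sleep=True)
        # connection initiated; immediate retry stays allowed, so timeout is 0
        connect.assert_called_once_with('foobar')
        _poll.assert_called_with(0, sleep=True)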