summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2017-04-05 23:13:02 -0700
committer GitHub <noreply@github.com> 2017-04-05 23:13:02 -0700
commit 7c24135eaf1db95c50c5d340cd15cbfc2674c927 (patch)
tree 79b9e2ce24760dd6f4d5fe548bd6b9d638371d6c
parent bb9642f04c25b925b7b24f36540bd66059d4c424 (diff)
download kafka-python-7c24135eaf1db95c50c5d340cd15cbfc2674c927.tar.gz
Avoid multiple connection attempts when refreshing metadata (#1067)
-rw-r--r-- kafka/client_async.py 92
-rw-r--r-- test/test_client_async.py 39
2 files changed, 73 insertions, 58 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fbeb775..16ebb99 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -183,7 +183,6 @@ class KafkaClient(object):
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
- self._last_no_node_available_ms = 0
self._selector = self.config['selector']()
self._conns = {}
self._connecting = set()
@@ -709,50 +708,55 @@ class KafkaClient(object):
int: milliseconds until next refresh
"""
ttl = self.cluster.ttl()
- next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
- next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
- wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
- timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
-
- if timeout == 0:
- node_id = self.least_loaded_node()
- if node_id is None:
- log.debug("Give up sending metadata request since no node is available")
- # mark the timestamp for no node available to connect
- self._last_no_node_available_ms = time.time() * 1000
- return timeout
-
- if self._can_send_request(node_id):
- topics = list(self._topics)
- if self.cluster.need_all_topic_metadata or not topics:
- topics = [] if self.config['api_version'] < (0, 10) else None
- api_version = 0 if self.config['api_version'] < (0, 10) else 1
- request = MetadataRequest[api_version](topics)
- log.debug("Sending metadata request %s to node %s", request, node_id)
- future = self.send(node_id, request)
- future.add_callback(self.cluster.update_metadata)
- future.add_errback(self.cluster.failed_update)
-
- self._metadata_refresh_in_progress = True
- def refresh_done(val_or_error):
- self._metadata_refresh_in_progress = False
- future.add_callback(refresh_done)
- future.add_errback(refresh_done)
-
- elif self._can_connect(node_id):
- log.debug("Initializing connection to node %s for metadata request", node_id)
- self._maybe_connect(node_id)
- # If _maybe_connect failed immediately, this node will be put into blackout and we
- # should allow immediately retrying in case there is another candidate node. If it
- # is still connecting, the worst case is that we end up setting a longer timeout
- # on the next round and then wait for the response.
- else:
- # connected, but can't send more OR connecting
- # In either case, we just need to wait for a network event to let us know the selected
- # connection might be usable again.
- self._last_no_node_available_ms = time.time() * 1000
+ wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
+ metadata_timeout = max(ttl, wait_for_in_progress_ms)
- return timeout
+ if metadata_timeout > 0:
+ return metadata_timeout
+
+ # Beware that the behavior of this method and the computation of
+ # timeouts for poll() are highly dependent on the behavior of
+ # least_loaded_node()
+ node_id = self.least_loaded_node()
+ if node_id is None:
+ log.debug("Give up sending metadata request since no node is available");
+ return self.config['reconnect_backoff_ms']
+
+ if self._can_send_request(node_id):
+ topics = list(self._topics)
+ if self.cluster.need_all_topic_metadata or not topics:
+ topics = [] if self.config['api_version'] < (0, 10) else None
+ api_version = 0 if self.config['api_version'] < (0, 10) else 1
+ request = MetadataRequest[api_version](topics)
+ log.debug("Sending metadata request %s to node %s", request, node_id)
+ future = self.send(node_id, request)
+ future.add_callback(self.cluster.update_metadata)
+ future.add_errback(self.cluster.failed_update)
+
+ self._metadata_refresh_in_progress = True
+ def refresh_done(val_or_error):
+ self._metadata_refresh_in_progress = False
+ future.add_callback(refresh_done)
+ future.add_errback(refresh_done)
+ return self.config['request_timeout_ms']
+
+ # If there's any connection establishment underway, wait until it completes. This prevents
+ # the client from unnecessarily connecting to additional nodes while a previous connection
+ # attempt has not been completed.
+ if self._connecting:
+ # Strictly the timeout we should return here is "connect timeout", but as we don't
+ # have such application level configuration, using request timeout instead.
+ return self.config['request_timeout_ms']
+
+ if self._can_connect(node_id):
+ log.debug("Initializing connection to node %s for metadata request", node_id)
+ self._maybe_connect(node_id)
+ return self.config['reconnect_backoff_ms']
+
+ # connected but can't send more, OR connecting
+ # In either case we just need to wait for a network event
+ # to let us know the selected connection might be usable again.
+ return float('inf')
def schedule(self, task, at):
"""Schedule a new task to be executed at the given time.
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 97be827..8f6ac3f 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -319,7 +319,7 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
- cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
+ cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -332,7 +332,7 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234
- client.poll(timeout_ms=9999999, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(1.234, sleep=True)
@@ -340,17 +340,16 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
- client._last_no_node_available_ms = now * 1000
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(2.222, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
def test_maybe_refresh_metadata_update(mocker, client):
@@ -358,23 +357,35 @@ def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, '_can_send_request', return_value=True)
send = mocker.patch.object(client, 'send')
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(0, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
- send.assert_called_with('foobar', request)
+ send.assert_called_once_with('foobar', request)
-def test_maybe_refresh_metadata_failure(mocker, client):
+def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
+ mocker.patch.object(client, '_can_connect', return_value=True)
+ mocker.patch.object(client, '_maybe_connect', return_value=True)
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(0, sleep=True)
- assert client._last_no_node_available_ms == now * 1000
+ # first poll attempts connection
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
+ client._can_connect.assert_called_once_with('foobar')
+ client._maybe_connect.assert_called_once_with('foobar')
+
+ # poll while connecting should not attempt a new connection
+ client._connecting.add('foobar')
+ client._can_connect.reset_mock()
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
+ assert not client._can_connect.called
+
assert not client._metadata_refresh_in_progress