diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-05-28 23:45:48 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-05-29 21:46:37 -0700 |
commit | 9f0b518286ecfc6db8b7abbd2431810c16f1cc80 (patch) | |
tree | 81f210c3d6de3d21b5a00d8dded3d8128b3fdd2e | |
parent | 5bb1abd3495ce81a0522b2a66e6c5d2731dae77b (diff) | |
download | kafka-python-9f0b518286ecfc6db8b7abbd2431810c16f1cc80.tar.gz |
Reduce client poll timeout when no ifrs
-rw-r--r-- | kafka/client_async.py | 3 | ||||
-rw-r--r-- | test/test_client_async.py | 12 |
2 files changed, 15 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 77efac8..42ec42b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -588,6 +588,9 @@ class KafkaClient(object): metadata_timeout_ms, idle_connection_timeout_ms, self.config['request_timeout_ms']) + # if there are no requests in flight, do not block longer than the retry backoff + if self.in_flight_request_count() == 0: + timeout = min(timeout, self.config['retry_backoff_ms']) timeout = max(0, timeout / 1000) # avoid negative timeouts self._poll(timeout) diff --git a/test/test_client_async.py b/test/test_client_async.py index 77f6b6b..82d1467 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -229,6 +229,8 @@ def test_send(cli, conn): def test_poll(mocker): metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') _poll = mocker.patch.object(KafkaClient, '_poll') + ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count') + ifrs.return_value = 1 cli = KafkaClient(api_version=(0, 9)) # metadata timeout wins @@ -245,6 +247,11 @@ def test_poll(mocker): cli.poll() _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) + # If no in-flight-requests, drop timeout to retry_backoff_ms + ifrs.return_value = 0 + cli.poll() + _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0) + def test__poll(): pass @@ -300,12 +307,14 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(1.234) def test_maybe_refresh_metadata_backoff(mocker, client): + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') t.return_value = now @@ -316,6 +325,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client): def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(9999.999) # request_timeout_ms @@ -324,6 +334,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client): def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=True) + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) send = mocker.patch.object(client, 'send') client.poll(timeout_ms=12345678) @@ -338,6 +349,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) mocker.patch.object(client, 'maybe_connect', return_value=True) + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') |