summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-05-28 23:45:48 -0700
committerJeff Widman <jeff@jeffwidman.com>2019-05-29 21:46:37 -0700
commit9f0b518286ecfc6db8b7abbd2431810c16f1cc80 (patch)
tree81f210c3d6de3d21b5a00d8dded3d8128b3fdd2e
parent5bb1abd3495ce81a0522b2a66e6c5d2731dae77b (diff)
downloadkafka-python-9f0b518286ecfc6db8b7abbd2431810c16f1cc80.tar.gz
Reduce client poll timeout when no ifrs
-rw-r--r--kafka/client_async.py3
-rw-r--r--test/test_client_async.py12
2 files changed, 15 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 77efac8..42ec42b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -588,6 +588,9 @@ class KafkaClient(object):
metadata_timeout_ms,
idle_connection_timeout_ms,
self.config['request_timeout_ms'])
+ # if there are no requests in flight, do not block longer than the retry backoff
+ if self.in_flight_request_count() == 0:
+ timeout = min(timeout, self.config['retry_backoff_ms'])
timeout = max(0, timeout / 1000) # avoid negative timeouts
self._poll(timeout)
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 77f6b6b..82d1467 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -229,6 +229,8 @@ def test_send(cli, conn):
def test_poll(mocker):
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
+ ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count')
+ ifrs.return_value = 1
cli = KafkaClient(api_version=(0, 9))
# metadata timeout wins
@@ -245,6 +247,11 @@ def test_poll(mocker):
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
+ # If no in-flight-requests, drop timeout to retry_backoff_ms
+ ifrs.return_value = 0
+ cli.poll()
+ _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0)
+
def test__poll():
pass
@@ -300,12 +307,14 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(1.234)
def test_maybe_refresh_metadata_backoff(mocker, client):
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
@@ -316,6 +325,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(9999.999) # request_timeout_ms
@@ -324,6 +334,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client):
def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=True)
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
send = mocker.patch.object(client, 'send')
client.poll(timeout_ms=12345678)
@@ -338,6 +349,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
mocker.patch.object(client, 'maybe_connect', return_value=True)
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
now = time.time()
t = mocker.patch('time.time')