diff options
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 2 | ||||
-rw-r--r-- | test/test_client_async.py | 8 |
4 files changed, 7 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ca51987..cfc89fc 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -313,7 +313,7 @@ class KafkaClient(object): return self._conns[node_id].send(request, expect_response=expect_response) - def poll(self, timeout_ms=None, future=None, sleep=False): + def poll(self, timeout_ms=None, future=None, sleep=True): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6c85c21..151e644 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -743,7 +743,7 @@ class KafkaConsumer(six.Iterator): poll_ms = 1000 * (self._consumer_timeout - time.time()) if not self._fetcher.in_flight_fetches(): poll_ms = 0 - self._client.poll(poll_ms) + self._client.poll(timeout_ms=poll_ms, sleep=True) # We need to make sure we at least keep up with scheduled tasks, # like heartbeats, auto-commits, and metadata refreshes diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 348ee4e..c75eb7c 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -202,7 +202,7 @@ class BaseCoordinator(object): continue elif future.retriable(): metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update, sleep=True) + self._client.poll(future=metadata_update) else: raise future.exception # pylint: disable-msg=raising-bad-type diff --git a/test/test_client_async.py b/test/test_client_async.py index 88f0fc7..46400b8 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -235,23 +235,23 @@ def test_poll(mocker): metadata.return_value = 1000 tasks.return_value = 2 cli.poll() - _poll.assert_called_with(1.0, sleep=False) + _poll.assert_called_with(1.0, sleep=True) # user timeout wins cli.poll(250) - _poll.assert_called_with(0.25, sleep=False) + _poll.assert_called_with(0.25, sleep=True) # tasks timeout wins tasks.return_value = 0 cli.poll(250) - _poll.assert_called_with(0, sleep=False) + _poll.assert_called_with(0, sleep=True) # default is request_timeout_ms metadata.return_value = 1000000 tasks.return_value = 10000 cli.poll() _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0, - sleep=False) + sleep=True) def test__poll(): |