summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--kafka/coordinator/base.py2
-rw-r--r--test/test_client_async.py8
4 files changed, 7 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ca51987..cfc89fc 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -313,7 +313,7 @@ class KafkaClient(object):
return self._conns[node_id].send(request, expect_response=expect_response)
- def poll(self, timeout_ms=None, future=None, sleep=False):
+ def poll(self, timeout_ms=None, future=None, sleep=True):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6c85c21..151e644 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -743,7 +743,7 @@ class KafkaConsumer(six.Iterator):
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
- self._client.poll(poll_ms)
+ self._client.poll(timeout_ms=poll_ms, sleep=True)
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 348ee4e..c75eb7c 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -202,7 +202,7 @@ class BaseCoordinator(object):
continue
elif future.retriable():
metadata_update = self._client.cluster.request_update()
- self._client.poll(future=metadata_update, sleep=True)
+ self._client.poll(future=metadata_update)
else:
raise future.exception # pylint: disable-msg=raising-bad-type
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 88f0fc7..46400b8 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -235,23 +235,23 @@ def test_poll(mocker):
metadata.return_value = 1000
tasks.return_value = 2
cli.poll()
- _poll.assert_called_with(1.0, sleep=False)
+ _poll.assert_called_with(1.0, sleep=True)
# user timeout wins
cli.poll(250)
- _poll.assert_called_with(0.25, sleep=False)
+ _poll.assert_called_with(0.25, sleep=True)
# tasks timeout wins
tasks.return_value = 0
cli.poll(250)
- _poll.assert_called_with(0, sleep=False)
+ _poll.assert_called_with(0, sleep=True)
# default is request_timeout_ms
metadata.return_value = 1000000
tasks.return_value = 10000
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0,
- sleep=False)
+ sleep=True)
def test__poll():