summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py12
1 files changed, 3 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ecd2cea..4e4e835 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -495,7 +495,7 @@ class KafkaClient(object):
return self._conns[node_id].send(request)
- def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
+ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
@@ -507,9 +507,6 @@ class KafkaClient(object):
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
- sleep (bool): if True and there is nothing to do (no connections
- or requests in flight), will sleep for duration timeout before
- returning empty results. Default: False.
Returns:
list: responses received (can be empty)
@@ -553,7 +550,7 @@ class KafkaClient(object):
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
- responses.extend(self._poll(timeout, sleep=sleep))
+ responses.extend(self._poll(timeout))
# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
@@ -562,10 +559,7 @@ class KafkaClient(object):
return responses
- def _poll(self, timeout, sleep=True):
- # select on reads across all connected sockets, blocking up to timeout
- assert self.in_flight_request_count() > 0 or self._connecting or sleep
-
+ def _poll(self, timeout):
responses = []
processed = set()