summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author    Dana Powers <dana.powers@gmail.com>    2017-08-06 17:49:38 -0700
committer Dana Powers <dana.powers@gmail.com>    2017-08-13 20:00:59 -0700
commit  77c1818a080b62704e8f406d5418345f73053409 (patch)
tree    984f25de6693e0b5354c801a4a3590bfcb760577
parent  497ded919356038d57e935850346ff347b8ea6ef (diff)
download  kafka-python-no_sleep.tar.gz
Drop unused sleep kwarg to poll (branch: no_sleep)
-rw-r--r--kafka/client_async.py12
-rw-r--r--kafka/consumer/fetcher.py3
-rw-r--r--kafka/consumer/group.py6
-rw-r--r--kafka/producer/sender.py2
-rw-r--r--test/test_client_async.py33
5 files changed, 25 insertions, 31 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ecd2cea..4e4e835 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -495,7 +495,7 @@ class KafkaClient(object):
return self._conns[node_id].send(request)
- def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
+ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
@@ -507,9 +507,6 @@ class KafkaClient(object):
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
- sleep (bool): if True and there is nothing to do (no connections
- or requests in flight), will sleep for duration timeout before
- returning empty results. Default: False.
Returns:
list: responses received (can be empty)
@@ -553,7 +550,7 @@ class KafkaClient(object):
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
- responses.extend(self._poll(timeout, sleep=sleep))
+ responses.extend(self._poll(timeout))
# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
@@ -562,10 +559,7 @@ class KafkaClient(object):
return responses
- def _poll(self, timeout, sleep=True):
- # select on reads across all connected sockets, blocking up to timeout
- assert self.in_flight_request_count() > 0 or self._connecting or sleep
-
+ def _poll(self, timeout):
responses = []
processed = set()
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c0d6075..10ed187 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -275,8 +275,7 @@ class Fetcher(six.Iterator):
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
- self._client.poll(
- future=refresh_future, sleep=True, timeout_ms=remaining_ms)
+ self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
else:
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 54a3711..2de254d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -613,7 +613,7 @@ class KafkaConsumer(six.Iterator):
# Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
- self._client.poll(timeout_ms=timeout_ms, sleep=True)
+ self._client.poll(timeout_ms=timeout_ms)
records, _ = self._fetcher.fetched_records(max_records)
return records
@@ -1019,7 +1019,7 @@ class KafkaConsumer(six.Iterator):
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
- self._client.poll(timeout_ms=poll_ms, sleep=True)
+ self._client.poll(timeout_ms=poll_ms)
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
@@ -1045,6 +1045,8 @@ class KafkaConsumer(six.Iterator):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
+ if self._client.in_flight_request_count():
+ self._client.poll(timeout_ms=0)
# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 2974faf..ad59050 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -156,7 +156,7 @@ class Sender(threading.Thread):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
- self._client.poll(poll_timeout_ms, sleep=True)
+ self._client.poll(poll_timeout_ms)
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
diff --git a/test/test_client_async.py b/test/test_client_async.py
index d4e6d37..ec45543 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -259,23 +259,22 @@ def test_poll(mocker):
metadata.return_value = 1000
tasks.return_value = 2
cli.poll()
- _poll.assert_called_with(1.0, sleep=True)
+ _poll.assert_called_with(1.0)
# user timeout wins
cli.poll(250)
- _poll.assert_called_with(0.25, sleep=True)
+ _poll.assert_called_with(0.25)
# tasks timeout wins
tasks.return_value = 0
cli.poll(250)
- _poll.assert_called_with(0, sleep=True)
+ _poll.assert_called_with(0)
# default is request_timeout_ms
metadata.return_value = 1000000
tasks.return_value = 10000
cli.poll()
- _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0,
- sleep=True)
+ _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
def test__poll():
@@ -337,8 +336,8 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(1.234, sleep=True)
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(1.234)
def test_maybe_refresh_metadata_backoff(mocker, client):
@@ -346,15 +345,15 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
t = mocker.patch('time.time')
t.return_value = now
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(2.222) # reconnect backoff
def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(9999.999) # request_timeout_ms
def test_maybe_refresh_metadata_update(mocker, client):
@@ -362,8 +361,8 @@ def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, '_can_send_request', return_value=True)
send = mocker.patch.object(client, 'send')
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(9999.999) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
send.assert_called_once_with('foobar', request)
@@ -379,16 +378,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
t.return_value = now
# first poll attempts connection
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(2.222) # reconnect backoff
client._can_connect.assert_called_once_with('foobar')
client._maybe_connect.assert_called_once_with('foobar')
# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
client._can_connect.reset_mock()
- client.poll(timeout_ms=12345678, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
+ client.poll(timeout_ms=12345678)
+ client._poll.assert_called_with(9999.999) # connection timeout (request timeout)
assert not client._can_connect.called
assert not client._metadata_refresh_in_progress