diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-08-06 17:49:38 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-08-13 20:00:59 -0700 |
commit | 77c1818a080b62704e8f406d5418345f73053409 (patch) | |
tree | 984f25de6693e0b5354c801a4a3590bfcb760577 /kafka/consumer | |
parent | 497ded919356038d57e935850346ff347b8ea6ef (diff) | |
download | kafka-python-no_sleep.tar.gz |
Drop unused sleep kwarg to pollno_sleep
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 3 | ||||
-rw-r--r-- | kafka/consumer/group.py | 6 |
2 files changed, 5 insertions, 4 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c0d6075..10ed187 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -275,8 +275,7 @@ class Fetcher(six.Iterator): if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() - self._client.poll( - future=refresh_future, sleep=True, timeout_ms=remaining_ms) + self._client.poll(future=refresh_future, timeout_ms=remaining_ms) else: time.sleep(self.config['retry_backoff_ms'] / 1000.0) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 54a3711..2de254d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -613,7 +613,7 @@ class KafkaConsumer(six.Iterator): # Send any new fetches (won't resend pending fetches) self._fetcher.send_fetches() - self._client.poll(timeout_ms=timeout_ms, sleep=True) + self._client.poll(timeout_ms=timeout_ms) records, _ = self._fetcher.fetched_records(max_records) return records @@ -1019,7 +1019,7 @@ class KafkaConsumer(six.Iterator): poll_ms = 1000 * (self._consumer_timeout - time.time()) if not self._fetcher.in_flight_fetches(): poll_ms = 0 - self._client.poll(timeout_ms=poll_ms, sleep=True) + self._client.poll(timeout_ms=poll_ms) # We need to make sure we at least keep up with scheduled tasks, # like heartbeats, auto-commits, and metadata refreshes @@ -1045,6 +1045,8 @@ class KafkaConsumer(six.Iterator): if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + if self._client.in_flight_request_count(): + self._client.poll(timeout_ms=0) # An else block on a for loop only executes if there was no break # so this should only be called on a StopIteration from the fetcher |