summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 54a3711..2de254d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -613,7 +613,7 @@ class KafkaConsumer(six.Iterator):
# Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
- self._client.poll(timeout_ms=timeout_ms, sleep=True)
+ self._client.poll(timeout_ms=timeout_ms)
records, _ = self._fetcher.fetched_records(max_records)
return records
@@ -1019,7 +1019,7 @@ class KafkaConsumer(six.Iterator):
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
- self._client.poll(timeout_ms=poll_ms, sleep=True)
+ self._client.poll(timeout_ms=poll_ms)
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
@@ -1045,6 +1045,8 @@ class KafkaConsumer(six.Iterator):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
+ if self._client.in_flight_request_count():
+ self._client.poll(timeout_ms=0)
# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher