summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-29 17:12:06 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-29 17:12:06 -0800
commit995f11f9ec9840857acd2c2068df5c70664c1e88 (patch)
tree37d98c55bead31828e20df9723e9343e603b6824
parent421977a3421d505a22d3c26aff5fbbd1b209bbba (diff)
downloadkafka-python-iterator_timeout.tar.gz
Fix internal timeout / sleep handling in consumer iteratoriterator_timeout
-rw-r--r--kafka/consumer/group.py31
1 files changed, 16 insertions, 15 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 0e03544..f2991b2 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -635,26 +635,22 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
+ poll_ms = 1000 * (self._consumer_timeout - time.time())
+ if not self._fetcher.in_flight_fetches():
+ poll_ms = 0
+ self._client.poll(poll_ms)
+
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
- timeout_at = min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
+ timeout_at = self._next_timeout()
if self.config['api_version'] >= (0, 9):
if self.config['group_id'] is not None and not self.assignment():
- sleep_time = time.time() - timeout_at
- log.debug('No partitions assigned; sleeping for %s', sleep_time)
- time.sleep(sleep_time)
- continue
-
- poll_ms = 1000 * (time.time() - self._consumer_timeout)
-
- # Dont bother blocking if there are no fetches
- if not self._fetcher.in_flight_fetches():
- poll_ms = 0
-
- self._client.poll(poll_ms)
+ sleep_time = max(timeout_at - time.time(), 0)
+ if sleep_time > 0 and not self._client.in_flight_request_count():
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
if time.time() > timeout_at:
continue
@@ -672,6 +668,11 @@ class KafkaConsumer(six.Iterator):
else:
self._fetcher.init_fetches()
+ def _next_timeout(self):
+ return min(self._consumer_timeout,
+ self._client._delayed_tasks.next_at() + time.time(),
+ self._client.cluster.ttl() / 1000.0 + time.time())
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self