diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-29 17:12:06 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-29 17:12:06 -0800 |
commit | 995f11f9ec9840857acd2c2068df5c70664c1e88 (patch) | |
tree | 37d98c55bead31828e20df9723e9343e603b6824 | |
parent | 421977a3421d505a22d3c26aff5fbbd1b209bbba (diff) | |
download | kafka-python-iterator_timeout.tar.gz |
Fix internal timeout / sleep handling in consumer iteratoriterator_timeout
-rw-r--r-- | kafka/consumer/group.py | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0e03544..f2991b2 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -635,26 +635,22 @@ class KafkaConsumer(six.Iterator): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) + poll_ms = 1000 * (self._consumer_timeout - time.time()) + if not self._fetcher.in_flight_fetches(): + poll_ms = 0 + self._client.poll(poll_ms) + # We need to make sure we at least keep up with scheduled tasks, # like heartbeats, auto-commits, and metadata refreshes - timeout_at = min(self._consumer_timeout, - self._client._delayed_tasks.next_at() + time.time(), - self._client.cluster.ttl() / 1000.0 + time.time()) + timeout_at = self._next_timeout() if self.config['api_version'] >= (0, 9): if self.config['group_id'] is not None and not self.assignment(): - sleep_time = time.time() - timeout_at - log.debug('No partitions assigned; sleeping for %s', sleep_time) - time.sleep(sleep_time) - continue - - poll_ms = 1000 * (time.time() - self._consumer_timeout) - - # Dont bother blocking if there are no fetches - if not self._fetcher.in_flight_fetches(): - poll_ms = 0 - - self._client.poll(poll_ms) + sleep_time = max(timeout_at - time.time(), 0) + if sleep_time > 0 and not self._client.in_flight_request_count(): + log.debug('No partitions assigned; sleeping for %s', sleep_time) + time.sleep(sleep_time) + continue if time.time() > timeout_at: continue @@ -672,6 +668,11 @@ class KafkaConsumer(six.Iterator): else: self._fetcher.init_fetches() + def _next_timeout(self): + return min(self._consumer_timeout, + self._client._delayed_tasks.next_at() + time.time(), + self._client.cluster.ttl() / 1000.0 + time.time()) + def __iter__(self): # pylint: disable=non-iterator-returned return self |