summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-12 14:46:02 -0800
committerDana Powers <dana.powers@rd.io>2016-01-12 15:39:48 -0800
commit3e622068ea7a970c8674a518a05355b6065560f1 (patch)
tree3b1cd9fc622c2cd424d55a2a3bb704d909a380e8
parent22e84a57cb0a33aef3b37ed0515a85244d3a1615 (diff)
downloadkafka-python-3e622068ea7a970c8674a518a05355b6065560f1.tar.gz
Sleep in KafkaConsumer iterator if no partition assignment; dont block in poll if no in-flight fetchesiterator_fetches
-rw-r--r--kafka/consumer/group.py29
1 files changed, 25 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 141c1fa..333ef64 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator):
self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
+ assert self.assignment() or self.subscription() is not None
while time.time() < self._consumer_timeout:
if self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
@@ -626,19 +627,40 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
- self._client.poll(
- max(0, self._consumer_timeout - time.time()) * 1000)
-
+ # We need to make sure we at least keep up with scheduled tasks,
+ # like heartbeats, auto-commits, and metadata refreshes
timeout_at = min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
+
+ if self.config['api_version'] >= (0, 9):
+ if not self.assignment():
+ sleep_time = time.time() - timeout_at
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
+
+ poll_ms = 1000 * (time.time() - self._consumer_timeout)
+
+ # Dont bother blocking if there are no fetches
+ if not self._fetcher.in_flight_fetches():
+ poll_ms = 0
+
+ self._client.poll(poll_ms)
+
if time.time() > timeout_at:
continue
+
for msg in self._fetcher:
yield msg
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
+
+ # an else block on a for loop only executes if there was no break
+ # so this should only be called on a StopIteration from the fetcher
+ # and we assume that it is safe to init_fetches when fetcher is done
+ # i.e., there are no more records stored internally
else:
self._fetcher.init_fetches()
@@ -648,7 +670,6 @@ class KafkaConsumer(six.Iterator):
def __next__(self):
if not self._iterator:
self._iterator = self._message_generator()
- self._fetcher.init_fetches()
self._set_consumer_timeout()
try: