diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-14 23:26:17 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:55 -0800 |
commit | 80dfaeb9a7b86a8de541556fce04438aff144bcd (patch) | |
tree | e5d80f29b3fe54bd07760fc222acb9c4611eb5e4 /kafka | |
parent | e702880bda02f5f8c142afe34ce7924a08516389 (diff) | |
download | kafka-python-80dfaeb9a7b86a8de541556fce04438aff144bcd.tar.gz |
Move kafka._msg_iter initialization from __init__() to next()
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/new.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index e7c9c55..5946c93 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -122,8 +122,8 @@ class KafkaConsumer(object): def __init__(self, *topics, **configs): self.configure(**configs) - self.set_topic_partitions(*topics) + self._msg_iter = None # Setup offsets self._offsets = OffsetsStruct(fetch=defaultdict(dict), @@ -143,9 +143,6 @@ class KafkaConsumer(object): self._reset_highwater_offsets() self._reset_task_done_offsets() - # Start the message fetch generator - self._msg_iter = self.fetch_messages() - def _fetch_stored_offsets(self): logger.info("Consumer fetching stored offsets") for topic, partition in self._topics: @@ -198,6 +195,10 @@ class KafkaConsumer(object): self._set_consumer_timeout_start() while True: + # Fetch a new batch if needed + if self._msg_iter is None: + self._msg_iter = self.fetch_messages() + # Check for auto-commit if self.should_auto_commit(): self.commit() @@ -205,9 +206,9 @@ class KafkaConsumer(object): try: return self._msg_iter.next() - # If the previous batch finishes, start get new batch + # Handle batch completion except StopIteration: - self._msg_iter = self.fetch_messages() + self._msg_iter = None self._check_consumer_timeout() |