summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-14 23:26:17 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:55 -0800
commit80dfaeb9a7b86a8de541556fce04438aff144bcd (patch)
treee5d80f29b3fe54bd07760fc222acb9c4611eb5e4 /kafka
parente702880bda02f5f8c142afe34ce7924a08516389 (diff)
downloadkafka-python-80dfaeb9a7b86a8de541556fce04438aff144bcd.tar.gz
Move kafka._msg_iter initialization from __init__() to next()
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/new.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index e7c9c55..5946c93 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -122,8 +122,8 @@ class KafkaConsumer(object):
def __init__(self, *topics, **configs):
self.configure(**configs)
-
self.set_topic_partitions(*topics)
+ self._msg_iter = None
# Setup offsets
self._offsets = OffsetsStruct(fetch=defaultdict(dict),
@@ -143,9 +143,6 @@ class KafkaConsumer(object):
self._reset_highwater_offsets()
self._reset_task_done_offsets()
- # Start the message fetch generator
- self._msg_iter = self.fetch_messages()
-
def _fetch_stored_offsets(self):
logger.info("Consumer fetching stored offsets")
for topic, partition in self._topics:
@@ -198,6 +195,10 @@ class KafkaConsumer(object):
self._set_consumer_timeout_start()
while True:
+ # Fetch a new batch if needed
+ if self._msg_iter is None:
+ self._msg_iter = self.fetch_messages()
+
# Check for auto-commit
if self.should_auto_commit():
self.commit()
@@ -205,9 +206,9 @@ class KafkaConsumer(object):
try:
return self._msg_iter.next()
- # If the previous batch finishes, start get new batch
+ # Handle batch completion
except StopIteration:
- self._msg_iter = self.fetch_messages()
+ self._msg_iter = None
self._check_consumer_timeout()