summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index d09803a..4097fe3 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -140,12 +140,12 @@ class SimpleConsumer(object):
"""
Commit offsets as part of timer
"""
- self.commit()
+ self.commit(include_current_offset=False)
# Once the commit is done, start the timer again
self.commit_timer.start()
- def commit(self, partitions=[]):
+ def commit(self, partitions=[], include_current_offset=True):
"""
Commit offsets for this consumer
@@ -163,7 +163,10 @@ class SimpleConsumer(object):
partitions = self.offsets.keys()
for partition in partitions:
- offset = self.offsets[partition]
+ if include_current_offset is True:
+ offset = self.offsets[partition] + 1
+ else:
+ offset = self.offsets[partition]
log.debug("Commit offset %d in SimpleConsumer: "
"group=%s, topic=%s, partition=%s" %
(offset, self.group, self.topic, partition))
@@ -227,9 +230,8 @@ class SimpleConsumer(object):
next_offset = None
for message in resp.messages:
next_offset = message.offset
- yield message
- # update the internal state _after_ we yield the message
self.offsets[partition] = message.offset
+ yield message
if next_offset is None:
break
else: