diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index d09803a..4097fe3 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -140,12 +140,12 @@ class SimpleConsumer(object): """ Commit offsets as part of timer """ - self.commit() + self.commit(include_current_offset=False) # Once the commit is done, start the timer again self.commit_timer.start() - def commit(self, partitions=[]): + def commit(self, partitions=[], include_current_offset=True): """ Commit offsets for this consumer @@ -163,7 +163,10 @@ class SimpleConsumer(object): partitions = self.offsets.keys() for partition in partitions: - offset = self.offsets[partition] + if include_current_offset is True: + offset = self.offsets[partition] + 1 + else: + offset = self.offsets[partition] log.debug("Commit offset %d in SimpleConsumer: " "group=%s, topic=%s, partition=%s" % (offset, self.group, self.topic, partition)) @@ -227,9 +230,8 @@ class SimpleConsumer(object): next_offset = None for message in resp.messages: next_offset = message.offset - yield message - # update the internal state _after_ we yield the message self.offsets[partition] = message.offset + yield message if next_offset is None: break else: |