diff options
-rw-r--r-- | kafka/consumer.py | 12 | ||||
-rw-r--r-- | test/unit2.py | 56 |
2 files changed, 63 insertions, 5 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index d09803a..4097fe3 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -140,12 +140,12 @@ class SimpleConsumer(object): """ Commit offsets as part of timer """ - self.commit() + self.commit(include_current_offset=False) # Once the commit is done, start the timer again self.commit_timer.start() - def commit(self, partitions=[]): + def commit(self, partitions=[], include_current_offset=True): """ Commit offsets for this consumer @@ -163,7 +163,10 @@ class SimpleConsumer(object): partitions = self.offsets.keys() for partition in partitions: - offset = self.offsets[partition] + if include_current_offset is True: + offset = self.offsets[partition] + 1 + else: + offset = self.offsets[partition] log.debug("Commit offset %d in SimpleConsumer: " "group=%s, topic=%s, partition=%s" % (offset, self.group, self.topic, partition)) @@ -227,9 +230,8 @@ class SimpleConsumer(object): next_offset = None for message in resp.messages: next_offset = message.offset - yield message - # update the internal state _after_ we yield the message self.offsets[partition] = message.offset + yield message if next_offset is None: break else: diff --git a/test/unit2.py b/test/unit2.py new file mode 100644 index 0000000..562bb76 --- /dev/null +++ b/test/unit2.py @@ -0,0 +1,56 @@ +import os +import random +import struct +import unittest + +from kafka.consumer import SimpleConsumer +from kafka.client import KafkaClient +from kafka.common import ( + ErrorMapping, OffsetAndMessage, OffsetCommitResponse, Message, FetchResponse +) + +class MockClient(object): + def __init__(self): + self.topic_partitions = {"topic": [0]} + self.mock_committed_offsets = {} + + def send_fetch_request(self, reqs): + resps = [] + for req in reqs: + msgs = [OffsetAndMessage(0, Message(0, 0, "key", "value1")), + OffsetAndMessage(1, Message(0, 0, "key", "value2")), + OffsetAndMessage(2, Message(0, 0, "key", "value3")), + OffsetAndMessage(3, Message(0, 0, "key", "value4")), + OffsetAndMessage(4, Message(0, 0, "key", "value5"))] + resp = FetchResponse(req.topic, req.partition, ErrorMapping.NO_ERROR, 0, msgs) + resps.append(resp) + return resps + + def send_offset_commit_request(self, group, reqs): + resps = [] + for req in reqs: + self.mock_committed_offsets[(req.topic, req.partition)] = req.offset + resp = OffsetCommitResponse(req.topic, req.partition, ErrorMapping.NO_ERROR) + resps.append(resp) + return resps + + def _load_metadata_for_topics(self, topic): + pass + +class TestConsumer(unittest.TestCase): + def test_offsets(self): + client = MockClient() + consumer = SimpleConsumer(client, "group", "topic", auto_commit=False) + it = iter(consumer) + m = it.next() + self.assertEquals(m.offset, 0) + self.assertEquals(consumer.offsets[0], 0) + m = it.next() + self.assertEquals(m.offset, 1) + self.assertEquals(consumer.offsets[0], 1) + consumer.commit() + + print(client.mock_committed_offsets) + +if __name__ == '__main__': + unittest.main() |