author     David Arthur <mumrah@gmail.com>  2013-06-01 00:08:16 -0400
committer  David Arthur <mumrah@gmail.com>  2013-06-01 00:10:47 -0400
commit     acd388e4eab100b5cf23481780f0d836ca0c21fd (patch)
tree       dd21e726de91356689c6063057199418b7b16f2c
parent     2c257eeb1f02748840a8f4535d8d2a88ef5235f2 (diff)
download   kafka-python-issue-26.tar.gz
Starting work on fixing offset commits (issue-26)

* Update the "public" offset before yielding the message
* Add an option to SimpleConsumer.commit that excludes the current offset

Ref #26
-rw-r--r--  kafka/consumer.py  12
-rw-r--r--  test/unit2.py      56
2 files changed, 63 insertions, 5 deletions
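For context, a rough usage sketch of the option this patch introduces: with include_current_offset=True (the default) commit() now records the position past the message most recently yielded, while include_current_offset=False excludes that message, which is what the auto-commit timer now passes. Broker address, topic, group, and the handle() callback below are illustrative placeholders, not part of the patch:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Hypothetical broker/topic/group names -- adjust for your setup.
client = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(client, "my-group", "my-topic", auto_commit=False)

for message in consumer:
    handle(message)  # placeholder for application processing
    # Default: commit through the message just consumed.
    consumer.commit()
    # Alternatively, commit everything *except* the current message,
    # as the auto-commit timer now does:
    # consumer.commit(include_current_offset=False)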
diff --git a/kafka/consumer.py b/kafka/consumer.py
index d09803a..4097fe3 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -140,12 +140,12 @@ class SimpleConsumer(object):
         """
         Commit offsets as part of timer
         """
-        self.commit()
+        self.commit(include_current_offset=False)
 
         # Once the commit is done, start the timer again
         self.commit_timer.start()
 
-    def commit(self, partitions=[]):
+    def commit(self, partitions=[], include_current_offset=True):
         """
         Commit offsets for this consumer
@@ -163,7 +163,10 @@ class SimpleConsumer(object):
                 partitions = self.offsets.keys()
 
             for partition in partitions:
-                offset = self.offsets[partition]
+                if include_current_offset is True:
+                    offset = self.offsets[partition] + 1
+                else:
+                    offset = self.offsets[partition]
                 log.debug("Commit offset %d in SimpleConsumer: "
                           "group=%s, topic=%s, partition=%s" %
                           (offset, self.group, self.topic, partition))
@@ -227,9 +230,8 @@ class SimpleConsumer(object):
                 next_offset = None
                 for message in resp.messages:
                     next_offset = message.offset
-                    yield message
-                    # update the internal state _after_ we yield the message
                     self.offsets[partition] = message.offset
+                    yield message
                 if next_offset is None:
                     break
                 else:
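The reordering in the last hunk matters because of generator semantics: statements placed after a yield do not run until the caller requests the next message, so the consumer's internal offset used to lag one message behind what had already been delivered. A minimal, self-contained illustration (the names here are hypothetical, not taken from the patch):

def deliver(messages, offsets):
    for m in messages:
        offsets["p0"] = m   # record the offset first...
        yield m             # ...then hand the message to the caller

offsets = {}
gen = deliver([0, 1, 2], offsets)
first = next(gen)
assert offsets["p0"] == first  # with the old order (yield first), the offset
                               # would still hold the previous value here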
diff --git a/test/unit2.py b/test/unit2.py
new file mode 100644
index 0000000..562bb76
--- /dev/null
+++ b/test/unit2.py
@@ -0,0 +1,56 @@
+import os
+import random
+import struct
+import unittest
+
+from kafka.consumer import SimpleConsumer
+from kafka.client import KafkaClient
+from kafka.common import (
+    ErrorMapping, OffsetAndMessage, OffsetCommitResponse, Message, FetchResponse
+)
+
+class MockClient(object):
+    def __init__(self):
+        self.topic_partitions = {"topic": [0]}
+        self.mock_committed_offsets = {}
+
+    def send_fetch_request(self, reqs):
+        resps = []
+        for req in reqs:
+            msgs = [OffsetAndMessage(0, Message(0, 0, "key", "value1")),
+                    OffsetAndMessage(1, Message(0, 0, "key", "value2")),
+                    OffsetAndMessage(2, Message(0, 0, "key", "value3")),
+                    OffsetAndMessage(3, Message(0, 0, "key", "value4")),
+                    OffsetAndMessage(4, Message(0, 0, "key", "value5"))]
+            resp = FetchResponse(req.topic, req.partition, ErrorMapping.NO_ERROR, 0, msgs)
+            resps.append(resp)
+        return resps
+
+    def send_offset_commit_request(self, group, reqs):
+        resps = []
+        for req in reqs:
+            self.mock_committed_offsets[(req.topic, req.partition)] = req.offset
+            resp = OffsetCommitResponse(req.topic, req.partition, ErrorMapping.NO_ERROR)
+            resps.append(resp)
+        return resps
+
+    def _load_metadata_for_topics(self, topic):
+        pass
+
+class TestConsumer(unittest.TestCase):
+    def test_offsets(self):
+        client = MockClient()
+        consumer = SimpleConsumer(client, "group", "topic", auto_commit=False)
+        it = iter(consumer)
+        m = it.next()
+        self.assertEquals(m.offset, 0)
+        self.assertEquals(consumer.offsets[0], 0)
+        m = it.next()
+        self.assertEquals(m.offset, 1)
+        self.assertEquals(consumer.offsets[0], 1)
+        consumer.commit()
+
+        print(client.mock_committed_offsets)
+
+if __name__ == '__main__':
+    unittest.main()
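The new test module runs standalone through unittest.main(); a likely invocation, assuming it is run from the repository root with the kafka package importable (installed, or placed on PYTHONPATH as below):

PYTHONPATH=. python test/unit2.py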