summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py41
1 files changed, 29 insertions, 12 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 93da316..7f6a6f0 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -7,8 +7,15 @@ from kafka.common import (
OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
)
+from kafka.util import (
+ ReentrantTimer
+)
+
log = logging.getLogger("kafka")
+AUTO_COMMIT_MSG_COUNT = 100
+AUTO_COMMIT_INTERVAL = 5000
+
class SimpleConsumer(object):
"""
A simple consumer implementation that consumes all partitions for a topic
@@ -27,7 +34,9 @@ class SimpleConsumer(object):
manual call to commit will also reset these triggers
"""
- def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None):
+ def __init__(self, client, group, topic, auto_commit=True,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL):
self.client = client
self.topic = topic
self.group = group
@@ -151,19 +160,21 @@ class SimpleConsumer(object):
with self.commit_lock:
reqs = []
if len(partitions) == 0: # commit all partitions
- for partition, offset in self.offsets.items():
- log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
- offset, self.group, self.topic, partition))
- reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
- else:
- for partition in partitions:
- offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
- offset, self.group, self.topic, partition))
- reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
- resps = self.send_offset_commit_request(self.group, reqs)
+ partitions = self.offsets.keys()
+
+ for partition in partitions:
+ offset = self.offsets[partition]
+ log.debug("Commit offset %d in SimpleConsumer: "
+ "group=%s, topic=%s, partition=%s" %
+ (offset, self.group, self.topic, partition))
+
+ reqs.append(OffsetCommitRequest(self.topic, partition,
+ offset, None))
+
+ resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
assert resp.error == 0
+
self.count_since_commit = 0
def __iter__(self):
@@ -207,6 +218,12 @@ class SimpleConsumer(object):
a batch of messages, yield them one at a time. After a batch is exhausted,
start a new batch unless we've reached the end of this partition.
"""
+
+ # Unless it is the first message in the queue, we have to fetch
+ # the next one
+ if offset != 0:
+ offset += 1
+
while True:
req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
(resp,) = self.client.send_fetch_request([req])