diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 41 |
1 files changed, 29 insertions, 12 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 93da316..7f6a6f0 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -7,8 +7,15 @@ from kafka.common import ( OffsetRequest, OffsetFetchRequest, OffsetCommitRequest ) +from kafka.util import ( + ReentrantTimer +) + log = logging.getLogger("kafka") +AUTO_COMMIT_MSG_COUNT = 100 +AUTO_COMMIT_INTERVAL = 5000 + class SimpleConsumer(object): """ A simple consumer implementation that consumes all partitions for a topic @@ -27,7 +34,9 @@ class SimpleConsumer(object): manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None): + def __init__(self, client, group, topic, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client self.topic = topic self.group = group @@ -151,19 +160,21 @@ class SimpleConsumer(object): with self.commit_lock: reqs = [] if len(partitions) == 0: # commit all partitions - for partition, offset in self.offsets.items(): - log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % ( - offset, self.group, self.topic, partition)) - reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - else: - for partition in partitions: - offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % ( - offset, self.group, self.topic, partition)) - reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.send_offset_commit_request(self.group, reqs) + partitions = self.offsets.keys() + + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + resps = self.client.send_offset_commit_request(self.group, reqs) for resp in resps: assert resp.error == 0 + self.count_since_commit = 0 def __iter__(self): @@ -207,6 +218,12 @@ class SimpleConsumer(object): a batch of messages, yield them one at a time. After a batch is exhausted, start a new batch unless we've reached the end of ths partition. """ + + # Unless it is the first message in the queue, we have to fetch + # the next one + if offset != 0: + offset += 1 + while True: req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size (resp,) = self.client.send_fetch_request([req]) |