summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-12 13:42:24 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-12 13:42:24 +0530
commitb0c87eed088936c9ec571f789d7818bf445d47b3 (patch)
tree19fc42b1aa94305c8fb257c8fb694d4778dec03f /kafka/consumer.py
parenta4601d3a1bf6792e0d57e600f48e891ef2be1528 (diff)
downloadkafka-python-b0c87eed088936c9ec571f789d7818bf445d47b3.tar.gz
Optimize auto-commit thread
The previous commit optimized the commit thread so that the timer started only when there were messages to be consumed. This commit goes a step further and ensures the following: * Only one timer thread is created * The main app does not block on exit (waiting for the timer thread to finish) This is ensured by having a single thread that blocks on an event and repeatedly calls a function. We use events instead of time.sleep() to prevent the Python interpreter from waking up every 50ms to check whether the timer has expired (logic copied from threading.Timer)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py31
1 files changed, 10 insertions, 21 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fe7881a..f8855dc 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -58,7 +58,8 @@ class SimpleConsumer(object):
# Set up the auto-commit timer
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
- self._timed_commit)
+ self.commit)
+ self.commit_timer.start()
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
@@ -149,15 +150,6 @@ class SimpleConsumer(object):
return total
- def _timed_commit(self):
- """
- Commit offsets as part of timer
- """
- self.commit()
-
- # Once the commit is done, start the timer again
- # self.commit_timer.start()
-
def commit(self, partitions=[]):
"""
Commit offsets for this consumer
@@ -166,11 +158,17 @@ class SimpleConsumer(object):
all of them
"""
- # short circuit if nothing happened
+ # short circuit if nothing happened. This check is kept outside
+ # to prevent un-necessarily acquiring a lock for checking the state
if self.count_since_commit == 0:
return
with self.commit_lock:
+ # Do this check again, just in case the state has changed
+ # during the lock acquiring timeout
+ if self.count_since_commit == 0:
+ return
+
reqs = []
if len(partitions) == 0: # commit all partitions
partitions = self.offsets.keys()
@@ -200,12 +198,7 @@ class SimpleConsumer(object):
return
if self.count_since_commit > self.auto_commit_every_n:
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
- self.commit_timer.start()
- else:
- self.commit()
+ self.commit()
def __iter__(self):
"""
@@ -223,10 +216,6 @@ class SimpleConsumer(object):
if len(iters) == 0:
break
- # Now that we are consuming data, start the commit thread
- if self.commit_timer and not self.commit_timer.is_active:
- self.commit_timer.start()
-
for partition, it in iters.items():
try:
yield it.next()