diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-12 13:42:24 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-12 13:42:24 +0530 |
commit | b0c87eed088936c9ec571f789d7818bf445d47b3 (patch) | |
tree | 19fc42b1aa94305c8fb257c8fb694d4778dec03f /kafka/consumer.py | |
parent | a4601d3a1bf6792e0d57e600f48e891ef2be1528 (diff) | |
download | kafka-python-b0c87eed088936c9ec571f789d7818bf445d47b3.tar.gz |
Optimize auto-commit thread
The previous commit optimized the commit thread such that the timer
started only when there were messages to be consumed. This commit
goes a step further and ensures the following:
* Only one timer thread is created
* The main app does not block on exit (waiting for timer thread to finish)
This is ensured by having a single thread blocking on an event and
keeps calling a function. We use events instead of time.sleep() so
as to prevent the python interpreter from running every 50ms checking
if the timer has expired (logic copied from threading.Timer)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 31 |
1 files changed, 10 insertions, 21 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index fe7881a..f8855dc 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -58,7 +58,8 @@ class SimpleConsumer(object): # Set up the auto-commit timer if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, - self._timed_commit) + self.commit) + self.commit_timer.start() def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: @@ -149,15 +150,6 @@ class SimpleConsumer(object): return total - def _timed_commit(self): - """ - Commit offsets as part of timer - """ - self.commit() - - # Once the commit is done, start the timer again - # self.commit_timer.start() - def commit(self, partitions=[]): """ Commit offsets for this consumer @@ -166,11 +158,17 @@ class SimpleConsumer(object): all of them """ - # short circuit if nothing happened + # short circuit if nothing happened. This check is kept outside + # to prevent un-necessarily acquiring a lock for checking the state if self.count_since_commit == 0: return with self.commit_lock: + # Do this check again, just in case the state has changed + # during the lock acquiring timeout + if self.count_since_commit == 0: + return + reqs = [] if len(partitions) == 0: # commit all partitions partitions = self.offsets.keys() @@ -200,12 +198,7 @@ class SimpleConsumer(object): return if self.count_since_commit > self.auto_commit_every_n: - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - self.commit_timer.start() - else: - self.commit() + self.commit() def __iter__(self): """ @@ -223,10 +216,6 @@ class SimpleConsumer(object): if len(iters) == 0: break - # Now that we are consuming data, start the commit thread - if self.commit_timer and not self.commit_timer.is_active: - self.commit_timer.start() - for partition, it in iters.items(): try: yield it.next() |