From 08df93b3f971a3b75db2270e4d31530dbb60b5b0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 21 Sep 2014 19:34:48 -0700 Subject: Add private methods _does_auto_commit_ms and _does_auto_commit_messages --- kafka/consumer/kafka.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 705c70d..af59e7e 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -494,7 +494,7 @@ class KafkaConsumer(object): self._offsets.task_done[topic_partition] = offset # Check for auto-commit - if self._config['auto_commit_enable']: + if self._does_auto_commit_messages(): self._incr_auto_commit_message_count() if self._should_auto_commit(): @@ -672,14 +672,11 @@ class KafkaConsumer(object): # def _should_auto_commit(self): - if not self._config['auto_commit_enable']: - return False - - if self._config['auto_commit_interval_ms'] > 0: + if self._does_auto_commit_ms(): if time.time() >= self._next_commit_time: return True - if self._config['auto_commit_interval_messages'] > 0: + if self._does_auto_commit_messages(): if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']: return True @@ -688,12 +685,30 @@ class KafkaConsumer(object): def _reset_auto_commit(self): self._uncommitted_message_count = 0 self._next_commit_time = None - if self._config['auto_commit_interval_ms'] > 0: + if self._does_auto_commit_ms(): self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) def _incr_auto_commit_message_count(self, n=1): self._uncommitted_message_count += n + def _does_auto_commit_ms(self): + if not self._config['auto_commit_enable']: + return False + + conf = self._config['auto_commit_interval_ms'] + if conf is not None and conf > 0: + return True + return False + + def _does_auto_commit_messages(self): + if not self._config['auto_commit_enable']: + return False + + conf = self._config['auto_commit_interval_messages'] + if conf is not None and conf > 0: + return True + return False + # # Message iterator private methods # -- cgit v1.2.1