diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-21 19:34:48 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:43:45 -0800 |
commit | 08df93b3f971a3b75db2270e4d31530dbb60b5b0 (patch) | |
tree | 8e9ae6df4b3baf2e71abf4fd691c969681417fba | |
parent | 1a06f79cb8ca0d68f06e517e2aad7f0b30c0278e (diff) | |
download | kafka-python-08df93b3f971a3b75db2270e4d31530dbb60b5b0.tar.gz |
Add private methods _does_auto_commit_ms and _does_auto_commit_messages
-rw-r--r-- | kafka/consumer/kafka.py | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 705c70d..af59e7e 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -494,7 +494,7 @@ class KafkaConsumer(object): self._offsets.task_done[topic_partition] = offset # Check for auto-commit - if self._config['auto_commit_enable']: + if self._does_auto_commit_messages(): self._incr_auto_commit_message_count() if self._should_auto_commit(): @@ -672,14 +672,11 @@ class KafkaConsumer(object): # def _should_auto_commit(self): - if not self._config['auto_commit_enable']: - return False - - if self._config['auto_commit_interval_ms'] > 0: + if self._does_auto_commit_ms(): if time.time() >= self._next_commit_time: return True - if self._config['auto_commit_interval_messages'] > 0: + if self._does_auto_commit_messages(): if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']: return True @@ -688,12 +685,30 @@ class KafkaConsumer(object): def _reset_auto_commit(self): self._uncommitted_message_count = 0 self._next_commit_time = None - if self._config['auto_commit_interval_ms'] > 0: + if self._does_auto_commit_ms(): self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) def _incr_auto_commit_message_count(self, n=1): self._uncommitted_message_count += n + def _does_auto_commit_ms(self): + if not self._config['auto_commit_enable']: + return False + + conf = self._config['auto_commit_interval_ms'] + if conf is not None and conf > 0: + return True + return False + + def _does_auto_commit_messages(self): + if not self._config['auto_commit_enable']: + return False + + conf = self._config['auto_commit_interval_messages'] + if conf is not None and conf > 0: + return True + return False + # # Message iterator private methods # |