summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-21 19:34:48 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:43:45 -0800
commit08df93b3f971a3b75db2270e4d31530dbb60b5b0 (patch)
tree8e9ae6df4b3baf2e71abf4fd691c969681417fba
parent1a06f79cb8ca0d68f06e517e2aad7f0b30c0278e (diff)
downloadkafka-python-08df93b3f971a3b75db2270e4d31530dbb60b5b0.tar.gz
Add private methods _does_auto_commit_ms and _does_auto_commit_messages
-rw-r--r--kafka/consumer/kafka.py29
1 files changed, 22 insertions, 7 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 705c70d..af59e7e 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -494,7 +494,7 @@ class KafkaConsumer(object):
self._offsets.task_done[topic_partition] = offset
# Check for auto-commit
- if self._config['auto_commit_enable']:
+ if self._does_auto_commit_messages():
self._incr_auto_commit_message_count()
if self._should_auto_commit():
@@ -672,14 +672,11 @@ class KafkaConsumer(object):
#
def _should_auto_commit(self):
- if not self._config['auto_commit_enable']:
- return False
-
- if self._config['auto_commit_interval_ms'] > 0:
+ if self._does_auto_commit_ms():
if time.time() >= self._next_commit_time:
return True
- if self._config['auto_commit_interval_messages'] > 0:
+ if self._does_auto_commit_messages():
if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
return True
@@ -688,12 +685,30 @@ class KafkaConsumer(object):
def _reset_auto_commit(self):
self._uncommitted_message_count = 0
self._next_commit_time = None
- if self._config['auto_commit_interval_ms'] > 0:
+ if self._does_auto_commit_ms():
self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
def _incr_auto_commit_message_count(self, n=1):
self._uncommitted_message_count += n
+ def _does_auto_commit_ms(self):
+ if not self._config['auto_commit_enable']:
+ return False
+
+ conf = self._config['auto_commit_interval_ms']
+ if conf is not None and conf > 0:
+ return True
+ return False
+
+ def _does_auto_commit_messages(self):
+ if not self._config['auto_commit_enable']:
+ return False
+
+ conf = self._config['auto_commit_interval_messages']
+ if conf is not None and conf > 0:
+ return True
+ return False
+
#
# Message iterator private methods
#