summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-06 10:30:10 -0800
committerDana Powers <dana.powers@gmail.com>2017-03-13 10:50:24 -0700
commit7afdb5b8ba7a6f3b2802b9a3e13c10a8fec5aba5 (patch)
treef432eba5c94ae25bed3403b9aab8afc7ce403259
parent899f11730db5f209c03cfad20111ec131ee4c70b (diff)
downloadkafka-python-skip_autocommit_consumer_close.tar.gz
Optionally skip auto-commit during consumer.closeskip_autocommit_consumer_close
-rw-r--r--kafka/consumer/group.py4
-rw-r--r--kafka/coordinator/consumer.py5
2 files changed, 5 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f2b1699..0bfc5e0 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -376,13 +376,13 @@ class KafkaConsumer(six.Iterator):
"""
return self._subscription.assigned_partitions()
- def close(self):
+ def close(self, autocommit=True):
"""Close the consumer, waiting indefinitely for any needed cleanup."""
if self._closed:
return
log.debug("Closing the KafkaConsumer.")
self._closed = True
- self._coordinator.close()
+ self._coordinator.close(autocommit=autocommit)
self._metrics.close()
self._client.close()
try:
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index fac8144..fdbb995 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -325,9 +325,10 @@ class ConsumerCoordinator(BaseCoordinator):
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
- def close(self):
+ def close(self, autocommit=True):
try:
- self._maybe_auto_commit_offsets_sync()
+ if autocommit:
+ self._maybe_auto_commit_offsets_sync()
finally:
super(ConsumerCoordinator, self).close()