summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/consumer.py7
1 files changed, 6 insertions, 1 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index f90d182..647a6b5 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -441,10 +441,13 @@ class ConsumerCoordinator(BaseCoordinator):
response will be either an Exception or a OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
+
+ Returns:
+ kafka.future.Future
"""
self._invoke_completed_offset_commit_callbacks()
if not self.coordinator_unknown():
- self._do_commit_offsets_async(offsets, callback)
+ future = self._do_commit_offsets_async(offsets, callback)
else:
# we don't know the current coordinator, so try to find it and then
# send the commit or fail (we don't want recursive retries which can
@@ -464,6 +467,8 @@ class ConsumerCoordinator(BaseCoordinator):
# through delayed task execution.
self._client.poll(timeout_ms=0) # no wakeup if we add that feature
+ return future
+
def _do_commit_offsets_async(self, offsets, callback=None):
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))