diff options
author | Mike Lang <ekimekim@users.noreply.github.com> | 2018-08-31 06:11:23 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-08-31 06:11:23 -0700 |
commit | 5a04bc78f3392038733d65fc1e4830c8b14cd6fd (patch) | |
tree | c1d0edfeaf9c5b29526edadf7ccb0d160e0fd047 | |
parent | 36b53f487778e919dfe6a5940dc25c552444cc7c (diff) | |
download | kafka-python-5a04bc78f3392038733d65fc1e4830c8b14cd6fd.tar.gz |
Return future from commit_offsets_async (#1560)
-rw-r--r-- | kafka/coordinator/consumer.py | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index f90d182..647a6b5 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -441,10 +441,13 @@ class ConsumerCoordinator(BaseCoordinator): response will be either an Exception or a OffsetCommitResponse struct. This callback can be used to trigger custom actions when a commit request completes. + + Returns: + kafka.future.Future """ self._invoke_completed_offset_commit_callbacks() if not self.coordinator_unknown(): - self._do_commit_offsets_async(offsets, callback) + future = self._do_commit_offsets_async(offsets, callback) else: # we don't know the current coordinator, so try to find it and then # send the commit or fail (we don't want recursive retries which can @@ -464,6 +467,8 @@ class ConsumerCoordinator(BaseCoordinator): # through delayed task execution. self._client.poll(timeout_ms=0) # no wakeup if we add that feature + return future + def _do_commit_offsets_async(self, offsets, callback=None): assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) |