diff options
author | Brian Sang <sang.bri@gmail.com> | 2019-05-22 23:54:14 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-05-22 23:54:14 -0700 |
commit | 1f73287e890a4c68f240dcc8b6966de1e62b65cc (patch) | |
tree | 6e9f76a1412598475a60e0c3101066566542faf5 | |
parent | edfafc036f0d9a3b1e5c73f9642ef71c297c1b64 (diff) | |
download | kafka-python-1f73287e890a4c68f240dcc8b6966de1e62b65cc.tar.gz |
Make partitions_for_topic a read-through cache (#1781)
If the cluster metadata object has no info about the topic, then issue a blocking metadata call to fetch it.
-rw-r--r-- | kafka/consumer/group.py | 33 |
1 files changed, 25 insertions, 8 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6270407..39a4e08 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -552,11 +552,9 @@ class KafkaConsumer(six.Iterator): committed = None return committed - def topics(self): - """Get all topics the user is authorized to view. - - Returns: - set: topics + def _fetch_all_topit_metadata(self): + """A blocking call that fetches topic metadata for all topics in the + cluster that the user is authorized to view. """ cluster = self._client.cluster if self._client._metadata_refresh_in_progress and self._client._topics: @@ -567,10 +565,24 @@ class KafkaConsumer(six.Iterator): future = cluster.request_update() self._client.poll(future=future) cluster.need_all_topic_metadata = stash - return cluster.topics() + + def topics(self): + """Get all topics the user is authorized to view. + This will always issue a remote call to the cluster to fetch the latest + information. + + Returns: + set: topics + """ + self._fetch_all_topic_metadata() + return self._client.cluster.topics() def partitions_for_topic(self, topic): - """Get metadata about the partitions for a given topic. + """This method first checks the local metadata cache for information + about the topic. If the topic is not found (either because the topic + does not exist, the user is not authorized to view the topic, or the + metadata cache is not populated), then it will issue a metadata update + call to the cluster. Arguments: topic (str): Topic to check. @@ -578,7 +590,12 @@ class KafkaConsumer(six.Iterator): Returns: set: Partition ids """ - return self._client.cluster.partitions_for_topic(topic) + cluster = self._client.cluster + partitions = cluster.partitions_for_topic(topic) + if partitions is None: + self._fetch_all_topic_metadata() + partitions = cluster.partitions_for_topic(topic) + return partitions def poll(self, timeout_ms=0, max_records=None): """Fetch data from assigned topics / partitions. |