author     Brian Sang <sang.bri@gmail.com>  2019-05-22 23:54:14 -0700
committer  Jeff Widman <jeff@jeffwidman.com>  2019-05-22 23:54:14 -0700
commit     1f73287e890a4c68f240dcc8b6966de1e62b65cc (patch)
tree       6e9f76a1412598475a60e0c3101066566542faf5
parent     edfafc036f0d9a3b1e5c73f9642ef71c297c1b64 (diff)
Make partitions_for_topic a read-through cache (#1781)
If the cluster metadata object has no info about the topic, then issue a blocking metadata call to fetch it.
-rw-r--r--  kafka/consumer/group.py  33
1 file changed, 25 insertions(+), 8 deletions(-)
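
As a rough illustration of the new behavior (not part of the commit itself; the broker address and topic name below are placeholders), a consumer can now ask for the partitions of a topic that is missing from its local metadata cache and still get an answer, because a cache miss triggers a single blocking metadata refresh:

    from kafka import KafkaConsumer

    # Placeholder broker/topic values for illustration only.
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

    # Before this change, the first call could return None even for an
    # existing topic if the local metadata cache had not been populated.
    # With the read-through behavior, the miss is followed by a blocking
    # metadata update before the result is returned.
    partitions = consumer.partitions_for_topic('my-topic')
    if partitions is None:
        print('topic does not exist or is not visible to this client')
    else:
        print('partition ids:', sorted(partitions))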
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 6270407..39a4e08 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -552,11 +552,9 @@ class KafkaConsumer(six.Iterator):
committed = None
return committed
- def topics(self):
- """Get all topics the user is authorized to view.
-
- Returns:
- set: topics
+ def _fetch_all_topic_metadata(self):
+ """A blocking call that fetches topic metadata for all topics in the
+ cluster that the user is authorized to view.
"""
cluster = self._client.cluster
if self._client._metadata_refresh_in_progress and self._client._topics:
@@ -567,10 +565,24 @@ class KafkaConsumer(six.Iterator):
future = cluster.request_update()
self._client.poll(future=future)
cluster.need_all_topic_metadata = stash
- return cluster.topics()
+
+ def topics(self):
+ """Get all topics the user is authorized to view.
+ This will always issue a remote call to the cluster to fetch the latest
+ information.
+
+ Returns:
+ set: topics
+ """
+ self._fetch_all_topic_metadata()
+ return self._client.cluster.topics()
def partitions_for_topic(self, topic):
- """Get metadata about the partitions for a given topic.
+ """This method first checks the local metadata cache for information
+ about the topic. If the topic is not found (either because the topic
+ does not exist, the user is not authorized to view the topic, or the
+ metadata cache is not populated), then it will issue a metadata update
+ call to the cluster.
Arguments:
topic (str): Topic to check.
@@ -578,7 +590,12 @@ class KafkaConsumer(six.Iterator):
Returns:
set: Partition ids
"""
- return self._client.cluster.partitions_for_topic(topic)
+ cluster = self._client.cluster
+ partitions = cluster.partitions_for_topic(topic)
+ if partitions is None:
+ self._fetch_all_topic_metadata()
+ partitions = cluster.partitions_for_topic(topic)
+ return partitions
def poll(self, timeout_ms=0, max_records=None):
"""Fetch data from assigned topics / partitions.