summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTyler Lubeck <tyler@coffeemeetsbagel.com>2020-02-06 13:31:05 -0800
committerJeff Widman <jeff@jeffwidman.com>2020-02-06 14:20:40 -0800
commitf92889af79db08ef26d89cb18bd48c7dd5080010 (patch)
tree0f51c3ea3f28391088fd779fbae12d05ad7e38b2
parentda01fef9b9685cc95fe1a8dd420e841b9c63e8c3 (diff)
downloadkafka-python-f92889af79db08ef26d89cb18bd48c7dd5080010.tar.gz
Use the controller for topic metadata requests
Closes #1994
-rw-r--r--kafka/admin/client.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 2cb5c57..454c5b3 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -473,7 +473,7 @@ class KafkaAdminClient(object):
return response
- def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
+ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
"""
topics == None means "get all topics"
"""
@@ -492,10 +492,13 @@ class KafkaAdminClient(object):
allow_auto_topic_creation=auto_topic_creation
)
- future = self._send_request_to_node(
- self._client.least_loaded_node(),
- request
- )
+ if use_controller:
+ future = self._send_request_to_controller(request)
+ else:
+ future = self._send_request_to_node(
+ self._client.least_loaded_node(),
+ request
+ )
self._wait_for_futures([future])
return future.value
@@ -505,7 +508,7 @@ class KafkaAdminClient(object):
return [t['topic'] for t in obj['topics']]
def describe_topics(self, topics=None):
- metadata = self._get_cluster_metadata(topics=topics)
+ metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
obj = metadata.to_object()
return obj['topics']