summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/client.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 2cb5c57..454c5b3 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -473,7 +473,7 @@ class KafkaAdminClient(object):
return response
- def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
+ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
"""
topics == None means "get all topics"
"""
@@ -492,10 +492,13 @@ class KafkaAdminClient(object):
allow_auto_topic_creation=auto_topic_creation
)
- future = self._send_request_to_node(
- self._client.least_loaded_node(),
- request
- )
+ if use_controller:
+ future = self._send_request_to_controller(request)
+ else:
+ future = self._send_request_to_node(
+ self._client.least_loaded_node(),
+ request
+ )
self._wait_for_futures([future])
return future.value
@@ -505,7 +508,7 @@ class KafkaAdminClient(object):
return [t['topic'] for t in obj['topics']]
def describe_topics(self, topics=None):
- metadata = self._get_cluster_metadata(topics=topics)
+ metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
obj = metadata.to_object()
return obj['topics']