diff options
author | Tyler Lubeck <tyler@coffeemeetsbagel.com> | 2020-02-06 13:31:05 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2020-02-06 14:20:40 -0800 |
commit | f92889af79db08ef26d89cb18bd48c7dd5080010 (patch) | |
tree | 0f51c3ea3f28391088fd779fbae12d05ad7e38b2 | |
parent | da01fef9b9685cc95fe1a8dd420e841b9c63e8c3 (diff) | |
download | kafka-python-f92889af79db08ef26d89cb18bd48c7dd5080010.tar.gz |
Use the controller for topic metadata requests
Closes #1994
-rw-r--r-- | kafka/admin/client.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 2cb5c57..454c5b3 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -473,7 +473,7 @@ class KafkaAdminClient(object): return response - def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): + def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False): """ topics == None means "get all topics" """ @@ -492,10 +492,13 @@ class KafkaAdminClient(object): allow_auto_topic_creation=auto_topic_creation ) - future = self._send_request_to_node( - self._client.least_loaded_node(), - request - ) + if use_controller: + future = self._send_request_to_controller(request) + else: + future = self._send_request_to_node( + self._client.least_loaded_node(), + request + ) self._wait_for_futures([future]) return future.value @@ -505,7 +508,7 @@ class KafkaAdminClient(object): return [t['topic'] for t in obj['topics']] def describe_topics(self, topics=None): - metadata = self._get_cluster_metadata(topics=topics) + metadata = self._get_cluster_metadata(topics=topics, use_controller=True) obj = metadata.to_object() return obj['topics'] |