summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTyler Lubeck <tylerl@surveymonkey.com>2019-11-15 10:54:48 -0800
committerJeff Widman <jeff@jeffwidman.com>2020-02-06 13:10:08 -0800
commitda01fef9b9685cc95fe1a8dd420e841b9c63e8c3 (patch)
tree75019ac7a07953d77f8fff9f2bc93b10e522f5a9
parent209515bf9dcdd9e03bc286035641af3ae72fcbf9 (diff)
downloadkafka-python-da01fef9b9685cc95fe1a8dd420e841b9c63e8c3.tar.gz
Implement list_topics, describe_topics, and describe_cluster
-rw-r--r--kafka/admin/client.py46
1 files changed, 40 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index e9be1d8..2cb5c57 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -472,14 +472,48 @@ class KafkaAdminClient(object):
.format(version))
return response
- # list topics functionality is in ClusterMetadata
- # Note: if implemented here, send the request to the least_loaded_node()
- # describe topics functionality is in ClusterMetadata
- # Note: if implemented here, send the request to the controller
+ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
+ """
+ topics == None means "get all topics"
+ """
+ version = self._matching_api_version(MetadataRequest)
+ if version <= 3:
+ if auto_topic_creation:
+ raise IncompatibleBrokerVersion(
+ "auto_topic_creation requires MetadataRequest >= v4, which"
+ " is not supported by Kafka {}"
+ .format(self.config['api_version']))
- # describe cluster functionality is in ClusterMetadata
- # Note: if implemented here, send the request to the least_loaded_node()
+ request = MetadataRequest[version](topics=topics)
+ elif version <= 5:
+ request = MetadataRequest[version](
+ topics=topics,
+ allow_auto_topic_creation=auto_topic_creation
+ )
+
+ future = self._send_request_to_node(
+ self._client.least_loaded_node(),
+ request
+ )
+ self._wait_for_futures([future])
+ return future.value
+
+ def list_topics(self):
+ metadata = self._get_cluster_metadata(topics=None)
+ obj = metadata.to_object()
+ return [t['topic'] for t in obj['topics']]
+
+ def describe_topics(self, topics=None):
+ metadata = self._get_cluster_metadata(topics=topics)
+ obj = metadata.to_object()
+ return obj['topics']
+
+ def describe_cluster(self):
+ metadata = self._get_cluster_metadata()
+ obj = metadata.to_object()
+ obj.pop('topics') # We have 'describe_topics' for this
+ return obj
@staticmethod
def _convert_describe_acls_response_to_acls(describe_response):