diff options
author | Tyler Lubeck <tylerl@surveymonkey.com> | 2019-11-15 10:54:48 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2020-02-06 13:10:08 -0800 |
commit | da01fef9b9685cc95fe1a8dd420e841b9c63e8c3 (patch) | |
tree | 75019ac7a07953d77f8fff9f2bc93b10e522f5a9 | |
parent | 209515bf9dcdd9e03bc286035641af3ae72fcbf9 (diff) | |
download | kafka-python-da01fef9b9685cc95fe1a8dd420e841b9c63e8c3.tar.gz |
Implement list_topics, describe_topics, and describe_cluster
-rw-r--r-- | kafka/admin/client.py | 46 |
1 files changed, 40 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index e9be1d8..2cb5c57 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -472,14 +472,48 @@ class KafkaAdminClient(object): .format(version)) return response - # list topics functionality is in ClusterMetadata - # Note: if implemented here, send the request to the least_loaded_node() - # describe topics functionality is in ClusterMetadata - # Note: if implemented here, send the request to the controller + def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): + """ + topics == None means "get all topics" + """ + version = self._matching_api_version(MetadataRequest) + if version <= 3: + if auto_topic_creation: + raise IncompatibleBrokerVersion( + "auto_topic_creation requires MetadataRequest >= v4, which" + " is not supported by Kafka {}" + .format(self.config['api_version'])) - # describe cluster functionality is in ClusterMetadata - # Note: if implemented here, send the request to the least_loaded_node() + request = MetadataRequest[version](topics=topics) + elif version <= 5: + request = MetadataRequest[version]( + topics=topics, + allow_auto_topic_creation=auto_topic_creation + ) + + future = self._send_request_to_node( + self._client.least_loaded_node(), + request + ) + self._wait_for_futures([future]) + return future.value + + def list_topics(self): + metadata = self._get_cluster_metadata(topics=None) + obj = metadata.to_object() + return [t['topic'] for t in obj['topics']] + + def describe_topics(self, topics=None): + metadata = self._get_cluster_metadata(topics=topics) + obj = metadata.to_object() + return obj['topics'] + + def describe_cluster(self): + metadata = self._get_cluster_metadata() + obj = metadata.to_object() + obj.pop('topics') # We have 'describe_topics' for this + return obj @staticmethod def _convert_describe_acls_response_to_acls(describe_response): |