diff options
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r-- | kafka/admin/client.py | 46 |
1 files changed, 40 insertions, 6 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index e9be1d8..2cb5c57 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -472,14 +472,48 @@ class KafkaAdminClient(object): .format(version)) return response - # list topics functionality is in ClusterMetadata - # Note: if implemented here, send the request to the least_loaded_node() - # describe topics functionality is in ClusterMetadata - # Note: if implemented here, send the request to the controller + def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): + """ + topics == None means "get all topics" + """ + version = self._matching_api_version(MetadataRequest) + if version <= 3: + if auto_topic_creation: + raise IncompatibleBrokerVersion( + "auto_topic_creation requires MetadataRequest >= v4, which" + " is not supported by Kafka {}" + .format(self.config['api_version'])) - # describe cluster functionality is in ClusterMetadata - # Note: if implemented here, send the request to the least_loaded_node() + request = MetadataRequest[version](topics=topics) + elif version <= 5: + request = MetadataRequest[version]( + topics=topics, + allow_auto_topic_creation=auto_topic_creation + ) + + future = self._send_request_to_node( + self._client.least_loaded_node(), + request + ) + self._wait_for_futures([future]) + return future.value + + def list_topics(self): + metadata = self._get_cluster_metadata(topics=None) + obj = metadata.to_object() + return [t['topic'] for t in obj['topics']] + + def describe_topics(self, topics=None): + metadata = self._get_cluster_metadata(topics=topics) + obj = metadata.to_object() + return obj['topics'] + + def describe_cluster(self): + metadata = self._get_cluster_metadata() + obj = metadata.to_object() + obj.pop('topics') # We have 'describe_topics' for this + return obj @staticmethod def _convert_describe_acls_response_to_acls(describe_response): |