summaryrefslogtreecommitdiff
path: root/kafka/admin/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r--kafka/admin/client.py70
1 files changed, 56 insertions, 14 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index df85f44..bb1e2b5 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -5,6 +5,7 @@ import copy
import logging
import socket
+from . import ConfigResourceType
from kafka.vendor import six
from kafka.client_async import KafkaClient, selectors
@@ -763,29 +764,70 @@ class KafkaAdminClient(object):
supported by all versions. Default: False.
:return: Appropriate version of DescribeConfigsResponse class.
"""
+
+ # Break up requests by type - a broker config request must be sent to the specific broker.
+ # All other (currently just topic resources) can be sent to any broker.
+ broker_resources = []
+ topic_resources = []
+
+ for config_resource in config_resources:
+ if config_resource.resource_type == ConfigResourceType.BROKER:
+ broker_resources.append(self._convert_describe_config_resource_request(config_resource))
+ else:
+ topic_resources.append(self._convert_describe_config_resource_request(config_resource))
+
+ futures = []
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
- .format(self.config['api_version']))
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
- )
+ .format(self.config['api_version']))
+
+ if len(broker_resources) > 0:
+ for broker_resource in broker_resources:
+ try:
+ broker_id = int(broker_resource[1])
+ except ValueError:
+ raise ValueError("Broker resource names must be an integer or a string represented integer")
+
+ futures.append(self._send_request_to_node(
+ broker_id,
+ DescribeConfigsRequest[version](resources=[broker_resource])
+ ))
+
+ if len(topic_resources) > 0:
+ futures.append(self._send_request_to_node(
+ self._client.least_loaded_node(),
+ DescribeConfigsRequest[version](resources=topic_resources)
+ ))
+
elif version == 1:
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
- include_synonyms=include_synonyms
- )
+ if len(broker_resources) > 0:
+ for broker_resource in broker_resources:
+ try:
+ broker_id = int(broker_resource[1])
+ except ValueError:
+ raise ValueError("Broker resource names must be an integer or a string represented integer")
+
+ futures.append(self._send_request_to_node(
+ broker_id,
+ DescribeConfigsRequest[version](
+ resources=[broker_resource],
+ include_synonyms=include_synonyms)
+ ))
+
+ if len(topic_resources) > 0:
+ futures.append(self._send_request_to_node(
+ self._client.least_loaded_node(),
+ DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
+ ))
else:
raise NotImplementedError(
- "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
- .format(version))
- future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
- self._wait_for_futures([future])
- response = future.value
- return response
+ self._wait_for_futures(futures)
+ return [f.value for f in futures]
@staticmethod
def _convert_alter_config_resource_request(config_resource):