summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeppe Andersen <2197398+jlandersen@users.noreply.github.com>2019-10-11 20:46:52 +0200
committerJeff Widman <jeff@jeffwidman.com>2019-10-11 11:46:52 -0700
commit6d3800ca9f45fd953689a1787fc90a5e566e34ea (patch)
treef47705bfa7ba965a1e505cb3714116eb36771e20
parent84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1 (diff)
downloadkafka-python-6d3800ca9f45fd953689a1787fc90a5e566e34ea.tar.gz
Fix describe config for multi-broker clusters (#1869)
* Fix describe config for multi-broker clusters Currently all describe config requests are sent to "least loaded node". Requests for broker configs must, however, be sent to the specific broker, otherwise an error is returned. Only topic requests can be handled by any node. This changes the logic to send all describe config requests to the specific broker.
-rw-r--r--kafka/admin/client.py70
-rw-r--r--test/test_admin_integration.py57
2 files changed, 112 insertions, 15 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index df85f44..bb1e2b5 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -5,6 +5,7 @@ import copy
import logging
import socket
+from . import ConfigResourceType
from kafka.vendor import six
from kafka.client_async import KafkaClient, selectors
@@ -763,29 +764,70 @@ class KafkaAdminClient(object):
supported by all versions. Default: False.
:return: Appropriate version of DescribeConfigsResponse class.
"""
+
+ # Break up requests by type - a broker config request must be sent to the specific broker.
+ # All other (currently just topic resources) can be sent to any broker.
+ broker_resources = []
+ topic_resources = []
+
+ for config_resource in config_resources:
+ if config_resource.resource_type == ConfigResourceType.BROKER:
+ broker_resources.append(self._convert_describe_config_resource_request(config_resource))
+ else:
+ topic_resources.append(self._convert_describe_config_resource_request(config_resource))
+
+ futures = []
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
- .format(self.config['api_version']))
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
- )
+ .format(self.config['api_version']))
+
+ if len(broker_resources) > 0:
+ for broker_resource in broker_resources:
+ try:
+ broker_id = int(broker_resource[1])
+ except ValueError:
+ raise ValueError("Broker resource names must be an integer or a string represented integer")
+
+ futures.append(self._send_request_to_node(
+ broker_id,
+ DescribeConfigsRequest[version](resources=[broker_resource])
+ ))
+
+ if len(topic_resources) > 0:
+ futures.append(self._send_request_to_node(
+ self._client.least_loaded_node(),
+ DescribeConfigsRequest[version](resources=topic_resources)
+ ))
+
elif version == 1:
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
- include_synonyms=include_synonyms
- )
+ if len(broker_resources) > 0:
+ for broker_resource in broker_resources:
+ try:
+ broker_id = int(broker_resource[1])
+ except ValueError:
+ raise ValueError("Broker resource names must be an integer or a string represented integer")
+
+ futures.append(self._send_request_to_node(
+ broker_id,
+ DescribeConfigsRequest[version](
+ resources=[broker_resource],
+ include_synonyms=include_synonyms)
+ ))
+
+ if len(topic_resources) > 0:
+ futures.append(self._send_request_to_node(
+ self._client.least_loaded_node(),
+ DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
+ ))
else:
raise NotImplementedError(
- "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
- .format(version))
- future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
- self._wait_for_futures([future])
- response = future.value
- return response
+ self._wait_for_futures(futures)
+ return [f.value for f in futures]
@staticmethod
def _convert_alter_config_resource_request(config_resource):
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 3efa021..0b041b2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -3,7 +3,8 @@ import pytest
from test.testutil import env_kafka_version
from kafka.errors import NoError
-from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+from kafka.admin import (
+ ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -80,3 +81,57 @@ def test_create_describe_delete_acls(kafka_admin_client):
assert error is NoError
assert len(acls) == 0
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
+ """Tests that describe config returns configs for broker
+ """
+ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+ assert len(configs) == 1
+ assert configs[0].resources[0][2] == ConfigResourceType.BROKER
+ assert configs[0].resources[0][3] == str(broker_id)
+ assert len(configs[0].resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client):
+ """Tests that describe config returns configs for topic
+ """
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])
+
+ assert len(configs) == 1
+ assert configs[0].resources[0][2] == ConfigResourceType.TOPIC
+ assert configs[0].resources[0][3] == topic
+ assert len(configs[0].resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
+ """Tests that describe config returns configs for mixed resource types (topic + broker)
+ """
+ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+ configs = kafka_admin_client.describe_configs([
+ ConfigResource(ConfigResourceType.TOPIC, topic),
+ ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+ assert len(configs) == 2
+
+ for config in configs:
+ assert (config.resources[0][2] == ConfigResourceType.TOPIC
+ and config.resources[0][3] == topic) or \
+ (config.resources[0][2] == ConfigResourceType.BROKER
+ and config.resources[0][3] == str(broker_id))
+ assert len(config.resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
+ """Tests that describe config raises exception on non-integer broker id
+ """
+ broker_id = "str"
+
+ with pytest.raises(ValueError):
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])