summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2020-02-16 10:11:32 -0800
committerJeff Widman <jeff@jeffwidman.com>2020-02-16 14:54:06 -0800
commit3c3fdc11b555756c53ae63bfa46a915eab48f448 (patch)
treed0ef3c1d3e4985c8fb29d1eec2c5682ed1bbd2f9
parent7195f0369c7dbe25aea2c3fed78d2b4f772d775b (diff)
downloadkafka-python-3c3fdc11b555756c53ae63bfa46a915eab48f448.tar.gz
KAFKA-8962: Use least_loaded_node() for describe_topics()
In KAFKA-8962 the `AdminClient.describe_topics()` call was changed from using the controller to using the `least_loaded_node()`: https://github.com/apache/kafka/commit/317089663cc7ff4fdfcba6ee434f455e8ae13acd#diff-6869b8fccf6b098cbcb0676e8ceb26a7R1540 As a result, no metadata request/response processing needs to happen through the controller, so it's safe to remove the custom error-checking. Besides, I don't think this error-checking even added any value because AFAIK no metadata response would return a `NotControllerError` because the recipient broker wouldn't realize the metadata request was intended for only the controller. Originally our admin client was implemented using the least-loaded-node, then later updated to the controller. So updating it back to least-loaded node is a simple case of reverting the associated commits. This reverts commit 7195f0369c7dbe25aea2c3fed78d2b4f772d775b. This reverts commit 6e2978edee9a06e9dbe60afcac226b27b83cbc74. This reverts commit f92889af79db08ef26d89cb18bd48c7dd5080010.
-rw-r--r--kafka/admin/client.py22
1 files changed, 7 insertions, 15 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 7c1bd4f..d0fa845 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -376,16 +376,11 @@ class KafkaAdminClient(object):
# In Java, the error field name is inconsistent:
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
# - DeleteTopicsResponse uses topic_error_codes
- # - MetadataResponse uses topics[].error_code
- topic_error_tuples = []
- if hasattr(response, 'topic_errors'):
- topic_error_tuples.extend(response.topic_errors)
- elif hasattr(response, 'topic_error_codes'):
- topic_error_tuples.extend(response.topic_error_codes)
- elif hasattr(response, 'topics'):
- for topic in response.topics:
- if hasattr(topic, 'topic') and hasattr(topic, 'error_code'):
- topic_error_tuples.append((topic.topic, topic.error_code))
+ # So this is a little brittle in that it assumes all responses have
+ # one of these attributes and that they always unpack into
+ # (topic, error_code) tuples.
+ topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
+ else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
@@ -478,7 +473,7 @@ class KafkaAdminClient(object):
return response
- def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
+ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
@@ -497,9 +492,6 @@ class KafkaAdminClient(object):
allow_auto_topic_creation=auto_topic_creation
)
- if use_controller:
- return self._send_request_to_controller(request)
-
future = self._send_request_to_node(
self._client.least_loaded_node(),
request
@@ -513,7 +505,7 @@ class KafkaAdminClient(object):
return [t['topic'] for t in obj['topics']]
def describe_topics(self, topics=None):
- metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
+ metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']