diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2020-02-16 10:11:32 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2020-02-16 14:54:06 -0800 |
commit | 3c3fdc11b555756c53ae63bfa46a915eab48f448 (patch) | |
tree | d0ef3c1d3e4985c8fb29d1eec2c5682ed1bbd2f9 | |
parent | 7195f0369c7dbe25aea2c3fed78d2b4f772d775b (diff) | |
download | kafka-python-3c3fdc11b555756c53ae63bfa46a915eab48f448.tar.gz |
KAFKA-8962: Use least_loaded_node() for describe_topics()
In KAFKA-8962 the `AdminClient.describe_topics()` call was changed from
using the controller to using the `least_loaded_node()`:
https://github.com/apache/kafka/commit/317089663cc7ff4fdfcba6ee434f455e8ae13acd#diff-6869b8fccf6b098cbcb0676e8ceb26a7R1540
As a result, no metadata request/response processing needs to happen
through the controller, so it's safe to remove the custom
error-checking. Besides, I don't think this error-checking even added
any value because AFAIK no metadata response would return a
`NotControllerError` because the recipient broker wouldn't realize the
metadata request was intended for only the controller.
Originally our admin client was implemented using the least-loaded-node,
then later updated to the controller. So updating it back to
least-loaded node is a simple case of reverting the associated commits.
This reverts commit 7195f0369c7dbe25aea2c3fed78d2b4f772d775b.
This reverts commit 6e2978edee9a06e9dbe60afcac226b27b83cbc74.
This reverts commit f92889af79db08ef26d89cb18bd48c7dd5080010.
-rw-r--r-- | kafka/admin/client.py | 22 |
1 files changed, 7 insertions, 15 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 7c1bd4f..d0fa845 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -376,16 +376,11 @@ class KafkaAdminClient(object):
         # In Java, the error field name is inconsistent:
         #  - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
         #  - DeleteTopicsResponse uses topic_error_codes
-        #  - MetadataResponse uses topics[].error_code
-        topic_error_tuples = []
-        if hasattr(response, 'topic_errors'):
-            topic_error_tuples.extend(response.topic_errors)
-        elif hasattr(response, 'topic_error_codes'):
-            topic_error_tuples.extend(response.topic_error_codes)
-        elif hasattr(response, 'topics'):
-            for topic in response.topics:
-                if hasattr(topic, 'topic') and hasattr(topic, 'error_code'):
-                    topic_error_tuples.append((topic.topic, topic.error_code))
+        # So this is a little brittle in that it assumes all responses have
+        # one of these attributes and that they always unpack into
+        # (topic, error_code) tuples.
+        topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
+                              else response.topic_error_codes)
         # Also small py2/py3 compatibility -- py3 can ignore extra values
         # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
         # So for now we have to map across the list and explicitly drop any
@@ -478,7 +473,7 @@ class KafkaAdminClient(object):
 
         return response
 
-    def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
+    def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
         """
         topics == None means "get all topics"
         """
@@ -497,9 +492,6 @@ class KafkaAdminClient(object):
             allow_auto_topic_creation=auto_topic_creation
         )
 
-        if use_controller:
-            return self._send_request_to_controller(request)
-
         future = self._send_request_to_node(
             self._client.least_loaded_node(),
             request
@@ -513,7 +505,7 @@ class KafkaAdminClient(object):
         return [t['topic'] for t in obj['topics']]
 
     def describe_topics(self, topics=None):
-        metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
+        metadata = self._get_cluster_metadata(topics=topics)
         obj = metadata.to_object()
         return obj['topics']