summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2020-02-16 10:11:32 -0800
committerJeff Widman <jeff@jeffwidman.com>2020-02-16 10:15:31 -0800
commit3b9849762e63a6a3977fcb3603edef8ccef2d5a2 (patch)
treed0ef3c1d3e4985c8fb29d1eec2c5682ed1bbd2f9
parent7195f0369c7dbe25aea2c3fed78d2b4f772d775b (diff)
downloadkafka-python-KAFKA-8962-use-least-loaded-node-for-admin-describe-topics.tar.gz
KAFKA-8962: Use least_loaded_node() for describe_topics()KAFKA-8962-use-least-loaded-node-for-admin-describe-topics
In KAFKA-8962 the `AdminClient.describe_topics()` call was changed from using the controller to using the `least_loaded_node()`. WIP MORE about these reversions This reverts commit 7195f0369c7dbe25aea2c3fed78d2b4f772d775b. This reverts commit 6e2978edee9a06e9dbe60afcac226b27b83cbc74. This reverts commit f92889af79db08ef26d89cb18bd48c7dd5080010.
-rw-r--r--kafka/admin/client.py22
1 files changed, 7 insertions, 15 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 7c1bd4f..d0fa845 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -376,16 +376,11 @@ class KafkaAdminClient(object):
# In Java, the error field name is inconsistent:
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
# - DeleteTopicsResponse uses topic_error_codes
- # - MetadataResponse uses topics[].error_code
- topic_error_tuples = []
- if hasattr(response, 'topic_errors'):
- topic_error_tuples.extend(response.topic_errors)
- elif hasattr(response, 'topic_error_codes'):
- topic_error_tuples.extend(response.topic_error_codes)
- elif hasattr(response, 'topics'):
- for topic in response.topics:
- if hasattr(topic, 'topic') and hasattr(topic, 'error_code'):
- topic_error_tuples.append((topic.topic, topic.error_code))
+ # So this is a little brittle in that it assumes all responses have
+ # one of these attributes and that they always unpack into
+ # (topic, error_code) tuples.
+ topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
+ else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
@@ -478,7 +473,7 @@ class KafkaAdminClient(object):
return response
- def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
+ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
@@ -497,9 +492,6 @@ class KafkaAdminClient(object):
allow_auto_topic_creation=auto_topic_creation
)
- if use_controller:
- return self._send_request_to_controller(request)
-
future = self._send_request_to_node(
self._client.least_loaded_node(),
request
@@ -513,7 +505,7 @@ class KafkaAdminClient(object):
return [t['topic'] for t in obj['topics']]
def describe_topics(self, topics=None):
- metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
+ metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']