From 80664a55bfafb243e89640986b8d53748b996df6 Mon Sep 17 00:00:00 2001 From: Jeff Widman Date: Thu, 17 Sep 2020 10:34:41 -0700 Subject: Merge _find_coordinator_id methods Previously there were two methods: * `_find_coordinator_id()` * `_find_many_coordinator_ids()` But they do basically the same thing internally. And we need the plural two places, but the singular only one place. So merge them, and change the function signature to take a list of `group_ids` and return a dict of `group_id: coordinator_id`s. As a result of this, the `describe_groups()` command should scale better because the `_find_coordinator_ids()` command issues all the requests async, instead of sequentially blocking as the `described_groups()` used to do. --- kafka/admin/client.py | 69 ++++++++++++++++++++------------------------------- 1 file changed, 27 insertions(+), 42 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 1fcd88c..c58da0c 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -328,24 +328,8 @@ class KafkaAdminClient(object): .format(response.API_VERSION)) return response.coordinator_id - def _find_coordinator_id(self, group_id): - """Find the broker node_id of the coordinator of the given group. - - Sends a FindCoordinatorRequest message to the cluster. Will block until - the FindCoordinatorResponse is received. Any errors are immediately - raised. - - :param group_id: The consumer group ID. This is typically the group - name as a string. - :return: The node_id of the broker that is the coordinator. - """ - future = self._find_coordinator_id_send_request(group_id) - self._wait_for_futures([future]) - response = future.value - return self._find_coordinator_id_process_response(response) - - def _find_many_coordinator_ids(self, group_ids): - """Find the broker node_id of the coordinator for each of the given groups. + def _find_coordinator_ids(self, group_ids): + """Find the broker node_ids of the coordinators of the given groups. Sends a FindCoordinatorRequest message to the cluster for each group_id. Will block until the FindCoordinatorResponse is received for all groups. @@ -353,18 +337,18 @@ class KafkaAdminClient(object): :param group_ids: A list of consumer group IDs. This is typically the group name as a string. - :return: A list of tuples (group_id, node_id) where node_id is the id - of the broker that is the coordinator for the corresponding group. + :return: A dict of {group_id: node_id} where node_id is the id of the + broker that is the coordinator for the corresponding group. """ - futures = { + groups_futures = { group_id: self._find_coordinator_id_send_request(group_id) for group_id in group_ids } - self._wait_for_futures(list(futures.values())) - groups_coordinators = [ - (group_id, self._find_coordinator_id_process_response(f.value)) - for group_id, f in futures.items() - ] + self._wait_for_futures(groups_futures.values()) + groups_coordinators = { + group_id: self._find_coordinator_id_process_response(future.value) + for group_id, future in groups_futures.items() + } return groups_coordinators def _send_request_to_node(self, node_id, request): @@ -1094,18 +1078,19 @@ class KafkaAdminClient(object): partition assignments. """ group_descriptions = [] - futures = [] - for group_id in group_ids: - if group_coordinator_id is not None: - this_groups_coordinator_id = group_coordinator_id - else: - this_groups_coordinator_id = self._find_coordinator_id(group_id) - f = self._describe_consumer_groups_send_request( + + if group_coordinator_id is not None: + groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids} + else: + groups_coordinators = self._find_coordinator_ids(group_ids) + + futures = [ + self._describe_consumer_groups_send_request( group_id, - this_groups_coordinator_id, + coordinator_id, include_authorized_operations) - futures.append(f) - + for group_id, coordinator_id in groups_coordinators.items() + ] self._wait_for_futures(futures) for future in futures: @@ -1277,7 +1262,7 @@ class KafkaAdminClient(object): explicitly specified. """ if group_coordinator_id is None: - group_coordinator_id = self._find_coordinator_id(group_id) + group_coordinator_id = self._find_coordinator_ids([group_id])[group_id] future = self._list_consumer_group_offsets_send_request( group_id, group_coordinator_id, partitions) self._wait_for_futures([future]) @@ -1305,12 +1290,12 @@ class KafkaAdminClient(object): if group_coordinator_id is not None: futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] else: - groups_coordinators = defaultdict(list) - for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids): - groups_coordinators[group_coordinator_id].append(group_id) + coordinators_groups = defaultdict(list) + for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items(): + coordinators_groups[coordinator_id].append(group_id) futures = [ - self._delete_consumer_groups_send_request(group_ids, group_coordinator_id) - for group_coordinator_id, group_ids in groups_coordinators.items() + self._delete_consumer_groups_send_request(group_ids, coordinator_id) + for coordinator_id, group_ids in coordinators_groups.items() ] self._wait_for_futures(futures) -- cgit v1.2.1