diff options
-rw-r--r-- | kafka/admin/client.py | 69 |
1 files changed, 27 insertions, 42 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 1fcd88c..c58da0c 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -328,24 +328,8 @@ class KafkaAdminClient(object): .format(response.API_VERSION)) return response.coordinator_id - def _find_coordinator_id(self, group_id): - """Find the broker node_id of the coordinator of the given group. - - Sends a FindCoordinatorRequest message to the cluster. Will block until - the FindCoordinatorResponse is received. Any errors are immediately - raised. - - :param group_id: The consumer group ID. This is typically the group - name as a string. - :return: The node_id of the broker that is the coordinator. - """ - future = self._find_coordinator_id_send_request(group_id) - self._wait_for_futures([future]) - response = future.value - return self._find_coordinator_id_process_response(response) - - def _find_many_coordinator_ids(self, group_ids): - """Find the broker node_id of the coordinator for each of the given groups. + def _find_coordinator_ids(self, group_ids): + """Find the broker node_ids of the coordinators of the given groups. Sends a FindCoordinatorRequest message to the cluster for each group_id. Will block until the FindCoordinatorResponse is received for all groups. @@ -353,18 +337,18 @@ class KafkaAdminClient(object): :param group_ids: A list of consumer group IDs. This is typically the group name as a string. - :return: A list of tuples (group_id, node_id) where node_id is the id - of the broker that is the coordinator for the corresponding group. + :return: A dict of {group_id: node_id} where node_id is the id of the + broker that is the coordinator for the corresponding group. """ - futures = { + groups_futures = { group_id: self._find_coordinator_id_send_request(group_id) for group_id in group_ids } - self._wait_for_futures(list(futures.values())) - groups_coordinators = [ - (group_id, self._find_coordinator_id_process_response(f.value)) - for group_id, f in futures.items() - ] + self._wait_for_futures(groups_futures.values()) + groups_coordinators = { + group_id: self._find_coordinator_id_process_response(future.value) + for group_id, future in groups_futures.items() + } return groups_coordinators def _send_request_to_node(self, node_id, request): @@ -1094,18 +1078,19 @@ class KafkaAdminClient(object): partition assignments. """ group_descriptions = [] - futures = [] - for group_id in group_ids: - if group_coordinator_id is not None: - this_groups_coordinator_id = group_coordinator_id - else: - this_groups_coordinator_id = self._find_coordinator_id(group_id) - f = self._describe_consumer_groups_send_request( + + if group_coordinator_id is not None: + groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids} + else: + groups_coordinators = self._find_coordinator_ids(group_ids) + + futures = [ + self._describe_consumer_groups_send_request( group_id, - this_groups_coordinator_id, + coordinator_id, include_authorized_operations) - futures.append(f) - + for group_id, coordinator_id in groups_coordinators.items() + ] self._wait_for_futures(futures) for future in futures: @@ -1277,7 +1262,7 @@ class KafkaAdminClient(object): explicitly specified. """ if group_coordinator_id is None: - group_coordinator_id = self._find_coordinator_id(group_id) + group_coordinator_id = self._find_coordinator_ids([group_id])[group_id] future = self._list_consumer_group_offsets_send_request( group_id, group_coordinator_id, partitions) self._wait_for_futures([future]) @@ -1305,12 +1290,12 @@ class KafkaAdminClient(object): if group_coordinator_id is not None: futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)] else: - groups_coordinators = defaultdict(list) - for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids): - groups_coordinators[group_coordinator_id].append(group_id) + coordinators_groups = defaultdict(list) + for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items(): + coordinators_groups[coordinator_id].append(group_id) futures = [ - self._delete_consumer_groups_send_request(group_ids, group_coordinator_id) - for group_coordinator_id, group_ids in groups_coordinators.items() + self._delete_consumer_groups_send_request(group_ids, coordinator_id) + for coordinator_id, group_ids in coordinators_groups.items() ] self._wait_for_futures(futures) |