summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/client.py69
1 files changed, 27 insertions, 42 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 1fcd88c..c58da0c 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -328,24 +328,8 @@ class KafkaAdminClient(object):
.format(response.API_VERSION))
return response.coordinator_id
- def _find_coordinator_id(self, group_id):
- """Find the broker node_id of the coordinator of the given group.
-
- Sends a FindCoordinatorRequest message to the cluster. Will block until
- the FindCoordinatorResponse is received. Any errors are immediately
- raised.
-
- :param group_id: The consumer group ID. This is typically the group
- name as a string.
- :return: The node_id of the broker that is the coordinator.
- """
- future = self._find_coordinator_id_send_request(group_id)
- self._wait_for_futures([future])
- response = future.value
- return self._find_coordinator_id_process_response(response)
-
- def _find_many_coordinator_ids(self, group_ids):
- """Find the broker node_id of the coordinator for each of the given groups.
+ def _find_coordinator_ids(self, group_ids):
+ """Find the broker node_ids of the coordinators of the given groups.
Sends a FindCoordinatorRequest message to the cluster for each group_id.
Will block until the FindCoordinatorResponse is received for all groups.
@@ -353,18 +337,18 @@ class KafkaAdminClient(object):
:param group_ids: A list of consumer group IDs. This is typically the group
name as a string.
- :return: A list of tuples (group_id, node_id) where node_id is the id
- of the broker that is the coordinator for the corresponding group.
+ :return: A dict of {group_id: node_id} where node_id is the id of the
+ broker that is the coordinator for the corresponding group.
"""
- futures = {
+ groups_futures = {
group_id: self._find_coordinator_id_send_request(group_id)
for group_id in group_ids
}
- self._wait_for_futures(list(futures.values()))
- groups_coordinators = [
- (group_id, self._find_coordinator_id_process_response(f.value))
- for group_id, f in futures.items()
- ]
+ self._wait_for_futures(groups_futures.values())
+ groups_coordinators = {
+ group_id: self._find_coordinator_id_process_response(future.value)
+ for group_id, future in groups_futures.items()
+ }
return groups_coordinators
def _send_request_to_node(self, node_id, request):
@@ -1094,18 +1078,19 @@ class KafkaAdminClient(object):
partition assignments.
"""
group_descriptions = []
- futures = []
- for group_id in group_ids:
- if group_coordinator_id is not None:
- this_groups_coordinator_id = group_coordinator_id
- else:
- this_groups_coordinator_id = self._find_coordinator_id(group_id)
- f = self._describe_consumer_groups_send_request(
+
+ if group_coordinator_id is not None:
+ groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids}
+ else:
+ groups_coordinators = self._find_coordinator_ids(group_ids)
+
+ futures = [
+ self._describe_consumer_groups_send_request(
group_id,
- this_groups_coordinator_id,
+ coordinator_id,
include_authorized_operations)
- futures.append(f)
-
+ for group_id, coordinator_id in groups_coordinators.items()
+ ]
self._wait_for_futures(futures)
for future in futures:
@@ -1277,7 +1262,7 @@ class KafkaAdminClient(object):
explicitly specified.
"""
if group_coordinator_id is None:
- group_coordinator_id = self._find_coordinator_id(group_id)
+ group_coordinator_id = self._find_coordinator_ids([group_id])[group_id]
future = self._list_consumer_group_offsets_send_request(
group_id, group_coordinator_id, partitions)
self._wait_for_futures([future])
@@ -1305,12 +1290,12 @@ class KafkaAdminClient(object):
if group_coordinator_id is not None:
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
else:
- groups_coordinators = defaultdict(list)
- for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
- groups_coordinators[group_coordinator_id].append(group_id)
+ coordinators_groups = defaultdict(list)
+ for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items():
+ coordinators_groups[coordinator_id].append(group_id)
futures = [
- self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
- for group_coordinator_id, group_ids in groups_coordinators.items()
+ self._delete_consumer_groups_send_request(group_ids, coordinator_id)
+ for coordinator_id, group_ids in coordinators_groups.items()
]
self._wait_for_futures(futures)