summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2020-09-17 10:34:41 -0700
committerJeff Widman <jeff@jeffwidman.com>2020-09-17 11:16:21 -0700
commit80664a55bfafb243e89640986b8d53748b996df6 (patch)
tree3df34d5cd0dac2dfd0df52e07f65ca6686c35b6e
parent6cfe706d1ab4eaa7c970f19ce102f65625affb96 (diff)
downloadkafka-python-merge-_find_coordinator_ids-methods.tar.gz
Merge _find_coordinator_id methodsmerge-_find_coordinator_ids-methods
Previously there were two methods: * `_find_coordinator_id()` * `_find_many_coordinator_ids()` But they do basically the same thing internally. And we need the plural two places, but the singular only one place. So merge them, and change the function signature to take a list of `group_ids` and return a dict of `group_id: coordinator_id`s. As a result of this, the `describe_groups()` command should scale better because the `_find_coordinator_ids()` command issues all the requests async, instead of sequentially blocking as the `described_groups()` used to do.
-rw-r--r--kafka/admin/client.py69
1 files changed, 27 insertions, 42 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 1fcd88c..c58da0c 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -328,24 +328,8 @@ class KafkaAdminClient(object):
.format(response.API_VERSION))
return response.coordinator_id
- def _find_coordinator_id(self, group_id):
- """Find the broker node_id of the coordinator of the given group.
-
- Sends a FindCoordinatorRequest message to the cluster. Will block until
- the FindCoordinatorResponse is received. Any errors are immediately
- raised.
-
- :param group_id: The consumer group ID. This is typically the group
- name as a string.
- :return: The node_id of the broker that is the coordinator.
- """
- future = self._find_coordinator_id_send_request(group_id)
- self._wait_for_futures([future])
- response = future.value
- return self._find_coordinator_id_process_response(response)
-
- def _find_many_coordinator_ids(self, group_ids):
- """Find the broker node_id of the coordinator for each of the given groups.
+ def _find_coordinator_ids(self, group_ids):
+ """Find the broker node_ids of the coordinators of the given groups.
Sends a FindCoordinatorRequest message to the cluster for each group_id.
Will block until the FindCoordinatorResponse is received for all groups.
@@ -353,18 +337,18 @@ class KafkaAdminClient(object):
:param group_ids: A list of consumer group IDs. This is typically the group
name as a string.
- :return: A list of tuples (group_id, node_id) where node_id is the id
- of the broker that is the coordinator for the corresponding group.
+ :return: A dict of {group_id: node_id} where node_id is the id of the
+ broker that is the coordinator for the corresponding group.
"""
- futures = {
+ groups_futures = {
group_id: self._find_coordinator_id_send_request(group_id)
for group_id in group_ids
}
- self._wait_for_futures(list(futures.values()))
- groups_coordinators = [
- (group_id, self._find_coordinator_id_process_response(f.value))
- for group_id, f in futures.items()
- ]
+ self._wait_for_futures(groups_futures.values())
+ groups_coordinators = {
+ group_id: self._find_coordinator_id_process_response(future.value)
+ for group_id, future in groups_futures.items()
+ }
return groups_coordinators
def _send_request_to_node(self, node_id, request):
@@ -1094,18 +1078,19 @@ class KafkaAdminClient(object):
partition assignments.
"""
group_descriptions = []
- futures = []
- for group_id in group_ids:
- if group_coordinator_id is not None:
- this_groups_coordinator_id = group_coordinator_id
- else:
- this_groups_coordinator_id = self._find_coordinator_id(group_id)
- f = self._describe_consumer_groups_send_request(
+
+ if group_coordinator_id is not None:
+ groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids}
+ else:
+ groups_coordinators = self._find_coordinator_ids(group_ids)
+
+ futures = [
+ self._describe_consumer_groups_send_request(
group_id,
- this_groups_coordinator_id,
+ coordinator_id,
include_authorized_operations)
- futures.append(f)
-
+ for group_id, coordinator_id in groups_coordinators.items()
+ ]
self._wait_for_futures(futures)
for future in futures:
@@ -1277,7 +1262,7 @@ class KafkaAdminClient(object):
explicitly specified.
"""
if group_coordinator_id is None:
- group_coordinator_id = self._find_coordinator_id(group_id)
+ group_coordinator_id = self._find_coordinator_ids([group_id])[group_id]
future = self._list_consumer_group_offsets_send_request(
group_id, group_coordinator_id, partitions)
self._wait_for_futures([future])
@@ -1305,12 +1290,12 @@ class KafkaAdminClient(object):
if group_coordinator_id is not None:
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
else:
- groups_coordinators = defaultdict(list)
- for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
- groups_coordinators[group_coordinator_id].append(group_id)
+ coordinators_groups = defaultdict(list)
+ for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items():
+ coordinators_groups[coordinator_id].append(group_id)
futures = [
- self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
- for group_coordinator_id, group_ids in groups_coordinators.items()
+ self._delete_consumer_groups_send_request(group_ids, coordinator_id)
+ for coordinator_id, group_ids in coordinators_groups.items()
]
self._wait_for_futures(futures)