author     Jeff Widman <jeff@jeffwidman.com>    2019-07-30 18:35:02 -0700
committer  Jeff Widman <jeff@jeffwidman.com>    2019-07-31 11:26:43 -0700
commit     ea35fdfe1d66eb481e3406ad161a1255573dd50f (patch)
tree       c2694603458875f807c7ff6c4eacfe98b65fd410
parent     eed25fc36110b12ec370b4d0e332173abce9076f (diff)
download   kafka-python-ea35fdfe1d66eb481e3406ad161a1255573dd50f.tar.gz
Break FindCoordinator into request/response methods
This splits the `_find_coordinator_id()` method (which is blocking) into request generation / response parsing methods. The public API does not change. However, this allows power users who are willing to deal with the risk of private methods changing under their feet to decouple generating the message futures from processing their responses. In other words, you can use these to fire a bunch of requests at once and delay processing the responses until all requests are fired.

This is modeled on the work done in #1845.

Additionally, I removed the code that tried to leverage the error checking from `cluster.add_group_coordinator()`. That code had changed in #1822, removing most of the error checking, so it no longer adds any value; it merely increases complexity and coupling.
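For illustration, a minimal sketch of the decoupled pattern described above, using the private helpers this commit adds (`_find_coordinator_id_send_request`, `_find_coordinator_id_process_response`, `_wait_for_futures`). The broker address and group names are hypothetical, and these helpers are private, so they may change without notice:

    # Sketch only: fire one FindCoordinatorRequest per group up front,
    # then block once and parse all responses after every request is sent.
    from kafka import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")  # hypothetical broker
    group_ids = ["billing-consumers", "audit-consumers"]          # hypothetical groups

    # Generate one message future per group without blocking between them.
    futures = {
        group_id: admin._find_coordinator_id_send_request(group_id)
        for group_id in group_ids
    }

    # Block once for all outstanding requests, then parse each response
    # into the coordinator's node_id.
    admin._wait_for_futures(list(futures.values()))
    coordinators = {
        group_id: admin._find_coordinator_id_process_response(future.value)
        for group_id, future in futures.items()
    }
    print(coordinators)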
-rw-r--r--  kafka/admin/client.py  80
1 file changed, 48 insertions(+), 32 deletions(-)
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 4fd8a1b..badac32 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -271,7 +271,49 @@ class KafkaAdminClient(object):
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))
- def _find_group_coordinator_id(self, group_id):
+ def _find_coordinator_id_send_request(self, group_id):
+ """Send a FindCoordinatorRequest to a broker.
+
+ :param group_id: The consumer group ID. This is typically the group
+ name as a string.
+ :return: A message future
+ """
+ # TODO add support for dynamically picking version of
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
+ # When I experimented with this, the coordinator value returned in
+ # GroupCoordinatorResponse_v1 didn't match the value returned by
+ # GroupCoordinatorResponse_v0 and I couldn't figure out why.
+ version = 0
+ # version = self._matching_api_version(GroupCoordinatorRequest)
+ if version <= 0:
+ request = GroupCoordinatorRequest[version](group_id)
+ else:
+ raise NotImplementedError(
+ "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
+
+ def _find_coordinator_id_process_response(self, response):
+ """Process a FindCoordinatorResponse.
+
+ :param response: a FindCoordinatorResponse.
+ :return: The node_id of the broker that is the coordinator.
+ """
+ if response.API_VERSION <= 0:
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ # Note: When error_type.retriable, Java will retry... see
+ # KafkaAdminClient's handleFindCoordinatorError method
+ raise error_type(
+ "FindCoordinatorRequest failed with response '{}'."
+ .format(response))
+ else:
+ raise NotImplementedError(
+ "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return response.coordinator_id
+
+ def _find_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator of the given group.
Sends a FindCoordinatorRequest message to the cluster. Will block until
@@ -283,35 +325,10 @@ class KafkaAdminClient(object):
:return: The node_id of the broker that is the coordinator.
"""
# Note: Java may change how this is implemented in KAFKA-6791.
- #
- # TODO add support for dynamically picking version of
- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
- # When I experimented with this, GroupCoordinatorResponse_v1 didn't
- # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
- gc_request = GroupCoordinatorRequest[0](group_id)
- future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
-
+ future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
-
- gc_response = future.value
- # use the extra error checking in add_group_coordinator() rather than
- # immediately returning the group coordinator.
- success = self._client.cluster.add_group_coordinator(group_id, gc_response)
- if not success:
- error_type = Errors.for_code(gc_response.error_code)
- assert error_type is not Errors.NoError
- # Note: When error_type.retriable, Java will retry... see
- # KafkaAdminClient's handleFindCoordinatorError method
- raise error_type(
- "Could not identify group coordinator for group_id '{}' from response '{}'."
- .format(group_id, gc_response))
- group_coordinator = self._client.cluster.coordinator_for_group(group_id)
- # will be None if the coordinator was never populated, which should never happen here
- assert group_coordinator is not None
- # will be -1 if add_group_coordinator() failed... but by this point the
- # error should have been raised.
- assert group_coordinator != -1
- return group_coordinator
+ response = future.value
+ return self._find_coordinator_id_process_response(response)
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
@@ -329,7 +346,6 @@ class KafkaAdminClient(object):
self._client.poll()
return self._client.send(node_id, request)
-
def _send_request_to_controller(self, request):
"""Send a Kafka protocol message to the cluster controller.
@@ -678,7 +694,7 @@ class KafkaAdminClient(object):
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
- this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
+ this_groups_coordinator_id = self._find_coordinator_id(group_id)
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
futures.append(f)
@@ -853,7 +869,7 @@ class KafkaAdminClient(object):
explicitly specified.
"""
if group_coordinator_id is None:
- group_coordinator_id = self._find_group_coordinator_id(group_id)
+ group_coordinator_id = self._find_coordinator_id(group_id)
future = self._list_consumer_group_offsets_send_request(
group_id, group_coordinator_id, partitions)
self._wait_for_futures([future])