author     Jeff Widman <jeff@jeffwidman.com>  2019-06-19 18:17:25 -0700
committer  GitHub <noreply@github.com>        2019-06-19 18:17:25 -0700
commit     01053daac9fa18d5497a42fb58a5a3aa8add116f (patch)
tree       eb73c05f97c3c712a0cfaef2d3d3c14cfa326547
parent     91f4642e92afc208531f66cea1ed7ef32bcfa4d1 (diff)
download   kafka-python-01053daac9fa18d5497a42fb58a5a3aa8add116f.tar.gz
Break consumer operations into request / response methods (#1845)
This breaks some of the consumer operations into request generation / response parsing methods. The public API does not change. However, it allows power users who are willing to accept the risk of private methods changing under their feet to decouple generating the message futures from processing their responses. In other words, you can use these methods to fire a bunch of requests at once and delay processing the responses until all of the requests have been fired.
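As a purely illustrative sketch (not part of this commit's diff), the batching pattern could look roughly like the following, assuming an already-configured KafkaAdminClient and made-up group names and broker address; the underscore-prefixed helpers are private and may change without notice:

    from kafka.admin import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')  # assumed broker address
    group_ids = ['group-a', 'group-b']  # hypothetical consumer group names

    # Fire all DescribeGroups requests up front without blocking on each one...
    futures = []
    for group_id in group_ids:
        coordinator_id = admin._find_group_coordinator_id(group_id)
        futures.append(
            admin._describe_consumer_groups_send_request(group_id, coordinator_id))

    # ...then wait once for all of them and parse the responses afterwards.
    admin._wait_for_futures(futures)
    group_descriptions = [
        admin._describe_consumer_groups_process_response(f.value) for f in futures]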
-rw-r--r--  kafka/admin/client.py  249
1 file changed, 155 insertions, 94 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 155ad21..5082f4d 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -349,7 +349,7 @@ class KafkaAdminClient(object):
# one of these attributes and that they always unpack into
# (topic, error_code) tuples.
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
- else response.topic_error_codes)
+ else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
@@ -501,8 +501,8 @@ class KafkaAdminClient(object):
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
-
- return future.value
+ response = future.value
+ return response
@staticmethod
def _convert_alter_config_resource_request(config_resource):
@@ -544,8 +544,8 @@ class KafkaAdminClient(object):
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
-
- return future.value
+ response = future.value
+ return response
# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -602,6 +602,54 @@ class KafkaAdminClient(object):
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()
+ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
+ """Send a DescribeGroupsRequest to the group's coordinator.
+
+ :param group_id: The group name as a string
+ :param group_coordinator_id: The node_id of the group's coordinator
+ broker.
+ :return: A message future.
+ """
+ version = self._matching_api_version(DescribeGroupsRequest)
+ if version <= 1:
+ # Note: KAFKA-6788 A potential optimization is to group the
+ # request per coordinator and send one request with a list of
+ # all consumer groups. Java still hasn't implemented this
+ # because the error checking is hard to get right when some
+ # groups error and others don't.
+ request = DescribeGroupsRequest[version](groups=(group_id,))
+ else:
+ raise NotImplementedError(
+ "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(group_coordinator_id, request)
+
+ def _describe_consumer_groups_process_response(self, response):
+ """Process a DescribeGroupsResponse into a group description."""
+ if response.API_VERSION <= 1:
+ assert len(response.groups) == 1
+ # TODO need to implement converting the response tuple into
+ # a more accessible interface like a namedtuple and then stop
+ # hardcoding tuple indices here. Several Java examples,
+ # including KafkaAdminClient.java
+ group_description = response.groups[0]
+ error_code = group_description[0]
+ error_type = Errors.for_code(error_code)
+ # Java has the note: KAFKA-6789, we can retry based on the error code
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "DescribeGroupsResponse failed with response '{}'."
+ .format(response))
+ # TODO Java checks the group protocol type, and if consumer
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+ # the members' partition assignments... that hasn't yet been
+ # implemented here so just return the raw struct results
+ else:
+ raise NotImplementedError(
+ "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return group_description
+
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
"""Describe a set of consumer groups.
@@ -622,51 +670,52 @@ class KafkaAdminClient(object):
"""
group_descriptions = []
futures = []
- version = self._matching_api_version(DescribeGroupsRequest)
for group_id in group_ids:
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
-
- if version <= 1:
- # Note: KAFKA-6788 A potential optimization is to group the
- # request per coordinator and send one request with a list of
- # all consumer groups. Java still hasn't implemented this
- # because the error checking is hard to get right when some
- # groups error and others don't.
- request = DescribeGroupsRequest[version](groups=(group_id,))
- futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
- else:
- raise NotImplementedError(
- "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
- .format(version))
+ f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
+ futures.append(f)
self._wait_for_futures(futures)
for future in futures:
response = future.value
- assert len(response.groups) == 1
- # TODO need to implement converting the response tuple into
- # a more accessible interface like a namedtuple and then stop
- # hardcoding tuple indices here. Several Java examples,
- # including KafkaAdminClient.java
- group_description = response.groups[0]
- error_code = group_description[0]
- error_type = Errors.for_code(error_code)
- # Java has the note: KAFKA-6789, we can retry based on the error code
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- # TODO Java checks the group protocol type, and if consumer
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
- # the members' partition assignments... that hasn't yet been
- # implemented here so just return the raw struct results
+ group_description = self._describe_consumer_groups_process_response(response)
group_descriptions.append(group_description)
return group_descriptions
+ def _list_consumer_groups_send_request(self, broker_id):
+ """Send a ListGroupsRequest to a broker.
+
+ :param broker_id: The broker's node_id.
+ :return: A message future
+ """
+ version = self._matching_api_version(ListGroupsRequest)
+ if version <= 2:
+ request = ListGroupsRequest[version]()
+ else:
+ raise NotImplementedError(
+ "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(broker_id, request)
+
+ def _list_consumer_groups_process_response(self, response):
+ """Process a ListGroupsResponse into a list of groups."""
+ if response.API_VERSION <= 2:
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "ListGroupsRequest failed with response '{}'."
+ .format(response))
+ else:
+ raise NotImplementedError(
+ "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return response.groups
+
def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
@@ -697,60 +746,24 @@ class KafkaAdminClient(object):
# consumer groups move to new brokers that haven't yet been queried,
# then the same group could be returned by multiple brokers.
consumer_groups = set()
- futures = []
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
- version = self._matching_api_version(ListGroupsRequest)
- if version <= 2:
- request = ListGroupsRequest[version]()
- for broker_id in broker_ids:
- futures.append(self._send_request_to_node(broker_id, request))
-
- self._wait_for_futures(futures)
-
- for future in futures:
- response = future.value
- error_type = Errors.for_code(response.error_code)
- if error_type is not Errors.NoError:
- raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
- consumer_groups.update(response.groups)
- else:
- raise NotImplementedError(
- "Support for ListGroups v{} has not yet been added to KafkaAdminClient."
- .format(version))
+ futures = [self._list_consumer_groups_send_request(b) for b in broker_ids]
+ self._wait_for_futures(futures)
+ for f in futures:
+ response = f.value
+ consumer_groups.update(self._list_consumer_groups_process_response(response))
return list(consumer_groups)
- def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
- partitions=None):
- """Fetch Consumer Group Offsets.
-
- Note:
- This does not verify that the group_id or partitions actually exist
- in the cluster.
-
- As soon as any error is encountered, it is immediately raised.
+ def _list_consumer_group_offsets_send_request(self, group_id,
+ group_coordinator_id, partitions=None):
+ """Send an OffsetFetchRequest to a broker.
:param group_id: The consumer group id name for which to fetch offsets.
:param group_coordinator_id: The node_id of the group's coordinator
- broker. If set to None, will query the cluster to find the group
- coordinator. Explicitly specifying this can be useful to prevent
- that extra network round trip if you already know the group
- coordinator. Default: None.
- :param partitions: A list of TopicPartitions for which to fetch
- offsets. On brokers >= 0.10.2, this can be set to None to fetch all
- known offsets for the consumer group. Default: None.
- :return dictionary: A dictionary with TopicPartition keys and
- OffsetAndMetada values. Partitions that are not specified and for
- which the group_id does not have a recorded offset are omitted. An
- offset value of `-1` indicates the group_id has no offset for that
- TopicPartition. A `-1` can only happen for partitions that are
- explicitly specified.
+ broker.
+ :return: A message future
"""
- group_offsets_listing = {}
- if group_coordinator_id is None:
- group_coordinator_id = self._find_group_coordinator_id(group_id)
version = self._matching_api_version(OffsetFetchRequest)
if version <= 3:
if partitions is None:
@@ -768,32 +781,80 @@ class KafkaAdminClient(object):
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
request = OffsetFetchRequest[version](group_id, topics_partitions)
- future = self._send_request_to_node(group_coordinator_id, request)
- self._wait_for_futures([future])
- response = future.value
+ else:
+ raise NotImplementedError(
+ "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(group_coordinator_id, request)
+
+ def _list_consumer_group_offsets_process_response(self, response):
+ """Process an OffsetFetchResponse.
- if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
+ :param response: an OffsetFetchResponse.
+ :return: A dictionary composed of TopicPartition keys and
+ OffsetAndMetadata values.
+ """
+ if response.API_VERSION <= 3:
+
+ # OffsetFetchResponse_v1 lacks a top-level error_code
+ if response.API_VERSION > 1:
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# optionally we could retry if error_type.retriable
raise error_type(
- "Request '{}' failed with response '{}'."
- .format(request, response))
+ "OffsetFetchResponse failed with response '{}'."
+ .format(response))
+
# transform response into a dictionary with TopicPartition keys and
# OffsetAndMetadata values--this is what the Java AdminClient returns
+ offsets = {}
for topic, partitions in response.topics:
for partition, offset, metadata, error_code in partitions:
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
raise error_type(
- "Unable to fetch offsets for group_id {}, topic {}, partition {}"
- .format(group_id, topic, partition))
- group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
+ "Unable to fetch consumer group offsets for topic {}, partition {}"
+ .format(topic, partition))
+ offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
else:
raise NotImplementedError(
- "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
- .format(version))
- return group_offsets_listing
+ "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+ return offsets
+
+ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
+ partitions=None):
+ """Fetch Consumer Offsets for a single consumer group.
+
+ Note:
+ This does not verify that the group_id or partitions actually exist
+ in the cluster.
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param group_id: The consumer group id name for which to fetch offsets.
+ :param group_coordinator_id: The node_id of the group's coordinator
+ broker. If set to None, will query the cluster to find the group
+ coordinator. Explicitly specifying this can be useful to prevent
+ that extra network round trip if you already know the group
+ coordinator. Default: None.
+ :param partitions: A list of TopicPartitions for which to fetch
+ offsets. On brokers >= 0.10.2, this can be set to None to fetch all
+ known offsets for the consumer group. Default: None.
+ :return dictionary: A dictionary with TopicPartition keys and
+ OffsetAndMetadata values. Partitions that are not specified and for
+ which the group_id does not have a recorded offset are omitted. An
+ offset value of `-1` indicates the group_id has no offset for that
+ TopicPartition. A `-1` can only happen for partitions that are
+ explicitly specified.
+ """
+ if group_coordinator_id is None:
+ group_coordinator_id = self._find_group_coordinator_id(group_id)
+ future = self._list_consumer_group_offsets_send_request(
+ group_id, group_coordinator_id, partitions)
+ self._wait_for_futures([future])
+ response = future.value
+ return self._list_consumer_group_offsets_process_response(response)
# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.