summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-11-17 02:53:08 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-11-18 15:29:16 -0800
commitcc8e91426907f8ccadd60eedc4dc53b8729a84ec (patch)
treeddbe457be9b8fc8cce68e0675ab6035c6b59b7d9
parent50690884e74d1cf1075d96bca0c028bc4d8e1e60 (diff)
downloadkafka-python-cc8e91426907f8ccadd60eedc4dc53b8729a84ec.tar.gz
Add list_consumer_group_offsets()
Support fetching the offsets of a consumer group. Note: As far as I can tell (the Java code is a little inscrutable), the Java AdminClient doesn't allow specifying the `coordinator_id` or the `partitions`. But I decided to include them because they provide a lot of additional flexibility: 1. Allowing users to specify the partitions allows this method to be used even for older brokers that don't support the OffsetFetchRequest_v2. 2. Allowing users to specify the coordinator ID gives them a way to bypass a network round trip. This method will frequently be used for monitoring, and if you've got 1,000 consumer groups that are being monitored once a minute, that's ~1.5M requests a day that are unnecessarily duplicated, as the coordinator doesn't change unless there's an error.
-rw-r--r--kafka/admin/kafka.py77
-rw-r--r--kafka/structs.py1
2 files changed, 77 insertions, 1 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index befdd86..224a660 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -1,8 +1,12 @@
from __future__ import absolute_import
+from collections import defaultdict
import copy
import logging
import socket
+
+from kafka.vendor import six
+
from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
@@ -12,8 +16,9 @@ from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest)
-from kafka.protocol.commit import GroupCoordinatorRequest
+from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
+from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.version import __version__
log = logging.getLogger(__name__)
@@ -585,5 +590,75 @@ class KafkaAdmin(object):
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)
+ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
+ partitions=None):
+ """Fetch Consumer Group Offsets.
+
+ Note:
+ This does not verify that the group_id or partitions actually exist
+ in the cluster.
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param group_id: The consumer group id name for which to fetch offsets.
+ :param group_coordinator_id: The node_id of the group's coordinator
+ broker. If set to None, will query the cluster to find the group
+ coordinator. Explicitly specifying this can be useful to prevent
+ that extra network round trip if you already know the group
+ coordinator. Default: None.
+ :param partitions: A list of TopicPartitions for which to fetch
+ offsets. On brokers >= 0.10.2, this can be set to None to fetch all
+ known offsets for the consumer group. Default: None.
+ :return dictionary: A dictionary with TopicPartition keys and
+ OffsetAndMetadata values. Partitions that are not specified and for
+ which the group_id does not have a recorded offset are omitted. An
+ offset value of `-1` indicates the group_id has no offset for that
+ TopicPartition. A `-1` can only happen for partitions that are
+ explicitly specified.
+ """
+ group_offsets_listing = {}
+ if group_coordinator_id is None:
+ group_coordinator_id = self._find_group_coordinator_id(group_id)
+ version = self._matching_api_version(OffsetFetchRequest)
+ if version <= 3:  # versions above 3 raise NotImplementedError below
+ if partitions is None:
+ if version <= 1:
+ raise ValueError(
+ """OffsetFetchRequest_v{} requires specifying the
+ partitions for which to fetch offsets. Omitting the
+ partitions is only supported on brokers >= 0.10.2.
+ For details, see KIP-88.""".format(version))
+ topics_partitions = None  # None requests all of the group's known offsets (per the docstring; v2+ only)
+ else:
+ # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
+ topics_partitions_dict = defaultdict(set)
+ for topic, partition in partitions:
+ topics_partitions_dict[topic].add(partition)
+ topics_partitions = list(six.iteritems(topics_partitions_dict))
+ request = OffsetFetchRequest[version](group_id, topics_partitions)
+ response = self._send_request_to_node(group_coordinator_id, request)
+ if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ # optionally we could retry if error_type.retriable
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ # transform response into a dictionary with TopicPartition keys and
+ # OffsetAndMetadata values--this is what the Java AdminClient returns
+ for topic, partitions in response.topics:  # NOTE: rebinds the `partitions` argument, which is not read again below
+ for partition, offset, metadata, error_code in partitions:
+ error_type = Errors.for_code(error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Unable to fetch offsets for group_id {}, topic {}, partition {}"
+ .format(group_id, topic, partition))
+ group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
+ else:
+ raise NotImplementedError(
+ "Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
+ .format(version))
+ return group_offsets_listing
+
# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.
diff --git a/kafka/structs.py b/kafka/structs.py
index e15e92e..baacbcd 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -72,6 +72,7 @@ PartitionMetadata = namedtuple("PartitionMetadata",
["topic", "partition", "leader", "replicas", "isr", "error"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata",
+ # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
["offset", "metadata"])
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",