summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-11-13 13:10:19 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-11-18 15:31:55 -0800
commitfec82e3cbdbfa52d556ee76447d372dbffae83ed (patch)
treee96e229974dfef1e093e9d22c2ccda06e8369143
parentcc8e91426907f8ccadd60eedc4dc53b8729a84ec (diff)
downloadkafka-python-fix-list-groups-to-query-all-brokers.tar.gz
Fix list_consumer_groups() to query all brokersfix-list-groups-to-query-all-brokers
Previously, this only queried the controller. In actuality, the Kafka protocol requires that the client query all brokers in order to get the full list of consumer groups. Note: The Java code (as best I can tell) doesn't allow limiting this to specific brokers. And on the surface, this makes sense... you typically don't care about specific brokers. However, the inverse is true... consumer groups care about knowing their group coordinator so they don't have to repeatedly query to find it. In fact, a Kafka broker will only return the groups that it's a coordinator for. While this is an implementation detail that is not guaranteed by the upstream broker code, and technically should not be relied upon, I think it very unlikely to change. So monitoring scripts that fetch the offsets or describe the consumers groups of all groups in the cluster can simply issue one call per broker to identify all the coordinators, rather than having to issue one call per consumer group. For an ad-hoc script this doesn't matter, but for a monitoring script that runs every couple of minutes, this can be a big deal. I know in the situations where I will use this, this matters more to me than the risk of the interface unexpectedly breaking.
-rw-r--r--kafka/admin/kafka.py44
1 files changed, 39 insertions, 5 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 224a660..05f2873 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -575,20 +575,54 @@ class KafkaAdmin(object):
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)
- def list_consumer_groups(self):
+ def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
- :return: Appropriate version of ListGroupsResponse class
+ This returns a list of Consumer Group tuples. The tuples are
+ composed of the consumer group name and the consumer group protocol
+ type.
+
+ Only consumer groups that store their offsets in Kafka are returned.
+ The protocol type will be an empty string for groups created using
+ Kafka < 0.9 APIs because, although they store their offsets in Kafka,
+ they don't use Kafka for group coordination. For groups created using
+ Kafka >= 0.9, the protocol type will typically be "consumer".
+
+ As soon as any error is encountered, it is immediately raised.
+
+ :param broker_ids: A list of broker node_ids to query for consumer
+ groups. If set to None, will query all brokers in the cluster.
+ Explicitly specifying broker(s) can be useful for determining which
+ consumer groups are coordinated by those broker(s). Default: None
+ :return list: List of tuples of Consumer Groups.
+ :exception GroupCoordinatorNotAvailableError: The coordinator is not
+ available, so cannot process requests.
+ :exception GroupLoadInProgressError: The coordinator is loading and
+ hence can't process requests.
"""
+ # While we return a list, internally use a set to prevent duplicates
+ # because if a group coordinator fails after being queried, and its
+ # consumer groups move to new brokers that haven't yet been queried,
+ # then the same group could be returned by multiple brokers.
+ consumer_groups = set()
+ if broker_ids is None:
+ broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
version = self._matching_api_version(ListGroupsRequest)
- if version <= 1:
+ if version <= 2:
request = ListGroupsRequest[version]()
+ for broker_id in broker_ids:
+ response = self._send_request_to_node(broker_id, request)
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ consumer_groups.update(response.groups)
else:
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
- # TODO this is completely broken, as it needs to send to the group coordinator
- # return self._send(request)
+ return list(consumer_groups)
def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
partitions=None):