diff options
author | Apurva007 <apurvatelang15@gmail.com> | 2020-09-16 17:33:45 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-16 17:33:45 -0700 |
commit | 26b8400ecafe9853bbb8ee3caf04a0a53eb6b224 (patch) | |
tree | cd994249379d7cf024ef851385cfafed681988cc | |
parent | e4913db244ca4d435d279d3047aef3c1c01ebd51 (diff) | |
download | kafka-python-26b8400ecafe9853bbb8ee3caf04a0a53eb6b224.tar.gz |
Enhancement for Kafka Admin Client's "Describe Consumer Group" (#2035)
Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response
Co-authored-by: Apurva Telang <atelang@paypal.com>
Co-authored-by: Jeff Widman <jeff@jeffwidman.com>
-rw-r--r-- | kafka/admin/client.py | 55 | ||||
-rw-r--r-- | kafka/structs.py | 5 | ||||
-rw-r--r-- | test/test_admin_integration.py | 102 |
3 files changed, 146 insertions, 16 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index d0fa845..e820587 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -from collections import defaultdict +from collections import defaultdict, namedtuple import copy import logging import socket @@ -8,7 +8,10 @@ import socket from . import ConfigResourceType from kafka.vendor import six +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ + ACLResourcePatternType from kafka.client_async import KafkaClient, selectors +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, @@ -19,9 +22,8 @@ from kafka.protocol.admin import ( ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest -from kafka.structs import TopicPartition, OffsetAndMetadata -from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ - ACLResourcePatternType +from kafka.protocol.types import Array +from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation from kafka.version import __version__ @@ -1000,22 +1002,47 @@ class KafkaAdminClient(object): """Process a DescribeGroupsResponse into a group description.""" if response.API_VERSION <= 3: assert len(response.groups) == 1 - # TODO need to implement converting the response tuple into - # a more accessible interface like a namedtuple and then stop - # hardcoding tuple indices here. Several Java examples, - # including KafkaAdminClient.java - group_description = response.groups[0] - error_code = group_description[0] + for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names): + if isinstance(response_field, Array): + described_groups = response.__dict__[response_name] + described_groups_field_schema = response_field.array_of + described_group = response.__dict__[response_name][0] + described_group_information_list = [] + protocol_type_is_consumer = False + for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): + if group_information_name == 'protocol_type': + protocol_type = described_group_information + protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type) + if isinstance(group_information_field, Array): + member_information_list = [] + member_schema = group_information_field.array_of + for members in described_group_information: + member_information = [] + for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): + if protocol_type_is_consumer: + if member_name == 'member_metadata' and member: + member_information.append(ConsumerProtocolMemberMetadata.decode(member)) + elif member_name == 'member_assignment' and member: + member_information.append(ConsumerProtocolMemberAssignment.decode(member)) + else: + member_information.append(member) + member_info_tuple = MemberInformation._make(member_information) + member_information_list.append(member_info_tuple) + described_group_information_list.append(member_information_list) + else: + described_group_information_list.append(described_group_information) + # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. This will cause the namedtuple to fail + # Therefore, appending a placeholder of None in it. + if response.API_VERSION <=2: + described_group_information_list.append(None) + group_description = GroupInformation._make(described_group_information_list) + error_code = group_description.error_code error_type = Errors.for_code(error_code) # Java has the note: KAFKA-6789, we can retry based on the error code if error_type is not Errors.NoError: raise error_type( "DescribeGroupsResponse failed with response '{}'." .format(response)) - # TODO Java checks the group protocol type, and if consumer - # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes - # the members' partition assignments... that hasn't yet been - # implemented here so just return the raw struct results else: raise NotImplementedError( "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." diff --git a/kafka/structs.py b/kafka/structs.py index 0d225bc..bcb0236 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -70,6 +70,11 @@ Keyword Arguments: OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"]) +MemberInformation = namedtuple("MemberInformation", + ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"]) + +GroupInformation = namedtuple("GroupInformation", + ["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"]) """Define retry policy for async producer diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 37b1405..dc04537 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,10 +1,13 @@ import pytest -from test.testutil import env_kafka_version +from logging import info +from test.testutil import env_kafka_version, random_string +from threading import Event, Thread +from time import time, sleep -from kafka.errors import NoError from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) +from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): with pytest.raises(ValueError): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') +def test_describe_consumer_group_does_not_exist(kafka_admin_client): + """Tests that the describe consumer group call fails if the group coordinator is not available + """ + with pytest.raises(GroupCoordinatorNotAvailableError): + group_description = kafka_admin_client.describe_consumer_groups(['test']) + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') +def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): + """Tests that the describe consumer group call returns valid consumer group information + This test takes inspiration from the test 'test_group' in test_consumer_group.py. + """ + consumers = {} + stop = {} + threads = {} + random_group_id = 'test-group-' + random_string(6) + group_id_list = [random_group_id, random_group_id + '_2'] + generations = {group_id_list[0]: set(), group_id_list[1]: set()} + def consumer_thread(i, group_id): + assert i not in consumers + assert i not in stop + stop[i] = Event() + consumers[i] = kafka_consumer_factory(group_id=group_id) + while not stop[i].is_set(): + consumers[i].poll(20) + consumers[i].close() + consumers[i] = None + stop[i] = None + + num_consumers = 3 + for i in range(num_consumers): + group_id = group_id_list[i % 2] + t = Thread(target=consumer_thread, args=(i, group_id,)) + t.start() + threads[i] = t + + try: + timeout = time() + 35 + while True: + for c in range(num_consumers): + + # Verify all consumers have been created + if c not in consumers: + break + + # Verify all consumers have an assignment + elif not consumers[c].assignment(): + break + + # If all consumers exist and have an assignment + else: + + info('All consumers have assignment... checking for stable group') + # Verify all consumers are in the same generation + # then log state and break while loop + + for consumer in consumers.values(): + generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id) + + is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()]) + + # New generation assignment is not complete until + # coordinator.rejoining = False + rejoining = any([consumer._coordinator.rejoining + for consumer in list(consumers.values())]) + + if not rejoining and is_same_generation: + break + else: + sleep(1) + assert time() < timeout, "timeout waiting for assignments" + + info('Group stabilized; verifying assignment') + output = kafka_admin_client.describe_consumer_groups(group_id_list) + assert len(output) == 2 + consumer_groups = set() + for consumer_group in output: + assert(consumer_group.group in group_id_list) + if consumer_group.group == group_id_list[0]: + assert(len(consumer_group.members) == 2) + else: + assert(len(consumer_group.members) == 1) + for member in consumer_group.members: + assert(member.member_metadata.subscription[0] == topic) + assert(member.member_assignment.assignment[0][0] == topic) + consumer_groups.add(consumer_group.group) + assert(sorted(list(consumer_groups)) == group_id_list) + finally: + info('Shutting down %s consumers', num_consumers) + for c in range(num_consumers): + info('Stopping consumer %s', c) + stop[c].set() + threads[c].join() + threads[c] = None |