summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorApurva007 <apurvatelang15@gmail.com>2020-09-16 17:33:45 -0700
committerGitHub <noreply@github.com>2020-09-16 17:33:45 -0700
commit26b8400ecafe9853bbb8ee3caf04a0a53eb6b224 (patch)
treecd994249379d7cf024ef851385cfafed681988cc
parente4913db244ca4d435d279d3047aef3c1c01ebd51 (diff)
downloadkafka-python-26b8400ecafe9853bbb8ee3caf04a0a53eb6b224.tar.gz
Enhancement for Kafka Admin Client's "Describe Consumer Group" (#2035)
Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response Co-authored-by: Apurva Telang <atelang@paypal.com> Co-authored-by: Jeff Widman <jeff@jeffwidman.com>
-rw-r--r--kafka/admin/client.py55
-rw-r--r--kafka/structs.py5
-rw-r--r--test/test_admin_integration.py102
3 files changed, 146 insertions, 16 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index d0fa845..e820587 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import
-from collections import defaultdict
+from collections import defaultdict, namedtuple
import copy
import logging
import socket
@@ -8,7 +8,10 @@ import socket
from . import ConfigResourceType
from kafka.vendor import six
+from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
+ ACLResourcePatternType
from kafka.client_async import KafkaClient, selectors
+from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
@@ -19,9 +22,8 @@ from kafka.protocol.admin import (
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata
-from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
- ACLResourcePatternType
+from kafka.protocol.types import Array
+from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.version import __version__
@@ -1000,22 +1002,47 @@ class KafkaAdminClient(object):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 3:
assert len(response.groups) == 1
- # TODO need to implement converting the response tuple into
- # a more accessible interface like a namedtuple and then stop
- # hardcoding tuple indices here. Several Java examples,
- # including KafkaAdminClient.java
- group_description = response.groups[0]
- error_code = group_description[0]
+ for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
+ if isinstance(response_field, Array):
+ described_groups = response.__dict__[response_name]
+ described_groups_field_schema = response_field.array_of
+ described_group = response.__dict__[response_name][0]
+ described_group_information_list = []
+ protocol_type_is_consumer = False
+ for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
+ if group_information_name == 'protocol_type':
+ protocol_type = described_group_information
+ protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
+ if isinstance(group_information_field, Array):
+ member_information_list = []
+ member_schema = group_information_field.array_of
+ for members in described_group_information:
+ member_information = []
+ for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
+ if protocol_type_is_consumer:
+ if member_name == 'member_metadata' and member:
+ member_information.append(ConsumerProtocolMemberMetadata.decode(member))
+ elif member_name == 'member_assignment' and member:
+ member_information.append(ConsumerProtocolMemberAssignment.decode(member))
+ else:
+ member_information.append(member)
+ member_info_tuple = MemberInformation._make(member_information)
+ member_information_list.append(member_info_tuple)
+ described_group_information_list.append(member_information_list)
+ else:
+ described_group_information_list.append(described_group_information)
+                    # Version 3 of the DescribeGroups API introduced the "authorized_operations" field.
+                    # Older responses lack that field, which would make the namedtuple construction fail;
+                    # therefore, append a placeholder of None for it.
+ if response.API_VERSION <=2:
+ described_group_information_list.append(None)
+ group_description = GroupInformation._make(described_group_information_list)
+ error_code = group_description.error_code
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"DescribeGroupsResponse failed with response '{}'."
.format(response))
- # TODO Java checks the group protocol type, and if consumer
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
- # the members' partition assignments... that hasn't yet been
- # implemented here so just return the raw struct results
else:
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
diff --git a/kafka/structs.py b/kafka/structs.py
index 0d225bc..bcb0236 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -70,6 +70,11 @@ Keyword Arguments:
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
["offset", "timestamp"])
+# A single member of a consumer group as returned by DescribeGroups; for
+# consumer-protocol groups the metadata/assignment fields hold decoded structs.
+MemberInformation = namedtuple("MemberInformation",
+    ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"])
+
+# One group's description from a DescribeGroups response; "authorized_operations"
+# exists only in v3+ responses (a None placeholder is appended for older versions).
+GroupInformation = namedtuple("GroupInformation",
+    ["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"])
"""Define retry policy for async producer
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 37b1405..dc04537 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,10 +1,13 @@
import pytest
-from test.testutil import env_kafka_version
+from logging import info
+from test.testutil import env_kafka_version, random_string
+from threading import Event, Thread
+from time import time, sleep
-from kafka.errors import NoError
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
+from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
+def test_describe_consumer_group_does_not_exist(kafka_admin_client):
+ """Tests that the describe consumer group call fails if the group coordinator is not available
+ """
+ with pytest.raises(GroupCoordinatorNotAvailableError):
+ group_description = kafka_admin_client.describe_consumer_groups(['test'])
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
+def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
+ """Tests that the describe consumer group call returns valid consumer group information
+ This test takes inspiration from the test 'test_group' in test_consumer_group.py.
+ """
+ consumers = {}
+ stop = {}
+ threads = {}
+ random_group_id = 'test-group-' + random_string(6)
+ group_id_list = [random_group_id, random_group_id + '_2']
+ generations = {group_id_list[0]: set(), group_id_list[1]: set()}
+ def consumer_thread(i, group_id):
+ assert i not in consumers
+ assert i not in stop
+ stop[i] = Event()
+ consumers[i] = kafka_consumer_factory(group_id=group_id)
+ while not stop[i].is_set():
+ consumers[i].poll(20)
+ consumers[i].close()
+ consumers[i] = None
+ stop[i] = None
+
+ num_consumers = 3
+ for i in range(num_consumers):
+ group_id = group_id_list[i % 2]
+ t = Thread(target=consumer_thread, args=(i, group_id,))
+ t.start()
+ threads[i] = t
+
+ try:
+ timeout = time() + 35
+ while True:
+ for c in range(num_consumers):
+
+ # Verify all consumers have been created
+ if c not in consumers:
+ break
+
+ # Verify all consumers have an assignment
+ elif not consumers[c].assignment():
+ break
+
+ # If all consumers exist and have an assignment
+ else:
+
+ info('All consumers have assignment... checking for stable group')
+ # Verify all consumers are in the same generation
+ # then log state and break while loop
+
+ for consumer in consumers.values():
+ generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id)
+
+ is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()])
+
+ # New generation assignment is not complete until
+ # coordinator.rejoining = False
+ rejoining = any([consumer._coordinator.rejoining
+ for consumer in list(consumers.values())])
+
+ if not rejoining and is_same_generation:
+ break
+ else:
+ sleep(1)
+ assert time() < timeout, "timeout waiting for assignments"
+
+ info('Group stabilized; verifying assignment')
+ output = kafka_admin_client.describe_consumer_groups(group_id_list)
+ assert len(output) == 2
+ consumer_groups = set()
+ for consumer_group in output:
+ assert(consumer_group.group in group_id_list)
+ if consumer_group.group == group_id_list[0]:
+ assert(len(consumer_group.members) == 2)
+ else:
+ assert(len(consumer_group.members) == 1)
+ for member in consumer_group.members:
+ assert(member.member_metadata.subscription[0] == topic)
+ assert(member.member_assignment.assignment[0][0] == topic)
+ consumer_groups.add(consumer_group.group)
+ assert(sorted(list(consumer_groups)) == group_id_list)
+ finally:
+ info('Shutting down %s consumers', num_consumers)
+ for c in range(num_consumers):
+ info('Stopping consumer %s', c)
+ stop[c].set()
+ threads[c].join()
+ threads[c] = None