summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSwen Wenzel <5111028+swenzel@users.noreply.github.com>2020-09-17 18:17:35 +0200
committerGitHub <noreply@github.com>2020-09-17 09:17:35 -0700
commit16a0b3155fdeebe80295fcfb0f32d75af74dcb1a (patch)
treedf9b10de0ee8f9b40f28a74276182c096e7bbabf
parente485a6ee2a1f05f2333e22b0fbdbafb12badaf3f (diff)
downloadkafka-python-16a0b3155fdeebe80295fcfb0f32d75af74dcb1a.tar.gz
Feature: delete consumergroups (#2040)
* Add consumergroup related errors * Add DeleteGroups to protocol.admin * Implement delete_groups feature on KafkaAdminClient
-rw-r--r--kafka/admin/client.py93
-rw-r--r--kafka/errors.py12
-rw-r--r--kafka/protocol/admin.py41
-rw-r--r--test/test_admin_integration.py78
4 files changed, 219 insertions, 5 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 1b91e1b..97fe73a 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -19,7 +19,9 @@ from kafka.errors import (
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
- ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
+ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
+ DeleteGroupsRequest
+)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Array
@@ -337,12 +339,34 @@ class KafkaAdminClient(object):
name as a string.
:return: The node_id of the broker that is the coordinator.
"""
- # Note: Java may change how this is implemented in KAFKA-6791.
future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
response = future.value
return self._find_coordinator_id_process_response(response)
+ def _find_many_coordinator_ids(self, group_ids):
+ """Find the broker node_id of the coordinator for each of the given groups.
+
+ Sends a FindCoordinatorRequest message to the cluster for each group_id.
+ Will block until the FindCoordinatorResponse is received for all groups.
+ Any errors are immediately raised.
+
+ :param group_ids: A list of consumer group IDs. This is typically the group
+ name as a string.
+ :return: A list of tuples (group_id, node_id) where node_id is the id
+ of the broker that is the coordinator for the corresponding group.
+ """
+ futures = {
+ group_id: self._find_coordinator_id_send_request(group_id)
+ for group_id in group_ids
+ }
+ self._wait_for_futures(list(futures.values()))
+ groups_coordinators = [
+ (group_id, self._find_coordinator_id_process_response(f.value))
+ for group_id, f in futures.items()
+ ]
+ return groups_coordinators
+
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
@@ -1261,8 +1285,69 @@ class KafkaAdminClient(object):
response = future.value
return self._list_consumer_group_offsets_process_response(response)
- # delete groups protocol not yet implemented
- # Note: send the request to the group's coordinator.
+ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
+ """Delete Consumer Group Offsets for given consumer groups.
+
+ Note:
+ This does not verify that the group ids actually exist and
+ group_coordinator_id is the correct coordinator for all these groups.
+
+ The result needs checking for potential errors.
+
+ :param group_ids: The consumer group ids of the groups which are to be deleted.
+ :param group_coordinator_id: The node_id of the broker which is the coordinator for
+ all the groups. Use only if all groups are coordinated by the same broker.
+ If set to None, will query the cluster to find the coordinator for every single group.
+ Explicitly specifying this can be useful to prevent
+ that extra network round trips if you already know the group
+ coordinator. Default: None.
+ :return: A list of tuples (group_id, KafkaError)
+ """
+ if group_coordinator_id is not None:
+ futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
+ else:
+ groups_coordinators = defaultdict(list)
+ for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
+ groups_coordinators[group_coordinator_id].append(group_id)
+ futures = [
+ self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
+ for group_coordinator_id, group_ids in groups_coordinators.items()
+ ]
+
+ self._wait_for_futures(futures)
+
+ results = []
+ for f in futures:
+ results.extend(self._convert_delete_groups_response(f.value))
+ return results
+
+ def _convert_delete_groups_response(self, response):
+ if response.API_VERSION <= 1:
+ results = []
+ for group_id, error_code in response.results:
+ results.append((group_id, Errors.for_code(error_code)))
+ return results
+ else:
+ raise NotImplementedError(
+ "Support for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient."
+ .format(response.API_VERSION))
+
+ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
+ """Send a DeleteGroups request to a broker.
+
+ :param group_ids: The consumer group ids of the groups which are to be deleted.
+ :param group_coordinator_id: The node_id of the broker which is the coordinator for
+ all the groups.
+ :return: A message future
+ """
+ version = self._matching_api_version(DeleteGroupsRequest)
+ if version <= 1:
+ request = DeleteGroupsRequest[version](group_ids)
+ else:
+ raise NotImplementedError(
+ "Support for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient."
+ .format(version))
+ return self._send_request_to_node(group_coordinator_id, request)
def _wait_for_futures(self, futures):
while not all(future.succeeded() for future in futures):
diff --git a/kafka/errors.py b/kafka/errors.py
index 2c1df82..b33cf51 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -449,6 +449,18 @@ class SecurityDisabledError(BrokerResponseError):
description = 'Security features are disabled.'
+class NonEmptyGroupError(BrokerResponseError):
+ errno = 68
+ message = 'NON_EMPTY_GROUP'
+ description = 'The group is not empty.'
+
+
+class GroupIdNotFoundError(BrokerResponseError):
+ errno = 69
+ message = 'GROUP_ID_NOT_FOUND'
+ description = 'The group id does not exist.'
+
+
class KafkaUnavailableError(KafkaError):
pass
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index af88ea4..f3b691a 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -882,3 +882,44 @@ CreatePartitionsResponse = [
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]
+
+class DeleteGroupsResponse_v0(Response):
+ API_KEY = 42
+ API_VERSION = 0
+ SCHEMA = Schema(
+ ("throttle_time_ms", Int32),
+ ("results", Array(
+ ("group_id", String("utf-8")),
+ ("error_code", Int16)))
+ )
+
+
+class DeleteGroupsResponse_v1(Response):
+ API_KEY = 42
+ API_VERSION = 1
+ SCHEMA = DeleteGroupsResponse_v0.SCHEMA
+
+
+class DeleteGroupsRequest_v0(Request):
+ API_KEY = 42
+ API_VERSION = 0
+ RESPONSE_TYPE = DeleteGroupsResponse_v0
+ SCHEMA = Schema(
+ ("groups_names", Array(String("utf-8")))
+ )
+
+
+class DeleteGroupsRequest_v1(Request):
+ API_KEY = 42
+ API_VERSION = 1
+ RESPONSE_TYPE = DeleteGroupsResponse_v1
+ SCHEMA = DeleteGroupsRequest_v0.SCHEMA
+
+
+DeleteGroupsRequest = [
+ DeleteGroupsRequest_v0, DeleteGroupsRequest_v1
+]
+
+DeleteGroupsResponse = [
+ DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
+]
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index dc04537..06c40a2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -7,7 +7,7 @@ from time import time, sleep
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
-from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)
+from kafka.errors import (NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -142,6 +142,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
"""Tests that the describe consumer group call fails if the group coordinator is not available
@@ -149,6 +150,7 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client):
with pytest.raises(GroupCoordinatorNotAvailableError):
group_description = kafka_admin_client.describe_consumer_groups(['test'])
+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
@@ -236,3 +238,77 @@ def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_facto
stop[c].set()
threads[c].join()
threads[c] = None
+
+
+@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
+def test_delete_consumergroups(kafka_admin_client, kafka_consumer_factory, send_messages):
+ random_group_id = 'test-group-' + random_string(6)
+ group1 = random_group_id + "_1"
+ group2 = random_group_id + "_2"
+ group3 = random_group_id + "_3"
+
+ send_messages(range(0, 100), partition=0)
+ consumer1 = kafka_consumer_factory(group_id=group1)
+ next(consumer1)
+ consumer1.close()
+
+ consumer2 = kafka_consumer_factory(group_id=group2)
+ next(consumer2)
+ consumer2.close()
+
+ consumer3 = kafka_consumer_factory(group_id=group3)
+ next(consumer3)
+ consumer3.close()
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 in consumergroups
+ assert group2 in consumergroups
+ assert group3 in consumergroups
+
+ delete_results = {
+ group_id: error
+ for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2])
+ }
+ assert delete_results[group1] == NoError
+ assert delete_results[group2] == NoError
+ assert group3 not in delete_results
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 not in consumergroups
+ assert group2 not in consumergroups
+ assert group3 in consumergroups
+
+
+@pytest.mark.skipif(env_kafka_version() < (1, 1), reason="Delete consumer groups requires broker >=1.1")
+def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_factory, send_messages):
+ random_group_id = 'test-group-' + random_string(6)
+ group1 = random_group_id + "_1"
+ group2 = random_group_id + "_2"
+ group3 = random_group_id + "_3"
+
+ send_messages(range(0, 100), partition=0)
+ consumer1 = kafka_consumer_factory(group_id=group1)
+ next(consumer1)
+ consumer1.close()
+
+ consumer2 = kafka_consumer_factory(group_id=group2)
+ next(consumer2)
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 in consumergroups
+ assert group2 in consumergroups
+ assert group3 not in consumergroups
+
+ delete_results = {
+ group_id: error
+ for group_id, error in kafka_admin_client.delete_consumer_groups([group1, group2, group3])
+ }
+
+ assert delete_results[group1] == NoError
+ assert delete_results[group2] == NonEmptyGroupError
+ assert delete_results[group3] == GroupIdNotFoundError
+
+ consumergroups = {group_id for group_id, _ in kafka_admin_client.list_consumer_groups()}
+ assert group1 not in consumergroups
+ assert group2 in consumergroups
+ assert group3 not in consumergroups