summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTyler Lubeck <tyler@tylerlubeck.com>2019-12-29 15:47:32 -0800
committerDana Powers <dana.powers@gmail.com>2019-12-29 15:47:32 -0800
commite06ea70174e0b114bec8072371a54ae6bcd73da5 (patch)
treeee51abdb3b6667c37993ed17d9271cd3d288f282
parente3362aca8c12a07ebe88575b073c91475585f21d (diff)
downloadkafka-python-e06ea70174e0b114bec8072371a54ae6bcd73da5.tar.gz
Admin protocol updates (#1948)
-rw-r--r--kafka/admin/client.py37
-rw-r--r--kafka/protocol/admin.py259
-rw-r--r--test/test_api_object_implementation.py18
3 files changed, 284 insertions, 30 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 8afe95b..accbf14 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -435,7 +435,7 @@ class KafkaAdminClient(object):
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms
)
- elif version <= 2:
+ elif version <= 3:
request = CreateTopicsRequest[version](
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms,
@@ -459,7 +459,7 @@ class KafkaAdminClient(object):
"""
version = self._matching_api_version(DeleteTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
- if version <= 1:
+ if version <= 3:
request = DeleteTopicsRequest[version](
topics=topics,
timeout=timeout_ms
@@ -803,7 +803,7 @@ class KafkaAdminClient(object):
DescribeConfigsRequest[version](resources=topic_resources)
))
- elif version == 1:
+ elif version <= 2:
if len(broker_resources) > 0:
for broker_resource in broker_resources:
try:
@@ -853,7 +853,7 @@ class KafkaAdminClient(object):
:return: Appropriate version of AlterConfigsResponse class.
"""
version = self._matching_api_version(AlterConfigsRequest)
- if version == 0:
+ if version <= 1:
request = AlterConfigsRequest[version](
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
@@ -901,7 +901,7 @@ class KafkaAdminClient(object):
"""
version = self._matching_api_version(CreatePartitionsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
- if version == 0:
+ if version <= 1:
request = CreatePartitionsRequest[version](
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout=timeout_ms,
@@ -928,7 +928,7 @@ class KafkaAdminClient(object):
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()
- def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
+ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id, include_authorized_operations=False):
"""Send a DescribeGroupsRequest to the group's coordinator.
:param group_id: The group name as a string
@@ -937,13 +937,24 @@ class KafkaAdminClient(object):
:return: A message future.
"""
version = self._matching_api_version(DescribeGroupsRequest)
- if version <= 1:
+ if version <= 2:
+ if include_authorized_operations:
+ raise IncompatibleBrokerVersion(
+ "include_authorized_operations requests "
+ "DescribeGroupsRequest >= v3, which is not "
+ "supported by Kafka {}".format(version)
+ )
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
# all consumer groups. Java still hasn't implemented this
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
+ elif version <= 3:
+ request = DescribeGroupsRequest[version](
+ groups=(group_id,),
+ include_authorized_operations=include_authorized_operations
+ )
else:
raise NotImplementedError(
"Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
@@ -952,7 +963,7 @@ class KafkaAdminClient(object):
def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
- if response.API_VERSION <= 1:
+ if response.API_VERSION <= 3:
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
@@ -976,7 +987,7 @@ class KafkaAdminClient(object):
.format(response.API_VERSION))
return group_description
- def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
+ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
"""Describe a set of consumer groups.
Any errors are immediately raised.
@@ -989,6 +1000,9 @@ class KafkaAdminClient(object):
useful for avoiding extra network round trips if you already know
the group coordinator. This is only useful when all the group_ids
have the same coordinator, otherwise it will error. Default: None.
+ :param include_authorized_operatoins: Whether or not to include
+ information about the operations a group is allowed to perform.
+ Only supported on API version >= v3. Default: False.
:return: A list of group descriptions. For now the group descriptions
are the raw results from the DescribeGroupsResponse. Long-term, we
plan to change this to return namedtuples as well as decoding the
@@ -1001,7 +1015,10 @@ class KafkaAdminClient(object):
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_coordinator_id(group_id)
- f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
+ f = self._describe_consumer_groups_send_request(
+ group_id,
+ this_groups_coordinator_id,
+ include_authorized_operations)
futures.append(f)
self._wait_for_futures(futures)
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index e6efad7..b2694dc 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from kafka.protocol.api import Request, Response
-from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Schema, String
+from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String
class ApiVersionResponse_v0(Response):
@@ -29,6 +29,12 @@ class ApiVersionResponse_v1(Response):
)
+class ApiVersionResponse_v2(Response):
+ API_KEY = 18
+ API_VERSION = 2
+ SCHEMA = ApiVersionResponse_v1.SCHEMA
+
+
class ApiVersionRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
@@ -43,8 +49,19 @@ class ApiVersionRequest_v1(Request):
SCHEMA = ApiVersionRequest_v0.SCHEMA
-ApiVersionRequest = [ApiVersionRequest_v0, ApiVersionRequest_v1]
-ApiVersionResponse = [ApiVersionResponse_v0, ApiVersionResponse_v1]
+class ApiVersionRequest_v2(Request):
+ API_KEY = 18
+ API_VERSION = 2
+ RESPONSE_TYPE = ApiVersionResponse_v1
+ SCHEMA = ApiVersionRequest_v0.SCHEMA
+
+
+ApiVersionRequest = [
+ ApiVersionRequest_v0, ApiVersionRequest_v1, ApiVersionRequest_v2,
+]
+ApiVersionResponse = [
+ ApiVersionResponse_v0, ApiVersionResponse_v1, ApiVersionResponse_v2,
+]
class CreateTopicsResponse_v0(Response):
@@ -79,6 +96,11 @@ class CreateTopicsResponse_v2(Response):
('error_message', String('utf-8'))))
)
+class CreateTopicsResponse_v3(Response):
+ API_KEY = 19
+ API_VERSION = 3
+ SCHEMA = CreateTopicsResponse_v2.SCHEMA
+
class CreateTopicsRequest_v0(Request):
API_KEY = 19
@@ -126,11 +148,20 @@ class CreateTopicsRequest_v2(Request):
SCHEMA = CreateTopicsRequest_v1.SCHEMA
+class CreateTopicsRequest_v3(Request):
+ API_KEY = 19
+ API_VERSION = 3
+ RESPONSE_TYPE = CreateTopicsResponse_v3
+ SCHEMA = CreateTopicsRequest_v1.SCHEMA
+
+
CreateTopicsRequest = [
- CreateTopicsRequest_v0, CreateTopicsRequest_v1, CreateTopicsRequest_v2
+ CreateTopicsRequest_v0, CreateTopicsRequest_v1,
+ CreateTopicsRequest_v2, CreateTopicsRequest_v3,
]
CreateTopicsResponse = [
- CreateTopicsResponse_v0, CreateTopicsResponse_v1, CreateTopicsResponse_v2
+ CreateTopicsResponse_v0, CreateTopicsResponse_v1,
+ CreateTopicsResponse_v2, CreateTopicsResponse_v3,
]
@@ -155,6 +186,18 @@ class DeleteTopicsResponse_v1(Response):
)
+class DeleteTopicsResponse_v2(Response):
+ API_KEY = 20
+ API_VERSION = 2
+ SCHEMA = DeleteTopicsResponse_v1.SCHEMA
+
+
+class DeleteTopicsResponse_v3(Response):
+ API_KEY = 20
+ API_VERSION = 3
+ SCHEMA = DeleteTopicsResponse_v1.SCHEMA
+
+
class DeleteTopicsRequest_v0(Request):
API_KEY = 20
API_VERSION = 0
@@ -172,8 +215,28 @@ class DeleteTopicsRequest_v1(Request):
SCHEMA = DeleteTopicsRequest_v0.SCHEMA
-DeleteTopicsRequest = [DeleteTopicsRequest_v0, DeleteTopicsRequest_v1]
-DeleteTopicsResponse = [DeleteTopicsResponse_v0, DeleteTopicsResponse_v1]
+class DeleteTopicsRequest_v2(Request):
+ API_KEY = 20
+ API_VERSION = 2
+ RESPONSE_TYPE = DeleteTopicsResponse_v2
+ SCHEMA = DeleteTopicsRequest_v0.SCHEMA
+
+
+class DeleteTopicsRequest_v3(Request):
+ API_KEY = 20
+ API_VERSION = 3
+ RESPONSE_TYPE = DeleteTopicsResponse_v3
+ SCHEMA = DeleteTopicsRequest_v0.SCHEMA
+
+
+DeleteTopicsRequest = [
+ DeleteTopicsRequest_v0, DeleteTopicsRequest_v1,
+ DeleteTopicsRequest_v2, DeleteTopicsRequest_v3,
+]
+DeleteTopicsResponse = [
+ DeleteTopicsResponse_v0, DeleteTopicsResponse_v1,
+ DeleteTopicsResponse_v2, DeleteTopicsResponse_v3,
+]
class ListGroupsResponse_v0(Response):
@@ -198,6 +261,11 @@ class ListGroupsResponse_v1(Response):
('protocol_type', String('utf-8'))))
)
+class ListGroupsResponse_v2(Response):
+ API_KEY = 16
+ API_VERSION = 2
+ SCHEMA = ListGroupsResponse_v1.SCHEMA
+
class ListGroupsRequest_v0(Request):
API_KEY = 16
@@ -212,9 +280,21 @@ class ListGroupsRequest_v1(Request):
RESPONSE_TYPE = ListGroupsResponse_v1
SCHEMA = ListGroupsRequest_v0.SCHEMA
+class ListGroupsRequest_v2(Request):
+ API_KEY = 16
+ API_VERSION = 1
+ RESPONSE_TYPE = ListGroupsResponse_v2
+ SCHEMA = ListGroupsRequest_v0.SCHEMA
-ListGroupsRequest = [ListGroupsRequest_v0, ListGroupsRequest_v1]
-ListGroupsResponse = [ListGroupsResponse_v0, ListGroupsResponse_v1]
+
+ListGroupsRequest = [
+ ListGroupsRequest_v0, ListGroupsRequest_v1,
+ ListGroupsRequest_v2,
+]
+ListGroupsResponse = [
+ ListGroupsResponse_v0, ListGroupsResponse_v1,
+ ListGroupsResponse_v2,
+]
class DescribeGroupsResponse_v0(Response):
@@ -256,6 +336,33 @@ class DescribeGroupsResponse_v1(Response):
)
+class DescribeGroupsResponse_v2(Response):
+ API_KEY = 15
+ API_VERSION = 2
+ SCHEMA = DescribeGroupsResponse_v1.SCHEMA
+
+
+class DescribeGroupsResponse_v3(Response):
+ API_KEY = 15
+ API_VERSION = 3
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('groups', Array(
+ ('error_code', Int16),
+ ('group', String('utf-8')),
+ ('state', String('utf-8')),
+ ('protocol_type', String('utf-8')),
+ ('protocol', String('utf-8')),
+ ('members', Array(
+ ('member_id', String('utf-8')),
+ ('client_id', String('utf-8')),
+ ('client_host', String('utf-8')),
+ ('member_metadata', Bytes),
+ ('member_assignment', Bytes)))),
+ ('authorized_operations', Int32))
+ )
+
+
class DescribeGroupsRequest_v0(Request):
API_KEY = 15
API_VERSION = 0
@@ -272,8 +379,31 @@ class DescribeGroupsRequest_v1(Request):
SCHEMA = DescribeGroupsRequest_v0.SCHEMA
-DescribeGroupsRequest = [DescribeGroupsRequest_v0, DescribeGroupsRequest_v1]
-DescribeGroupsResponse = [DescribeGroupsResponse_v0, DescribeGroupsResponse_v1]
+class DescribeGroupsRequest_v2(Request):
+ API_KEY = 15
+ API_VERSION = 2
+ RESPONSE_TYPE = DescribeGroupsResponse_v2
+ SCHEMA = DescribeGroupsRequest_v0.SCHEMA
+
+
+class DescribeGroupsRequest_v3(Request):
+ API_KEY = 15
+ API_VERSION = 3
+ RESPONSE_TYPE = DescribeGroupsResponse_v2
+ SCHEMA = Schema(
+ ('groups', Array(String('utf-8'))),
+ ('include_authorized_operations', Boolean)
+ )
+
+
+DescribeGroupsRequest = [
+ DescribeGroupsRequest_v0, DescribeGroupsRequest_v1,
+ DescribeGroupsRequest_v2, DescribeGroupsRequest_v3,
+]
+DescribeGroupsResponse = [
+ DescribeGroupsResponse_v0, DescribeGroupsResponse_v1,
+ DescribeGroupsResponse_v2, DescribeGroupsResponse_v3,
+]
class SaslHandShakeResponse_v0(Response):
@@ -507,6 +637,13 @@ class AlterConfigsResponse_v0(Response):
('resource_name', String('utf-8'))))
)
+
+class AlterConfigsResponse_v1(Response):
+ API_KEY = 33
+ API_VERSION = 1
+ SCHEMA = AlterConfigsResponse_v0.SCHEMA
+
+
class AlterConfigsRequest_v0(Request):
API_KEY = 33
API_VERSION = 0
@@ -521,8 +658,14 @@ class AlterConfigsRequest_v0(Request):
('validate_only', Boolean)
)
-AlterConfigsRequest = [AlterConfigsRequest_v0]
-AlterConfigsResponse = [AlterConfigsResponse_v0]
+class AlterConfigsRequest_v1(Request):
+ API_KEY = 33
+ API_VERSION = 1
+ RESPONSE_TYPE = AlterConfigsResponse_v1
+ SCHEMA = AlterConfigsRequest_v0.SCHEMA
+
+AlterConfigsRequest = [AlterConfigsRequest_v0, AlterConfigsRequest_v1]
+AlterConfigsResponse = [AlterConfigsResponse_v0, AlterConfigsRequest_v1]
class DescribeConfigsResponse_v0(Response):
@@ -565,6 +708,28 @@ class DescribeConfigsResponse_v1(Response):
('config_source', Int8)))))))
)
+class DescribeConfigsResponse_v2(Response):
+ API_KEY = 32
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('resources', Array(
+ ('error_code', Int16),
+ ('error_message', String('utf-8')),
+ ('resource_type', Int8),
+ ('resource_name', String('utf-8')),
+ ('config_entries', Array(
+ ('config_names', String('utf-8')),
+ ('config_value', String('utf-8')),
+ ('read_only', Boolean),
+ ('config_source', Int8),
+ ('is_sensitive', Boolean),
+ ('config_synonyms', Array(
+ ('config_name', String('utf-8')),
+ ('config_value', String('utf-8')),
+ ('config_source', Int8)))))))
+ )
+
class DescribeConfigsRequest_v0(Request):
API_KEY = 32
API_VERSION = 0
@@ -588,10 +753,25 @@ class DescribeConfigsRequest_v1(Request):
('include_synonyms', Boolean)
)
-DescribeConfigsRequest = [DescribeConfigsRequest_v0, DescribeConfigsRequest_v1]
-DescribeConfigsResponse = [DescribeConfigsResponse_v0, DescribeConfigsResponse_v1]
-class SaslAuthenticateResponse_v0(Request):
+class DescribeConfigsRequest_v2(Request):
+ API_KEY = 32
+ API_VERSION = 2
+ RESPONSE_TYPE = DescribeConfigsResponse_v2
+ SCHEMA = DescribeConfigsRequest_v1.SCHEMA
+
+
+DescribeConfigsRequest = [
+ DescribeConfigsRequest_v0, DescribeConfigsRequest_v1,
+ DescribeConfigsRequest_v2,
+]
+DescribeConfigsResponse = [
+ DescribeConfigsResponse_v0, DescribeConfigsResponse_v1,
+ DescribeConfigsResponse_v2,
+]
+
+
+class SaslAuthenticateResponse_v0(Response):
API_KEY = 36
API_VERSION = 0
SCHEMA = Schema(
@@ -601,6 +781,17 @@ class SaslAuthenticateResponse_v0(Request):
)
+class SaslAuthenticateResponse_v1(Response):
+ API_KEY = 36
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('error_message', String('utf-8')),
+ ('sasl_auth_bytes', Bytes),
+ ('session_lifetime_ms', Int64)
+ )
+
+
class SaslAuthenticateRequest_v0(Request):
API_KEY = 36
API_VERSION = 0
@@ -610,8 +801,19 @@ class SaslAuthenticateRequest_v0(Request):
)
-SaslAuthenticateRequest = [SaslAuthenticateRequest_v0]
-SaslAuthenticateResponse = [SaslAuthenticateResponse_v0]
+class SaslAuthenticateRequest_v1(Request):
+ API_KEY = 36
+ API_VERSION = 1
+ RESPONSE_TYPE = SaslAuthenticateResponse_v1
+ SCHEMA = SaslAuthenticateRequest_v0.SCHEMA
+
+
+SaslAuthenticateRequest = [
+ SaslAuthenticateRequest_v0, SaslAuthenticateRequest_v1,
+]
+SaslAuthenticateResponse = [
+ SaslAuthenticateResponse_v0, SaslAuthenticateResponse_v1,
+]
class CreatePartitionsResponse_v0(Response):
@@ -626,6 +828,12 @@ class CreatePartitionsResponse_v0(Response):
)
+class CreatePartitionsResponse_v1(Response):
+ API_KEY = 37
+ API_VERSION = 1
+ SCHEMA = CreatePartitionsResponse_v0.SCHEMA
+
+
class CreatePartitionsRequest_v0(Request):
API_KEY = 37
API_VERSION = 0
@@ -641,5 +849,16 @@ class CreatePartitionsRequest_v0(Request):
)
-CreatePartitionsRequest = [CreatePartitionsRequest_v0]
-CreatePartitionsResponse = [CreatePartitionsResponse_v0]
+class CreatePartitionsRequest_v1(Request):
+ API_KEY = 37
+ API_VERSION = 1
+ SCHEMA = CreatePartitionsRequest_v0.SCHEMA
+ RESPONSE_TYPE = CreatePartitionsResponse_v1
+
+
+CreatePartitionsRequest = [
+ CreatePartitionsRequest_v0, CreatePartitionsRequest_v1,
+]
+CreatePartitionsResponse = [
+ CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
+]
diff --git a/test/test_api_object_implementation.py b/test/test_api_object_implementation.py
new file mode 100644
index 0000000..da80f14
--- /dev/null
+++ b/test/test_api_object_implementation.py
@@ -0,0 +1,18 @@
+import abc
+import pytest
+
+from kafka.protocol.api import Request
+from kafka.protocol.api import Response
+
+
+attr_names = [n for n in dir(Request) if isinstance(getattr(Request, n), abc.abstractproperty)]
+@pytest.mark.parametrize('klass', Request.__subclasses__())
+@pytest.mark.parametrize('attr_name', attr_names)
+def test_request_type_conformance(klass, attr_name):
+ assert hasattr(klass, attr_name)
+
+attr_names = [n for n in dir(Response) if isinstance(getattr(Response, n), abc.abstractproperty)]
+@pytest.mark.parametrize('klass', Response.__subclasses__())
+@pytest.mark.parametrize('attr_name', attr_names)
+def test_response_type_conformance(klass, attr_name):
+ assert hasattr(klass, attr_name)