From 76ad6629350f20acfd6038c1e444a89bcd255f89 Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Sat, 28 Sep 2019 23:57:05 +0200 Subject: Add ACL api to KafkaAdminClient (#1833) --- kafka/admin/__init__.py | 6 +- kafka/admin/acl_resource.py | 212 +++++++++++++++++++++ kafka/admin/client.py | 273 +++++++++++++++++++++++++++- kafka/errors.py | 6 + servers/0.10.0.0/resources/kafka.properties | 3 + servers/0.10.0.1/resources/kafka.properties | 3 + servers/0.10.1.1/resources/kafka.properties | 3 + servers/0.10.2.1/resources/kafka.properties | 3 + servers/0.11.0.0/resources/kafka.properties | 3 + servers/0.11.0.1/resources/kafka.properties | 3 + servers/0.11.0.2/resources/kafka.properties | 3 + servers/0.9.0.0/resources/kafka.properties | 3 + servers/0.9.0.1/resources/kafka.properties | 3 + servers/1.0.0/resources/kafka.properties | 3 + servers/1.0.1/resources/kafka.properties | 3 + servers/1.0.2/resources/kafka.properties | 3 + servers/1.1.0/resources/kafka.properties | 3 + servers/1.1.1/resources/kafka.properties | 3 + servers/2.0.0/resources/kafka.properties | 3 + servers/2.0.1/resources/kafka.properties | 3 + test/test_admin.py | 31 ++++ test/test_admin_integration.py | 107 +++++++++++ 22 files changed, 674 insertions(+), 9 deletions(-) create mode 100644 kafka/admin/acl_resource.py create mode 100644 test/test_admin_integration.py diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index a300301..c240fc6 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -2,9 +2,13 @@ from __future__ import absolute_import from kafka.admin.config_resource import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient +from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation, + ResourceType, ACLPermissionType, ACLResourcePatternType) from kafka.admin.new_topic import NewTopic from kafka.admin.new_partitions import NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions' + 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'ACL', 'ACLFilter', + 'ResourcePattern', 'ResourcePatternFilter', 'ACLOperation', 'ResourceType', 'ACLPermissionType', + 'ACLResourcePatternType' ] diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py new file mode 100644 index 0000000..7a012d2 --- /dev/null +++ b/kafka/admin/acl_resource.py @@ -0,0 +1,212 @@ +from __future__ import absolute_import +from kafka.errors import IllegalArgumentError + +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + + +class ResourceType(IntEnum): + """Type of kafka resource to set ACL for + + The ANY value is only valid in a filter context + """ + + UNKNOWN = 0, + ANY = 1, + CLUSTER = 4, + DELEGATION_TOKEN = 6, + GROUP = 3, + TOPIC = 2, + TRANSACTIONAL_ID = 5 + + +class ACLOperation(IntEnum): + """Type of operation + + The ANY value is only valid in a filter context + """ + + ANY = 1, + ALL = 2, + READ = 3, + WRITE = 4, + CREATE = 5, + DELETE = 6, + ALTER = 7, + DESCRIBE = 8, + CLUSTER_ACTION = 9, + DESCRIBE_CONFIGS = 10, + ALTER_CONFIGS = 11, + IDEMPOTENT_WRITE = 12 + + +class ACLPermissionType(IntEnum): + """An enumerated type of permissions + + The ANY value is only valid in a filter context + """ + + ANY = 1, + DENY = 2, + ALLOW = 3 + + +class ACLResourcePatternType(IntEnum): + """An enumerated type of resource patterns + + More details on the pattern types and how they work + can be found in KIP-290 (Support for prefixed ACLs) + https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs + """ + + ANY = 1, + MATCH = 2, + LITERAL = 3, + PREFIXED = 4 + + +class ACLFilter(object): + """Represents a filter to use with describing and deleting ACLs + + The difference between this class and the ACL class is mainly that + we allow using ANY with the operation, permission, and resource type objects + to fetch ALCs matching any of the properties. + + To make a filter matching any principal, set principal to None + """ + + def __init__( + self, + principal, + host, + operation, + permission_type, + resource_pattern + ): + self.principal = principal + self.host = host + self.operation = operation + self.permission_type = permission_type + self.resource_pattern = resource_pattern + + self.validate() + + def validate(self): + if not isinstance(self.operation, ACLOperation): + raise IllegalArgumentError("operation must be an ACLOperation object, and cannot be ANY") + if not isinstance(self.permission_type, ACLPermissionType): + raise IllegalArgumentError("permission_type must be an ACLPermissionType object, and cannot be ANY") + if not isinstance(self.resource_pattern, ResourcePatternFilter): + raise IllegalArgumentError("resource_pattern must be a ResourcePatternFilter object") + + def __repr__(self): + return "".format( + principal=self.principal, + host=self.host, + operation=self.operation.name, + type=self.permission_type.name, + resource=self.resource_pattern + ) + + +class ACL(ACLFilter): + """Represents a concrete ACL for a specific ResourcePattern + + In kafka an ACL is a 4-tuple of (principal, host, operation, permission_type) + that limits who can do what on a specific resource (or since KIP-290 a resource pattern) + + Terminology: + Principal -> This is the identifier for the user. Depending on the authorization method used (SSL, SASL etc) + the principal will look different. See http://kafka.apache.org/documentation/#security_authz for details. + The principal must be on the format "User:" or kafka will treat it as invalid. It's possible to use + other principal types than "User" if using a custom authorizer for the cluster. + Host -> This must currently be an IP address. It cannot be a range, and it cannot be a domain name. + It can be set to "*", which is special cased in kafka to mean "any host" + Operation -> Which client operation this ACL refers to. Has different meaning depending + on the resource type the ACL refers to. See https://docs.confluent.io/current/kafka/authorization.html#acl-format + for a list of which combinations of resource/operation that unlocks which kafka APIs + Permission Type: Whether this ACL is allowing or denying access + Resource Pattern -> This is a representation of the resource or resource pattern that the ACL + refers to. See the ResourcePattern class for details. + + """ + + def __init__( + self, + principal, + host, + operation, + permission_type, + resource_pattern + ): + super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern) + self.validate() + + def validate(self): + if self.operation == ACLOperation.ANY: + raise IllegalArgumentError("operation cannot be ANY") + if self.permission_type == ACLPermissionType.ANY: + raise IllegalArgumentError("permission_type cannot be ANY") + if not isinstance(self.resource_pattern, ResourcePattern): + raise IllegalArgumentError("resource_pattern must be a ResourcePattern object") + + +class ResourcePatternFilter(object): + def __init__( + self, + resource_type, + resource_name, + pattern_type + ): + self.resource_type = resource_type + self.resource_name = resource_name + self.pattern_type = pattern_type + + self.validate() + + def validate(self): + if not isinstance(self.resource_type, ResourceType): + raise IllegalArgumentError("resource_type must be a ResourceType object") + if not isinstance(self.pattern_type, ACLResourcePatternType): + raise IllegalArgumentError("pattern_type must be an ACLResourcePatternType object") + + def __repr__(self): + return "".format( + self.resource_type.name, + self.resource_name, + self.pattern_type.name + ) + + +class ResourcePattern(ResourcePatternFilter): + """A resource pattern to apply the ACL to + + Resource patterns are used to be able to specify which resources an ACL + describes in a more flexible way than just pointing to a literal topic name for example. + Since KIP-290 (kafka 2.0) it's possible to set an ACL for a prefixed resource name, which + can cut down considerably on the number of ACLs needed when the number of topics and + consumer groups start to grow. + The default pattern_type is LITERAL, and it describes a specific resource. This is also how + ACLs worked before the introduction of prefixed ACLs + """ + + def __init__( + self, + resource_type, + resource_name, + pattern_type=ACLResourcePatternType.LITERAL + ): + super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type) + self.validate() + + def validate(self): + if self.resource_type == ResourceType.ANY: + raise IllegalArgumentError("resource_type cannot be ANY") + if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]: + raise IllegalArgumentError( + "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name) + ) \ No newline at end of file diff --git a/kafka/admin/client.py b/kafka/admin/client.py index badac32..0ade3e9 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -11,14 +11,16 @@ from kafka.client_async import KafkaClient, selectors import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, - UnrecognizedBrokerVersion) + UnrecognizedBrokerVersion, IllegalArgumentError) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ + ACLResourcePatternType from kafka.version import __version__ @@ -470,14 +472,269 @@ class KafkaAdminClient(object): # describe cluster functionality is in ClusterMetadata # Note: if implemented here, send the request to the least_loaded_node() - # describe_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + @staticmethod + def _convert_describe_acls_response_to_acls(describe_response): + version = describe_response.API_VERSION + + error = Errors.for_code(describe_response.error_code) + acl_list = [] + for resources in describe_response.resources: + if version == 0: + resource_type, resource_name, acls = resources + resource_pattern_type = ACLResourcePatternType.LITERAL.value + elif version <= 1: + resource_type, resource_name, resource_pattern_type, acls = resources + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + for acl in acls: + principal, host, operation, permission_type = acl + conv_acl = ACL( + principal=principal, + host=host, + operation=ACLOperation(operation), + permission_type=ACLPermissionType(permission_type), + resource_pattern=ResourcePattern( + ResourceType(resource_type), + resource_name, + ACLResourcePatternType(resource_pattern_type) + ) + ) + acl_list.append(conv_acl) + + return (acl_list, error,) + + def describe_acls(self, acl_filter): + """Describe a set of ACLs + + Used to return a set of ACLs matching the supplied ACLFilter. + The cluster must be configured with an authorizer for this to work, or + you will get a SecurityDisabledError + + :param acl_filter: an ACLFilter object + :return: tuple of a list of matching ACL objects and a KafkaError (NoError if successful) + """ - # create_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + version = self._matching_api_version(DescribeAclsRequest) + if version == 0: + request = DescribeAclsRequest[version]( + resource_type=acl_filter.resource_pattern.resource_type, + resource_name=acl_filter.resource_pattern.resource_name, + principal=acl_filter.principal, + host=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type + ) + elif version <= 1: + request = DescribeAclsRequest[version]( + resource_type=acl_filter.resource_pattern.resource_type, + resource_name=acl_filter.resource_pattern.resource_name, + resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type, + principal=acl_filter.principal, + host=acl_filter.host, + operation=acl_filter.operation, + permission_type=acl_filter.permission_type - # delete_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() + ) + else: + raise NotImplementedError( + "Support for DescribeAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + # optionally we could retry if error_type.retriable + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + + return self._convert_describe_acls_response_to_acls(response) + + @staticmethod + def _convert_create_acls_resource_request_v0(acl): + + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_create_acls_resource_request_v1(acl): + + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_create_acls_response_to_acls(acls, create_response): + version = create_response.API_VERSION + + creations_error = [] + creations_success = [] + for i, creations in enumerate(create_response.creation_responses): + if version <= 1: + error_code, error_message = creations + acl = acls[i] + error = Errors.for_code(error_code) + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + if error is Errors.NoError: + creations_success.append(acl) + else: + creations_error.append((acl, error,)) + + return {"succeeded": creations_success, "failed": creations_error} + + def create_acls(self, acls): + """Create a list of ACLs + + This endpoint only accepts a list of concrete ACL objects, no ACLFilters. + Throws TopicAlreadyExistsError if topic is already present. + + :param acls: a list of ACL objects + :return: dict of successes and failures + """ + + for acl in acls: + if not isinstance(acl, ACL): + raise IllegalArgumentError("acls must contain ACL objects") + + version = self._matching_api_version(CreateAclsRequest) + if version == 0: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls] + ) + elif version <= 1: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls] + ) + else: + raise NotImplementedError( + "Support for CreateAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + + + return self._convert_create_acls_response_to_acls(acls, response) + + @staticmethod + def _convert_delete_acls_resource_request_v0(acl): + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_resource_request_v1(acl): + return ( + acl.resource_pattern.resource_type, + acl.resource_pattern.resource_name, + acl.resource_pattern.pattern_type, + acl.principal, + acl.host, + acl.operation, + acl.permission_type + ) + + @staticmethod + def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response): + version = delete_response.API_VERSION + filter_result_list = [] + for i, filter_responses in enumerate(delete_response.filter_responses): + filter_error_code, filter_error_message, matching_acls = filter_responses + filter_error = Errors.for_code(filter_error_code) + acl_result_list = [] + for acl in matching_acls: + if version == 0: + error_code, error_message, resource_type, resource_name, principal, host, operation, permission_type = acl + resource_pattern_type = ACLResourcePatternType.LITERAL.value + elif version == 1: + error_code, error_message, resource_type, resource_name, resource_pattern_type, principal, host, operation, permission_type = acl + else: + raise NotImplementedError( + "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin." + .format(version) + ) + acl_error = Errors.for_code(error_code) + conv_acl = ACL( + principal=principal, + host=host, + operation=ACLOperation(operation), + permission_type=ACLPermissionType(permission_type), + resource_pattern=ResourcePattern( + ResourceType(resource_type), + resource_name, + ACLResourcePatternType(resource_pattern_type) + ) + ) + acl_result_list.append((conv_acl, acl_error,)) + filter_result_list.append((acl_filters[i], acl_result_list, filter_error,)) + return filter_result_list + + def delete_acls(self, acl_filters): + """Delete a set of ACLs + + Deletes all ACLs matching the list of input ACLFilter + + :param acl_filters: a list of ACLFilter + :return: a list of 3-tuples corresponding to the list of input filters. + The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance) + """ + + for acl in acl_filters: + if not isinstance(acl, ACLFilter): + raise IllegalArgumentError("acl_filters must contain ACLFilter type objects") + + version = self._matching_api_version(DeleteAclsRequest) + + if version == 0: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters] + ) + elif version <= 1: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters] + ) + else: + raise NotImplementedError( + "Support for DeleteAcls v{} has not yet been added to KafkaAdmin." + .format(version) + ) + + future = self._send_request_to_node(self._client.least_loaded_node(), request) + self._wait_for_futures([future]) + response = future.value + + return self._convert_delete_acls_response_to_matching_acls(acl_filters, response) @staticmethod def _convert_describe_config_resource_request(config_resource): diff --git a/kafka/errors.py b/kafka/errors.py index f13f978..abef2c5 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -443,6 +443,12 @@ class PolicyViolationError(BrokerResponseError): description = 'Request parameters do not satisfy the configured policy.' +class SecurityDisabledError(BrokerResponseError): + errno = 54 + message = 'SECURITY_DISABLED' + description = 'Security features are disabled.' + + class KafkaUnavailableError(KafkaError): pass diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index 7d8e2b1..534b7ba 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties index 7d8e2b1..534b7ba 100644 --- a/servers/0.10.0.1/resources/kafka.properties +++ b/servers/0.10.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index 7d8e2b1..534b7ba 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index 7d8e2b1..534b7ba 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ b/servers/0.10.2.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/0.11.0.0/resources/kafka.properties +++ b/servers/0.11.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/0.11.0.1/resources/kafka.properties +++ b/servers/0.11.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/0.11.0.2/resources/kafka.properties +++ b/servers/0.11.0.2/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties index b4c4088..a8aaa28 100644 --- a/servers/0.9.0.0/resources/kafka.properties +++ b/servers/0.9.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties index 7d8e2b1..534b7ba 100644 --- a/servers/0.9.0.1/resources/kafka.properties +++ b/servers/0.9.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/1.0.0/resources/kafka.properties +++ b/servers/1.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/1.0.1/resources/kafka.properties +++ b/servers/1.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.0.2/resources/kafka.properties b/servers/1.0.2/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/1.0.2/resources/kafka.properties +++ b/servers/1.0.2/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/1.1.0/resources/kafka.properties +++ b/servers/1.1.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties index 64f94d5..fe6a89f 100644 --- a/servers/1.1.1/resources/kafka.properties +++ b/servers/1.1.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # List of enabled mechanisms, can be more than one sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/2.0.0/resources/kafka.properties +++ b/servers/2.0.0/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties index 28668db..630dbc5 100644 --- a/servers/2.0.1/resources/kafka.properties +++ b/servers/2.0.1/resources/kafka.properties @@ -30,6 +30,9 @@ ssl.key.password=foobar ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks ssl.truststore.password=foobar +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + # The port the socket server listens on #port=9092 diff --git a/test/test_admin.py b/test/test_admin.py index 300d5bc..279f85a 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -26,6 +26,37 @@ def test_new_partitions(): assert good_partitions.new_assignments == [[1, 2, 3]] +def test_acl_resource(): + good_acl = kafka.admin.ACL( + "User:bar", + "*", + kafka.admin.ACLOperation.ALL, + kafka.admin.ACLPermissionType.ALLOW, + kafka.admin.ResourcePattern( + kafka.admin.ResourceType.TOPIC, + "foo", + kafka.admin.ACLResourcePatternType.LITERAL + ) + ) + + assert(good_acl.resource_pattern.resource_type == kafka.admin.ResourceType.TOPIC) + assert(good_acl.operation == kafka.admin.ACLOperation.ALL) + assert(good_acl.permission_type == kafka.admin.ACLPermissionType.ALLOW) + assert(good_acl.resource_pattern.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) + + with pytest.raises(IllegalArgumentError): + kafka.admin.ACL( + "User:bar", + "*", + kafka.admin.ACLOperation.ANY, + kafka.admin.ACLPermissionType.ANY, + kafka.admin.ResourcePattern( + kafka.admin.ResourceType.TOPIC, + "foo", + kafka.admin.ACLResourcePatternType.LITERAL + ) + ) + def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py new file mode 100644 index 0000000..0be1920 --- /dev/null +++ b/test/test_admin_integration.py @@ -0,0 +1,107 @@ +import pytest +import os + +from test.fixtures import ZookeeperFixture, KafkaFixture, version +from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset + +from kafka.errors import NoError +from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL + + +class TestAdminClientIntegration(KafkaIntegrationTestCase): + @classmethod + def setUpClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.zk = ZookeeperFixture.instance() + cls.server = KafkaFixture.instance(0, cls.zk) + + @classmethod + def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + + cls.server.close() + cls.zk.close() + + @kafka_versions('>=0.9.0') + def test_create_describe_delete_acls(self): + """Tests that we can add, list and remove ACLs + """ + + # Setup + brokers = '%s:%d' % (self.server.host, self.server.port) + admin_client = KafkaAdminClient( + bootstrap_servers=brokers + ) + + # Check that we don't have any ACLs in the cluster + acls, error = admin_client.describe_acls( + ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) + + # Try to add an ACL + acl = ACL( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + result = admin_client.create_acls([acl]) + + self.assertFalse(len(result["failed"])) + self.assertEqual(len(result["succeeded"]), 1) + + # Check that we can list the ACL we created + acl_filter = ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + acls, error = admin_client.describe_acls(acl_filter) + + self.assertIs(error, NoError) + self.assertEqual(1, len(acls)) + + # Remove the ACL + delete_results = admin_client.delete_acls( + [ + ACLFilter( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ] + ) + + self.assertEqual(1, len(delete_results)) + self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs + + + # Make sure the ACL does not exist in the cluster anymore + acls, error = admin_client.describe_acls( + ACLFilter( + principal="*", + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + ) + self.assertIs(error, NoError) + self.assertEqual(0, len(acls)) -- cgit v1.2.1