summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUlrik Johansson <ulrik.johansson@gmail.com>2019-09-28 23:57:05 +0200
committerDana Powers <dana.powers@gmail.com>2019-09-28 14:57:05 -0700
commit76ad6629350f20acfd6038c1e444a89bcd255f89 (patch)
tree00439631523c0c7b5df835f33b92f0f8d95f5812
parent5e4d1516e0d903e411c71474cc5ba9e9b009cd8c (diff)
downloadkafka-python-76ad6629350f20acfd6038c1e444a89bcd255f89.tar.gz
Add ACL api to KafkaAdminClient (#1833)
-rw-r--r--kafka/admin/__init__.py6
-rw-r--r--kafka/admin/acl_resource.py212
-rw-r--r--kafka/admin/client.py273
-rw-r--r--kafka/errors.py6
-rw-r--r--servers/0.10.0.0/resources/kafka.properties3
-rw-r--r--servers/0.10.0.1/resources/kafka.properties3
-rw-r--r--servers/0.10.1.1/resources/kafka.properties3
-rw-r--r--servers/0.10.2.1/resources/kafka.properties3
-rw-r--r--servers/0.11.0.0/resources/kafka.properties3
-rw-r--r--servers/0.11.0.1/resources/kafka.properties3
-rw-r--r--servers/0.11.0.2/resources/kafka.properties3
-rw-r--r--servers/0.9.0.0/resources/kafka.properties3
-rw-r--r--servers/0.9.0.1/resources/kafka.properties3
-rw-r--r--servers/1.0.0/resources/kafka.properties3
-rw-r--r--servers/1.0.1/resources/kafka.properties3
-rw-r--r--servers/1.0.2/resources/kafka.properties3
-rw-r--r--servers/1.1.0/resources/kafka.properties3
-rw-r--r--servers/1.1.1/resources/kafka.properties3
-rw-r--r--servers/2.0.0/resources/kafka.properties3
-rw-r--r--servers/2.0.1/resources/kafka.properties3
-rw-r--r--test/test_admin.py31
-rw-r--r--test/test_admin_integration.py107
22 files changed, 674 insertions, 9 deletions
diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py
index a300301..c240fc6 100644
--- a/kafka/admin/__init__.py
+++ b/kafka/admin/__init__.py
@@ -2,9 +2,13 @@ from __future__ import absolute_import
from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
+from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
+ ResourceType, ACLPermissionType, ACLResourcePatternType)
from kafka.admin.new_topic import NewTopic
from kafka.admin.new_partitions import NewPartitions
__all__ = [
- 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions'
+ 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'ACL', 'ACLFilter',
+ 'ResourcePattern', 'ResourcePatternFilter', 'ACLOperation', 'ResourceType', 'ACLPermissionType',
+ 'ACLResourcePatternType'
]
diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py
new file mode 100644
index 0000000..7a012d2
--- /dev/null
+++ b/kafka/admin/acl_resource.py
@@ -0,0 +1,212 @@
+from __future__ import absolute_import
+from kafka.errors import IllegalArgumentError
+
+# enum in stdlib as of py3.4
+try:
+ from enum import IntEnum # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ from kafka.vendor.enum34 import IntEnum
+
+
+class ResourceType(IntEnum):
+ """Type of kafka resource to set ACL for
+
+ The ANY value is only valid in a filter context
+ """
+
+ UNKNOWN = 0,
+ ANY = 1,
+ CLUSTER = 4,
+ DELEGATION_TOKEN = 6,
+ GROUP = 3,
+ TOPIC = 2,
+ TRANSACTIONAL_ID = 5
+
+
+class ACLOperation(IntEnum):
+ """Type of operation
+
+ The ANY value is only valid in a filter context
+ """
+
+ ANY = 1,
+ ALL = 2,
+ READ = 3,
+ WRITE = 4,
+ CREATE = 5,
+ DELETE = 6,
+ ALTER = 7,
+ DESCRIBE = 8,
+ CLUSTER_ACTION = 9,
+ DESCRIBE_CONFIGS = 10,
+ ALTER_CONFIGS = 11,
+ IDEMPOTENT_WRITE = 12
+
+
+class ACLPermissionType(IntEnum):
+ """An enumerated type of permissions
+
+ The ANY value is only valid in a filter context
+ """
+
+ ANY = 1,
+ DENY = 2,
+ ALLOW = 3
+
+
+class ACLResourcePatternType(IntEnum):
+ """An enumerated type of resource patterns
+
+ More details on the pattern types and how they work
+ can be found in KIP-290 (Support for prefixed ACLs)
+ https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs
+ """
+
+ ANY = 1,
+ MATCH = 2,
+ LITERAL = 3,
+ PREFIXED = 4
+
+
+class ACLFilter(object):
+ """Represents a filter to use with describing and deleting ACLs
+
+ The difference between this class and the ACL class is mainly that
+ we allow using ANY with the operation, permission, and resource type objects
+ to fetch ALCs matching any of the properties.
+
+ To make a filter matching any principal, set principal to None
+ """
+
+ def __init__(
+ self,
+ principal,
+ host,
+ operation,
+ permission_type,
+ resource_pattern
+ ):
+ self.principal = principal
+ self.host = host
+ self.operation = operation
+ self.permission_type = permission_type
+ self.resource_pattern = resource_pattern
+
+ self.validate()
+
+ def validate(self):
+ if not isinstance(self.operation, ACLOperation):
+ raise IllegalArgumentError("operation must be an ACLOperation object, and cannot be ANY")
+ if not isinstance(self.permission_type, ACLPermissionType):
+ raise IllegalArgumentError("permission_type must be an ACLPermissionType object, and cannot be ANY")
+ if not isinstance(self.resource_pattern, ResourcePatternFilter):
+ raise IllegalArgumentError("resource_pattern must be a ResourcePatternFilter object")
+
+ def __repr__(self):
+ return "<ACL principal={principal}, resource={resource}, operation={operation}, type={type}, host={host}>".format(
+ principal=self.principal,
+ host=self.host,
+ operation=self.operation.name,
+ type=self.permission_type.name,
+ resource=self.resource_pattern
+ )
+
+
+class ACL(ACLFilter):
+ """Represents a concrete ACL for a specific ResourcePattern
+
+ In kafka an ACL is a 4-tuple of (principal, host, operation, permission_type)
+ that limits who can do what on a specific resource (or since KIP-290 a resource pattern)
+
+ Terminology:
+ Principal -> This is the identifier for the user. Depending on the authorization method used (SSL, SASL etc)
+ the principal will look different. See http://kafka.apache.org/documentation/#security_authz for details.
+ The principal must be on the format "User:<name>" or kafka will treat it as invalid. It's possible to use
+ other principal types than "User" if using a custom authorizer for the cluster.
+ Host -> This must currently be an IP address. It cannot be a range, and it cannot be a domain name.
+ It can be set to "*", which is special cased in kafka to mean "any host"
+ Operation -> Which client operation this ACL refers to. Has different meaning depending
+ on the resource type the ACL refers to. See https://docs.confluent.io/current/kafka/authorization.html#acl-format
+ for a list of which combinations of resource/operation that unlocks which kafka APIs
+ Permission Type: Whether this ACL is allowing or denying access
+ Resource Pattern -> This is a representation of the resource or resource pattern that the ACL
+ refers to. See the ResourcePattern class for details.
+
+ """
+
+ def __init__(
+ self,
+ principal,
+ host,
+ operation,
+ permission_type,
+ resource_pattern
+ ):
+ super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
+ self.validate()
+
+ def validate(self):
+ if self.operation == ACLOperation.ANY:
+ raise IllegalArgumentError("operation cannot be ANY")
+ if self.permission_type == ACLPermissionType.ANY:
+ raise IllegalArgumentError("permission_type cannot be ANY")
+ if not isinstance(self.resource_pattern, ResourcePattern):
+ raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")
+
+
+class ResourcePatternFilter(object):
+ def __init__(
+ self,
+ resource_type,
+ resource_name,
+ pattern_type
+ ):
+ self.resource_type = resource_type
+ self.resource_name = resource_name
+ self.pattern_type = pattern_type
+
+ self.validate()
+
+ def validate(self):
+ if not isinstance(self.resource_type, ResourceType):
+ raise IllegalArgumentError("resource_type must be a ResourceType object")
+ if not isinstance(self.pattern_type, ACLResourcePatternType):
+ raise IllegalArgumentError("pattern_type must be an ACLResourcePatternType object")
+
+ def __repr__(self):
+ return "<ResourcePattern type={}, name={}, pattern={}>".format(
+ self.resource_type.name,
+ self.resource_name,
+ self.pattern_type.name
+ )
+
+
+class ResourcePattern(ResourcePatternFilter):
+ """A resource pattern to apply the ACL to
+
+ Resource patterns are used to be able to specify which resources an ACL
+ describes in a more flexible way than just pointing to a literal topic name for example.
+ Since KIP-290 (kafka 2.0) it's possible to set an ACL for a prefixed resource name, which
+ can cut down considerably on the number of ACLs needed when the number of topics and
+ consumer groups start to grow.
+ The default pattern_type is LITERAL, and it describes a specific resource. This is also how
+ ACLs worked before the introduction of prefixed ACLs
+ """
+
+ def __init__(
+ self,
+ resource_type,
+ resource_name,
+ pattern_type=ACLResourcePatternType.LITERAL
+ ):
+ super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
+ self.validate()
+
+ def validate(self):
+ if self.resource_type == ResourceType.ANY:
+ raise IllegalArgumentError("resource_type cannot be ANY")
+ if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
+ raise IllegalArgumentError(
+ "pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
+ ) \ No newline at end of file
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index badac32..0ade3e9 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -11,14 +11,16 @@ from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
- UnrecognizedBrokerVersion)
+ UnrecognizedBrokerVersion, IllegalArgumentError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
- ListGroupsRequest, DescribeGroupsRequest)
+ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
+ ACLResourcePatternType
from kafka.version import __version__
@@ -470,14 +472,269 @@ class KafkaAdminClient(object):
# describe cluster functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
- # describe_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
+ @staticmethod
+ def _convert_describe_acls_response_to_acls(describe_response):
+ version = describe_response.API_VERSION
+
+ error = Errors.for_code(describe_response.error_code)
+ acl_list = []
+ for resources in describe_response.resources:
+ if version == 0:
+ resource_type, resource_name, acls = resources
+ resource_pattern_type = ACLResourcePatternType.LITERAL.value
+ elif version <= 1:
+ resource_type, resource_name, resource_pattern_type, acls = resources
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+ for acl in acls:
+ principal, host, operation, permission_type = acl
+ conv_acl = ACL(
+ principal=principal,
+ host=host,
+ operation=ACLOperation(operation),
+ permission_type=ACLPermissionType(permission_type),
+ resource_pattern=ResourcePattern(
+ ResourceType(resource_type),
+ resource_name,
+ ACLResourcePatternType(resource_pattern_type)
+ )
+ )
+ acl_list.append(conv_acl)
+
+ return (acl_list, error,)
+
+ def describe_acls(self, acl_filter):
+ """Describe a set of ACLs
+
+ Used to return a set of ACLs matching the supplied ACLFilter.
+ The cluster must be configured with an authorizer for this to work, or
+ you will get a SecurityDisabledError
+
+ :param acl_filter: an ACLFilter object
+ :return: tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
+ """
- # create_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
+ version = self._matching_api_version(DescribeAclsRequest)
+ if version == 0:
+ request = DescribeAclsRequest[version](
+ resource_type=acl_filter.resource_pattern.resource_type,
+ resource_name=acl_filter.resource_pattern.resource_name,
+ principal=acl_filter.principal,
+ host=acl_filter.host,
+ operation=acl_filter.operation,
+ permission_type=acl_filter.permission_type
+ )
+ elif version <= 1:
+ request = DescribeAclsRequest[version](
+ resource_type=acl_filter.resource_pattern.resource_type,
+ resource_name=acl_filter.resource_pattern.resource_name,
+ resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
+ principal=acl_filter.principal,
+ host=acl_filter.host,
+ operation=acl_filter.operation,
+ permission_type=acl_filter.permission_type
- # delete_acls protocol not yet implemented
- # Note: send the request to the least_loaded_node()
+ )
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ self._wait_for_futures([future])
+ response = future.value
+
+ error_type = Errors.for_code(response.error_code)
+ if error_type is not Errors.NoError:
+ # optionally we could retry if error_type.retriable
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+
+ return self._convert_describe_acls_response_to_acls(response)
+
+ @staticmethod
+ def _convert_create_acls_resource_request_v0(acl):
+
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_create_acls_resource_request_v1(acl):
+
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.resource_pattern.pattern_type,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_create_acls_response_to_acls(acls, create_response):
+ version = create_response.API_VERSION
+
+ creations_error = []
+ creations_success = []
+ for i, creations in enumerate(create_response.creation_responses):
+ if version <= 1:
+ error_code, error_message = creations
+ acl = acls[i]
+ error = Errors.for_code(error_code)
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ if error is Errors.NoError:
+ creations_success.append(acl)
+ else:
+ creations_error.append((acl, error,))
+
+ return {"succeeded": creations_success, "failed": creations_error}
+
+ def create_acls(self, acls):
+ """Create a list of ACLs
+
+ This endpoint only accepts a list of concrete ACL objects, no ACLFilters.
+ Throws TopicAlreadyExistsError if topic is already present.
+
+ :param acls: a list of ACL objects
+ :return: dict of successes and failures
+ """
+
+ for acl in acls:
+ if not isinstance(acl, ACL):
+ raise IllegalArgumentError("acls must contain ACL objects")
+
+ version = self._matching_api_version(CreateAclsRequest)
+ if version == 0:
+ request = CreateAclsRequest[version](
+ creations=[self._convert_create_acls_resource_request_v0(acl) for acl in acls]
+ )
+ elif version <= 1:
+ request = CreateAclsRequest[version](
+ creations=[self._convert_create_acls_resource_request_v1(acl) for acl in acls]
+ )
+ else:
+ raise NotImplementedError(
+ "Support for CreateAcls v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ self._wait_for_futures([future])
+ response = future.value
+
+
+ return self._convert_create_acls_response_to_acls(acls, response)
+
+ @staticmethod
+ def _convert_delete_acls_resource_request_v0(acl):
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_delete_acls_resource_request_v1(acl):
+ return (
+ acl.resource_pattern.resource_type,
+ acl.resource_pattern.resource_name,
+ acl.resource_pattern.pattern_type,
+ acl.principal,
+ acl.host,
+ acl.operation,
+ acl.permission_type
+ )
+
+ @staticmethod
+ def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response):
+ version = delete_response.API_VERSION
+ filter_result_list = []
+ for i, filter_responses in enumerate(delete_response.filter_responses):
+ filter_error_code, filter_error_message, matching_acls = filter_responses
+ filter_error = Errors.for_code(filter_error_code)
+ acl_result_list = []
+ for acl in matching_acls:
+ if version == 0:
+ error_code, error_message, resource_type, resource_name, principal, host, operation, permission_type = acl
+ resource_pattern_type = ACLResourcePatternType.LITERAL.value
+ elif version == 1:
+ error_code, error_message, resource_type, resource_name, resource_pattern_type, principal, host, operation, permission_type = acl
+ else:
+ raise NotImplementedError(
+ "Support for DescribeAcls Response v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+ acl_error = Errors.for_code(error_code)
+ conv_acl = ACL(
+ principal=principal,
+ host=host,
+ operation=ACLOperation(operation),
+ permission_type=ACLPermissionType(permission_type),
+ resource_pattern=ResourcePattern(
+ ResourceType(resource_type),
+ resource_name,
+ ACLResourcePatternType(resource_pattern_type)
+ )
+ )
+ acl_result_list.append((conv_acl, acl_error,))
+ filter_result_list.append((acl_filters[i], acl_result_list, filter_error,))
+ return filter_result_list
+
+ def delete_acls(self, acl_filters):
+ """Delete a set of ACLs
+
+ Deletes all ACLs matching the list of input ACLFilter
+
+ :param acl_filters: a list of ACLFilter
+ :return: a list of 3-tuples corresponding to the list of input filters.
+ The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)
+ """
+
+ for acl in acl_filters:
+ if not isinstance(acl, ACLFilter):
+ raise IllegalArgumentError("acl_filters must contain ACLFilter type objects")
+
+ version = self._matching_api_version(DeleteAclsRequest)
+
+ if version == 0:
+ request = DeleteAclsRequest[version](
+ filters=[self._convert_delete_acls_resource_request_v0(acl) for acl in acl_filters]
+ )
+ elif version <= 1:
+ request = DeleteAclsRequest[version](
+ filters=[self._convert_delete_acls_resource_request_v1(acl) for acl in acl_filters]
+ )
+ else:
+ raise NotImplementedError(
+ "Support for DeleteAcls v{} has not yet been added to KafkaAdmin."
+ .format(version)
+ )
+
+ future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ self._wait_for_futures([future])
+ response = future.value
+
+ return self._convert_delete_acls_response_to_matching_acls(acl_filters, response)
@staticmethod
def _convert_describe_config_resource_request(config_resource):
diff --git a/kafka/errors.py b/kafka/errors.py
index f13f978..abef2c5 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -443,6 +443,12 @@ class PolicyViolationError(BrokerResponseError):
description = 'Request parameters do not satisfy the configured policy.'
+class SecurityDisabledError(BrokerResponseError):
+ errno = 54
+ message = 'SECURITY_DISABLED'
+ description = 'Security features are disabled.'
+
+
class KafkaUnavailableError(KafkaError):
pass
diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties
index 7d8e2b1..534b7ba 100644
--- a/servers/0.10.0.0/resources/kafka.properties
+++ b/servers/0.10.0.0/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties
index 7d8e2b1..534b7ba 100644
--- a/servers/0.10.0.1/resources/kafka.properties
+++ b/servers/0.10.0.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties
index 7d8e2b1..534b7ba 100644
--- a/servers/0.10.1.1/resources/kafka.properties
+++ b/servers/0.10.1.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties
index 7d8e2b1..534b7ba 100644
--- a/servers/0.10.2.1/resources/kafka.properties
+++ b/servers/0.10.2.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/0.11.0.0/resources/kafka.properties
+++ b/servers/0.11.0.0/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/0.11.0.1/resources/kafka.properties
+++ b/servers/0.11.0.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/0.11.0.2/resources/kafka.properties
+++ b/servers/0.11.0.2/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties
index b4c4088..a8aaa28 100644
--- a/servers/0.9.0.0/resources/kafka.properties
+++ b/servers/0.9.0.0/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties
index 7d8e2b1..534b7ba 100644
--- a/servers/0.9.0.1/resources/kafka.properties
+++ b/servers/0.9.0.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/1.0.0/resources/kafka.properties
+++ b/servers/1.0.0/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/1.0.1/resources/kafka.properties
+++ b/servers/1.0.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/1.0.2/resources/kafka.properties b/servers/1.0.2/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/1.0.2/resources/kafka.properties
+++ b/servers/1.0.2/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/1.1.0/resources/kafka.properties
+++ b/servers/1.1.0/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties
index 64f94d5..fe6a89f 100644
--- a/servers/1.1.1/resources/kafka.properties
+++ b/servers/1.1.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/2.0.0/resources/kafka.properties
+++ b/servers/2.0.0/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties
index 28668db..630dbc5 100644
--- a/servers/2.0.1/resources/kafka.properties
+++ b/servers/2.0.1/resources/kafka.properties
@@ -30,6 +30,9 @@ ssl.key.password=foobar
ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks
ssl.truststore.password=foobar
+authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
+allow.everyone.if.no.acl.found=true
+
# The port the socket server listens on
#port=9092
diff --git a/test/test_admin.py b/test/test_admin.py
index 300d5bc..279f85a 100644
--- a/test/test_admin.py
+++ b/test/test_admin.py
@@ -26,6 +26,37 @@ def test_new_partitions():
assert good_partitions.new_assignments == [[1, 2, 3]]
+def test_acl_resource():
+ good_acl = kafka.admin.ACL(
+ "User:bar",
+ "*",
+ kafka.admin.ACLOperation.ALL,
+ kafka.admin.ACLPermissionType.ALLOW,
+ kafka.admin.ResourcePattern(
+ kafka.admin.ResourceType.TOPIC,
+ "foo",
+ kafka.admin.ACLResourcePatternType.LITERAL
+ )
+ )
+
+ assert(good_acl.resource_pattern.resource_type == kafka.admin.ResourceType.TOPIC)
+ assert(good_acl.operation == kafka.admin.ACLOperation.ALL)
+ assert(good_acl.permission_type == kafka.admin.ACLPermissionType.ALLOW)
+ assert(good_acl.resource_pattern.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL)
+
+ with pytest.raises(IllegalArgumentError):
+ kafka.admin.ACL(
+ "User:bar",
+ "*",
+ kafka.admin.ACLOperation.ANY,
+ kafka.admin.ACLPermissionType.ANY,
+ kafka.admin.ResourcePattern(
+ kafka.admin.ResourceType.TOPIC,
+ "foo",
+ kafka.admin.ACLResourcePatternType.LITERAL
+ )
+ )
+
def test_new_topic():
with pytest.raises(IllegalArgumentError):
bad_topic = kafka.admin.NewTopic('foo', -1, -1)
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
new file mode 100644
index 0000000..0be1920
--- /dev/null
+++ b/test/test_admin_integration.py
@@ -0,0 +1,107 @@
+import pytest
+import os
+
+from test.fixtures import ZookeeperFixture, KafkaFixture, version
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+from kafka.errors import NoError
+from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+
+
+class TestAdminClientIntegration(KafkaIntegrationTestCase):
+ @classmethod
+ def setUpClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
+ cls.zk = ZookeeperFixture.instance()
+ cls.server = KafkaFixture.instance(0, cls.zk)
+
+ @classmethod
+ def tearDownClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
+ cls.server.close()
+ cls.zk.close()
+
+ @kafka_versions('>=0.9.0')
+ def test_create_describe_delete_acls(self):
+ """Tests that we can add, list and remove ACLs
+ """
+
+ # Setup
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ admin_client = KafkaAdminClient(
+ bootstrap_servers=brokers
+ )
+
+ # Check that we don't have any ACLs in the cluster
+ acls, error = admin_client.describe_acls(
+ ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ )
+
+ self.assertIs(error, NoError)
+ self.assertEqual(0, len(acls))
+
+ # Try to add an ACL
+ acl = ACL(
+ principal="User:test",
+ host="*",
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ result = admin_client.create_acls([acl])
+
+ self.assertFalse(len(result["failed"]))
+ self.assertEqual(len(result["succeeded"]), 1)
+
+ # Check that we can list the ACL we created
+ acl_filter = ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ acls, error = admin_client.describe_acls(acl_filter)
+
+ self.assertIs(error, NoError)
+ self.assertEqual(1, len(acls))
+
+ # Remove the ACL
+ delete_results = admin_client.delete_acls(
+ [
+ ACLFilter(
+ principal="User:test",
+ host="*",
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ ]
+ )
+
+ self.assertEqual(1, len(delete_results))
+ self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs
+
+
+ # Make sure the ACL does not exist in the cluster anymore
+ acls, error = admin_client.describe_acls(
+ ACLFilter(
+ principal="*",
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ )
+ self.assertIs(error, NoError)
+ self.assertEqual(0, len(acls))