summaryrefslogtreecommitdiff
path: root/test/test_admin_integration.py
blob: 2672faa0c55a380a873773b69573a9eec1b6fdcf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import pytest
import os

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset

from kafka.errors import NoError
from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL

# This test suite passes for me locally, but fails on travis
# Needs investigation
DISABLED = True

# TODO: Convert to pytest / fixtures
# Note that ACL features require broker 0.11, but other admin apis may work on
# earlier broker versions
class TestAdminClientIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):  # noqa
        if env_kafka_version() < (0, 11) or DISABLED:
            return

        cls.zk = ZookeeperFixture.instance()
        cls.server = KafkaFixture.instance(0, cls.zk)

    @classmethod
    def tearDownClass(cls):  # noqa
        if env_kafka_version() < (0, 11) or DISABLED:
            return

        cls.server.close()
        cls.zk.close()

    def setUp(self):
        if env_kafka_version() < (0, 11) or DISABLED:
            self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11')
        super(TestAdminClientIntegration, self).setUp()

    def tearDown(self):
        if env_kafka_version() < (0, 11) or DISABLED:
            return
        super(TestAdminClientIntegration, self).tearDown()

    def test_create_describe_delete_acls(self):
        """Tests that we can add, list and remove ACLs
        """

        # Setup
        brokers = '%s:%d' % (self.server.host, self.server.port)
        admin_client = KafkaAdminClient(
            bootstrap_servers=brokers
        )

        # Check that we don't have any ACLs in the cluster
        acls, error = admin_client.describe_acls(
            ACLFilter(
                principal=None,
                host="*",
                operation=ACLOperation.ANY,
                permission_type=ACLPermissionType.ANY,
                resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
            )
        )

        self.assertIs(error, NoError)
        self.assertEqual(0, len(acls))

        # Try to add an ACL
        acl = ACL(
            principal="User:test",
            host="*",
            operation=ACLOperation.READ,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
        )
        result = admin_client.create_acls([acl])

        self.assertFalse(len(result["failed"]))
        self.assertEqual(len(result["succeeded"]), 1)

        # Check that we can list the ACL we created
        acl_filter = ACLFilter(
            principal=None,
            host="*",
            operation=ACLOperation.ANY,
            permission_type=ACLPermissionType.ANY,
            resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
        )
        acls, error = admin_client.describe_acls(acl_filter)

        self.assertIs(error, NoError)
        self.assertEqual(1, len(acls))

        # Remove the ACL
        delete_results = admin_client.delete_acls(
            [
                ACLFilter(
                    principal="User:test",
                    host="*",
                    operation=ACLOperation.READ,
                    permission_type=ACLPermissionType.ALLOW,
                    resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
                )
            ]
        )

        self.assertEqual(1, len(delete_results))
        self.assertEqual(1, len(delete_results[0][1]))  # Check number of affected ACLs


        # Make sure the ACL does not exist in the cluster anymore
        acls, error = admin_client.describe_acls(
            ACLFilter(
                principal="*",
                host="*",
                operation=ACLOperation.ANY,
                permission_type=ACLPermissionType.ANY,
                resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
            )
        )
        self.assertIs(error, NoError)
        self.assertEqual(0, len(acls))