diff options
author | Ulrik Johansson <ulrik.johansson@gmail.com> | 2019-10-07 20:11:58 +0200 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-10-07 11:11:58 -0700 |
commit | 84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1 (patch) | |
tree | f456e1b611ab667f7a8c8209eee0ab1bc283de51 | |
parent | f1cda98e0b427116d5eb901bce2d697b3f037e78 (diff) | |
download | kafka-python-84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1.tar.gz |
convert test_admin_integration to pytest (#1923)
-rw-r--r-- | test/conftest.py | 19 | ||||
-rw-r--r-- | test/fixtures.py | 10 | ||||
-rw-r--r-- | test/test_admin_integration.py | 164 |
3 files changed, 90 insertions, 103 deletions
diff --git a/test/conftest.py b/test/conftest.py index 267ac6a..bbe4048 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -105,6 +105,25 @@ def kafka_producer_factory(kafka_broker, request): if _producer[0]: _producer[0].close() +@pytest.fixture +def kafka_admin_client(kafka_admin_client_factory): + """Return a KafkaAdminClient fixture""" + yield kafka_admin_client_factory() + +@pytest.fixture +def kafka_admin_client_factory(kafka_broker): + """Return a KafkaAdminClient factory fixture""" + _admin_client = [None] + + def factory(**kafka_admin_client_params): + params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() + _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) + return _admin_client[0] + + yield factory + + if _admin_client[0]: + _admin_client[0].close() @pytest.fixture def topic(kafka_broker, request): diff --git a/test/fixtures.py b/test/fixtures.py index c7748f1..68572b5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -13,7 +13,7 @@ import py from kafka.vendor.six.moves import urllib, range from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 -from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient +from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient from kafka.client_async import KafkaClient from kafka.protocol.admin import CreateTopicsRequest from kafka.protocol.metadata import MetadataRequest @@ -500,6 +500,14 @@ class KafkaFixture(Fixture): return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)), bootstrap_servers=self.bootstrap_server()) for x in range(cnt)) + def get_admin_clients(self, cnt=1, **params): + params.setdefault('client_id', 'admin_client') + params['bootstrap_servers'] = self.bootstrap_server() + client_id = params['client_id'] + for x in range(cnt): + params['client_id'] = '%s_%s' % (client_id, random_string(4)) + yield KafkaAdminClient(**params) + def get_consumers(self, cnt, topics, **params): params.setdefault('client_id', 'consumer') params.setdefault('heartbeat_interval_ms', 500) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 2672faa..3efa021 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,122 +1,82 @@ import pytest -import os -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset +from test.testutil import env_kafka_version from kafka.errors import NoError -from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL +from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL -# This test suite passes for me locally, but fails on travis -# Needs investigation -DISABLED = True -# TODO: Convert to pytest / fixtures -# Note that ACL features require broker 0.11, but other admin apis may work on -# earlier broker versions -class TestAdminClientIntegration(KafkaIntegrationTestCase): - @classmethod - def setUpClass(cls): # noqa - if env_kafka_version() < (0, 11) or DISABLED: - return +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") +def test_create_describe_delete_acls(kafka_admin_client): + """Tests that we can add, list and remove ACLs + """ - cls.zk = ZookeeperFixture.instance() - cls.server = KafkaFixture.instance(0, cls.zk) - - @classmethod - def tearDownClass(cls): # noqa - if env_kafka_version() < (0, 11) or DISABLED: - return - - cls.server.close() - cls.zk.close() - - def setUp(self): - if env_kafka_version() < (0, 11) or DISABLED: - self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11') - super(TestAdminClientIntegration, self).setUp() - - def tearDown(self): - if env_kafka_version() < (0, 11) or DISABLED: - return - super(TestAdminClientIntegration, self).tearDown() - - def test_create_describe_delete_acls(self): - """Tests that we can add, list and remove ACLs - """ - - # Setup - brokers = '%s:%d' % (self.server.host, self.server.port) - admin_client = KafkaAdminClient( - bootstrap_servers=brokers + # Check that we don't have any ACLs in the cluster + acls, error = kafka_admin_client.describe_acls( + ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") ) - - # Check that we don't have any ACLs in the cluster - acls, error = admin_client.describe_acls( + ) + + assert error is NoError + assert len(acls) == 0 + + # Try to add an ACL + acl = ACL( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + result = kafka_admin_client.create_acls([acl]) + + assert len(result["failed"]) == 0 + assert len(result["succeeded"]) == 1 + + # Check that we can list the ACL we created + acl_filter = ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + acls, error = kafka_admin_client.describe_acls(acl_filter) + + assert error is NoError + assert len(acls) == 1 + + # Remove the ACL + delete_results = kafka_admin_client.delete_acls( + [ ACLFilter( - principal=None, + principal="User:test", host="*", - operation=ACLOperation.ANY, - permission_type=ACLPermissionType.ANY, + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") ) - ) + ] + ) - self.assertIs(error, NoError) - self.assertEqual(0, len(acls)) + assert len(delete_results) == 1 + assert len(delete_results[0][1]) == 1 # Check number of affected ACLs - # Try to add an ACL - acl = ACL( - principal="User:test", - host="*", - operation=ACLOperation.READ, - permission_type=ACLPermissionType.ALLOW, - resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") - ) - result = admin_client.create_acls([acl]) - - self.assertFalse(len(result["failed"])) - self.assertEqual(len(result["succeeded"]), 1) - - # Check that we can list the ACL we created - acl_filter = ACLFilter( - principal=None, + # Make sure the ACL does not exist in the cluster anymore + acls, error = kafka_admin_client.describe_acls( + ACLFilter( + principal="*", host="*", operation=ACLOperation.ANY, permission_type=ACLPermissionType.ANY, resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") ) - acls, error = admin_client.describe_acls(acl_filter) - - self.assertIs(error, NoError) - self.assertEqual(1, len(acls)) - - # Remove the ACL - delete_results = admin_client.delete_acls( - [ - ACLFilter( - principal="User:test", - host="*", - operation=ACLOperation.READ, - permission_type=ACLPermissionType.ALLOW, - resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") - ) - ] - ) + ) - self.assertEqual(1, len(delete_results)) - self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs - - - # Make sure the ACL does not exist in the cluster anymore - acls, error = admin_client.describe_acls( - ACLFilter( - principal="*", - host="*", - operation=ACLOperation.ANY, - permission_type=ACLPermissionType.ANY, - resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") - ) - ) - self.assertIs(error, NoError) - self.assertEqual(0, len(acls)) + assert error is NoError + assert len(acls) == 0 |