author     Ulrik Johansson <ulrik.johansson@gmail.com>   2019-10-07 20:11:58 +0200
committer  Jeff Widman <jeff@jeffwidman.com>              2019-10-07 11:11:58 -0700
commit     84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1 (patch)
tree       f456e1b611ab667f7a8c8209eee0ab1bc283de51
parent     f1cda98e0b427116d5eb901bce2d697b3f037e78 (diff)
download   kafka-python-84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1.tar.gz

convert test_admin_integration to pytest (#1923)
-rw-r--r--  test/conftest.py                 |  19
-rw-r--r--  test/fixtures.py                 |  10
-rw-r--r--  test/test_admin_integration.py   | 164
3 files changed, 90 insertions(+), 103 deletions(-)
diff --git a/test/conftest.py b/test/conftest.py
index 267ac6a..bbe4048 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -105,6 +105,25 @@ def kafka_producer_factory(kafka_broker, request):
if _producer[0]:
_producer[0].close()
+@pytest.fixture
+def kafka_admin_client(kafka_admin_client_factory):
+ """Return a KafkaAdminClient fixture"""
+ yield kafka_admin_client_factory()
+
+@pytest.fixture
+def kafka_admin_client_factory(kafka_broker):
+ """Return a KafkaAdminClient factory fixture"""
+ _admin_client = [None]
+
+ def factory(**kafka_admin_client_params):
+ params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
+ _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
+ return _admin_client[0]
+
+ yield factory
+
+ if _admin_client[0]:
+ _admin_client[0].close()
@pytest.fixture
def topic(kafka_broker, request):
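
The conftest.py hunk above adds a kafka_admin_client fixture (plus a factory variant) that yields a KafkaAdminClient wired to the test broker and closes it on teardown. A minimal usage sketch, assuming a test module alongside the others; the topic name "demo-topic" is illustrative only, not part of this change:

    from kafka.admin import NewTopic

    def test_create_topic(kafka_admin_client):
        # The fixture is already bootstrapped against the test broker.
        result = kafka_admin_client.create_topics(
            [NewTopic(name="demo-topic", num_partitions=1, replication_factor=1)]
        )
        assert result is not None
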
diff --git a/test/fixtures.py b/test/fixtures.py
index c7748f1..68572b5 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -13,7 +13,7 @@ import py
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
-from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
+from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient
from kafka.client_async import KafkaClient
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
@@ -500,6 +500,14 @@ class KafkaFixture(Fixture):
return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
+ def get_admin_clients(self, cnt=1, **params):
+ params.setdefault('client_id', 'admin_client')
+ params['bootstrap_servers'] = self.bootstrap_server()
+ client_id = params['client_id']
+ for x in range(cnt):
+ params['client_id'] = '%s_%s' % (client_id, random_string(4))
+ yield KafkaAdminClient(**params)
+
def get_consumers(self, cnt, topics, **params):
params.setdefault('client_id', 'consumer')
params.setdefault('heartbeat_interval_ms', 500)
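
The fixtures.py hunk above gives KafkaFixture a get_admin_clients() generator that mirrors get_clients()/get_consumers(): it points each client at the fixture's bootstrap server and appends a random suffix to the client_id. A short sketch of how the conftest factory draws a single client from it; request_timeout_ms is only an illustrative override, any KafkaAdminClient keyword argument can be passed through **params:

    # One admin client, with a hypothetical per-test override.
    admin_client = next(kafka_broker.get_admin_clients(cnt=1, request_timeout_ms=30000))
    try:
        ...  # use the client
    finally:
        admin_client.close()
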
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 2672faa..3efa021 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,122 +1,82 @@
import pytest
-import os
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
+from test.testutil import env_kafka_version
from kafka.errors import NoError
-from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
-# This test suite passes for me locally, but fails on travis
-# Needs investigation
-DISABLED = True
-# TODO: Convert to pytest / fixtures
-# Note that ACL features require broker 0.11, but other admin apis may work on
-# earlier broker versions
-class TestAdminClientIntegration(KafkaIntegrationTestCase):
- @classmethod
- def setUpClass(cls): # noqa
- if env_kafka_version() < (0, 11) or DISABLED:
- return
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
+def test_create_describe_delete_acls(kafka_admin_client):
+ """Tests that we can add, list and remove ACLs
+ """
- cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk)
-
- @classmethod
- def tearDownClass(cls): # noqa
- if env_kafka_version() < (0, 11) or DISABLED:
- return
-
- cls.server.close()
- cls.zk.close()
-
- def setUp(self):
- if env_kafka_version() < (0, 11) or DISABLED:
- self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11')
- super(TestAdminClientIntegration, self).setUp()
-
- def tearDown(self):
- if env_kafka_version() < (0, 11) or DISABLED:
- return
- super(TestAdminClientIntegration, self).tearDown()
-
- def test_create_describe_delete_acls(self):
- """Tests that we can add, list and remove ACLs
- """
-
- # Setup
- brokers = '%s:%d' % (self.server.host, self.server.port)
- admin_client = KafkaAdminClient(
- bootstrap_servers=brokers
+ # Check that we don't have any ACLs in the cluster
+ acls, error = kafka_admin_client.describe_acls(
+ ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
-
- # Check that we don't have any ACLs in the cluster
- acls, error = admin_client.describe_acls(
+ )
+
+ assert error is NoError
+ assert len(acls) == 0
+
+ # Try to add an ACL
+ acl = ACL(
+ principal="User:test",
+ host="*",
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ result = kafka_admin_client.create_acls([acl])
+
+ assert len(result["failed"]) == 0
+ assert len(result["succeeded"]) == 1
+
+ # Check that we can list the ACL we created
+ acl_filter = ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ acls, error = kafka_admin_client.describe_acls(acl_filter)
+
+ assert error is NoError
+ assert len(acls) == 1
+
+ # Remove the ACL
+ delete_results = kafka_admin_client.delete_acls(
+ [
ACLFilter(
- principal=None,
+ principal="User:test",
host="*",
- operation=ACLOperation.ANY,
- permission_type=ACLPermissionType.ANY,
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
- )
+ ]
+ )
- self.assertIs(error, NoError)
- self.assertEqual(0, len(acls))
+ assert len(delete_results) == 1
+ assert len(delete_results[0][1]) == 1 # Check number of affected ACLs
- # Try to add an ACL
- acl = ACL(
- principal="User:test",
- host="*",
- operation=ACLOperation.READ,
- permission_type=ACLPermissionType.ALLOW,
- resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
- )
- result = admin_client.create_acls([acl])
-
- self.assertFalse(len(result["failed"]))
- self.assertEqual(len(result["succeeded"]), 1)
-
- # Check that we can list the ACL we created
- acl_filter = ACLFilter(
- principal=None,
+ # Make sure the ACL does not exist in the cluster anymore
+ acls, error = kafka_admin_client.describe_acls(
+ ACLFilter(
+ principal="*",
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
- acls, error = admin_client.describe_acls(acl_filter)
-
- self.assertIs(error, NoError)
- self.assertEqual(1, len(acls))
-
- # Remove the ACL
- delete_results = admin_client.delete_acls(
- [
- ACLFilter(
- principal="User:test",
- host="*",
- operation=ACLOperation.READ,
- permission_type=ACLPermissionType.ALLOW,
- resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
- )
- ]
- )
+ )
- self.assertEqual(1, len(delete_results))
- self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs
-
-
- # Make sure the ACL does not exist in the cluster anymore
- acls, error = admin_client.describe_acls(
- ACLFilter(
- principal="*",
- host="*",
- operation=ACLOperation.ANY,
- permission_type=ACLPermissionType.ANY,
- resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
- )
- )
- self.assertIs(error, NoError)
- self.assertEqual(0, len(acls))
+ assert error is NoError
+ assert len(acls) == 0
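
Tests that need a non-default admin client can accept the factory fixture instead of kafka_admin_client. A sketch under that assumption; the keyword argument shown is only an example of what the factory forwards to KafkaAdminClient:

    def test_with_custom_timeout(kafka_admin_client_factory):
        # Any KafkaAdminClient kwarg is forwarded by the factory;
        # request_timeout_ms is just an illustrative choice.
        client = kafka_admin_client_factory(request_timeout_ms=30000)
        assert client is not None
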