diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-09-28 17:06:57 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-09-28 17:06:57 -0700 |
commit | 98ebff87a78bafbb15dd95c5174c5a1041a848ed (patch) | |
tree | 570ded42e9dca01d00c6230bc19c76b47f83fa44 | |
parent | 5381591bac7f1322e7a54e4be65d1a54e2898732 (diff) | |
download | kafka-python-98ebff87a78bafbb15dd95c5174c5a1041a848ed.tar.gz |
Fix Admin Client api version checking; only test ACL integration on 0.11+
-rw-r--r-- | kafka/admin/client.py | 14 | ||||
-rw-r--r-- | test/test_admin_integration.py | 13 |
2 files changed, 18 insertions, 9 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 0ade3e9..df85f44 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -232,14 +232,20 @@ class KafkaAdminClient(object): :param operation: A list of protocol operation versions from kafka.protocol. :return: The max matching version number between client and broker. """ - version = min(len(operation) - 1, - self._client.get_api_versions()[operation[0].API_KEY][1]) - if version < self._client.get_api_versions()[operation[0].API_KEY][0]: + broker_api_versions = self._client.get_api_versions() + api_key = operation[0].API_KEY + if broker_api_versions is None or api_key not in broker_api_versions: + raise IncompatibleBrokerVersion( + "Kafka broker does not support the '{}' Kafka protocol." + .format(operation[0].__name__)) + min_version, max_version = broker_api_versions[api_key] + version = min(len(operation) - 1, max_version) + if version < min_version: # max library version is less than min broker version. Currently, # no Kafka versions specify a min msg version. Maybe in the future? raise IncompatibleBrokerVersion( "No version of the '{}' Kafka protocol is supported by both the client and broker." - .format(operation.__name__)) + .format(operation[0].__name__)) return version def _validate_timeout(self, timeout_ms): diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index b3dc0cc..27028ce 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -8,10 +8,13 @@ from kafka.errors import NoError from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL +# TODO: Convert to pytest / fixtures +# Note that ACL features require broker 0.11, but other admin apis may work on +# earlier broker versions class TestAdminClientIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): # noqa - if env_kafka_version() < (0, 10): + if env_kafka_version() < (0, 11): return cls.zk = ZookeeperFixture.instance() @@ -19,19 +22,19 @@ class TestAdminClientIntegration(KafkaIntegrationTestCase): @classmethod def tearDownClass(cls): # noqa - if env_kafka_version() < (0, 10): + if env_kafka_version() < (0, 11): return cls.server.close() cls.zk.close() def setUp(self): - if env_kafka_version() < (0, 10): - self.skipTest('Admin Integration test requires KAFKA_VERSION >= 0.10') + if env_kafka_version() < (0, 11): + self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11') super(TestAdminClientIntegration, self).setUp() def tearDown(self): - if env_kafka_version() < (0, 10): + if env_kafka_version() < (0, 11): return super(TestAdminClientIntegration, self).tearDown() |