diff options
-rw-r--r-- | kafka/admin/client.py | 14 | ||||
-rw-r--r-- | test/test_admin_integration.py | 13 |
2 files changed, 18 insertions, 9 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 0ade3e9..df85f44 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -232,14 +232,20 @@ class KafkaAdminClient(object): :param operation: A list of protocol operation versions from kafka.protocol. :return: The max matching version number between client and broker. """ - version = min(len(operation) - 1, - self._client.get_api_versions()[operation[0].API_KEY][1]) - if version < self._client.get_api_versions()[operation[0].API_KEY][0]: + broker_api_versions = self._client.get_api_versions() + api_key = operation[0].API_KEY + if broker_api_versions is None or api_key not in broker_api_versions: + raise IncompatibleBrokerVersion( + "Kafka broker does not support the '{}' Kafka protocol." + .format(operation[0].__name__)) + min_version, max_version = broker_api_versions[api_key] + version = min(len(operation) - 1, max_version) + if version < min_version: # max library version is less than min broker version. Currently, # no Kafka versions specify a min msg version. Maybe in the future? raise IncompatibleBrokerVersion( "No version of the '{}' Kafka protocol is supported by both the client and broker." - .format(operation.__name__)) + .format(operation[0].__name__)) return version def _validate_timeout(self, timeout_ms): diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index b3dc0cc..27028ce 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -8,10 +8,13 @@ from kafka.errors import NoError from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL +# TODO: Convert to pytest / fixtures +# Note that ACL features require broker 0.11, but other admin apis may work on +# earlier broker versions class TestAdminClientIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): # noqa - if env_kafka_version() < (0, 10): + if env_kafka_version() < (0, 11): return cls.zk = ZookeeperFixture.instance() @@ -19,19 +22,19 @@ class TestAdminClientIntegration(KafkaIntegrationTestCase): @classmethod def tearDownClass(cls): # noqa - if env_kafka_version() < (0, 10): + if env_kafka_version() < (0, 11): return cls.server.close() cls.zk.close() def setUp(self): - if env_kafka_version() < (0, 10): - self.skipTest('Admin Integration test requires KAFKA_VERSION >= 0.10') + if env_kafka_version() < (0, 11): + self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11') super(TestAdminClientIntegration, self).setUp() def tearDown(self): - if env_kafka_version() < (0, 10): + if env_kafka_version() < (0, 11): return super(TestAdminClientIntegration, self).tearDown() |