summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-28 17:06:57 -0700
committerDana Powers <dana.powers@gmail.com>2019-09-28 17:06:57 -0700
commit98ebff87a78bafbb15dd95c5174c5a1041a848ed (patch)
tree570ded42e9dca01d00c6230bc19c76b47f83fa44
parent5381591bac7f1322e7a54e4be65d1a54e2898732 (diff)
downloadkafka-python-98ebff87a78bafbb15dd95c5174c5a1041a848ed.tar.gz
Fix Admin Client api version checking; only test ACL integration on 0.11+
-rw-r--r--kafka/admin/client.py14
-rw-r--r--test/test_admin_integration.py13
2 files changed, 18 insertions, 9 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 0ade3e9..df85f44 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -232,14 +232,20 @@ class KafkaAdminClient(object):
:param operation: A list of protocol operation versions from kafka.protocol.
:return: The max matching version number between client and broker.
"""
- version = min(len(operation) - 1,
- self._client.get_api_versions()[operation[0].API_KEY][1])
- if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
+ broker_api_versions = self._client.get_api_versions()
+ api_key = operation[0].API_KEY
+ if broker_api_versions is None or api_key not in broker_api_versions:
+ raise IncompatibleBrokerVersion(
+ "Kafka broker does not support the '{}' Kafka protocol."
+ .format(operation[0].__name__))
+ min_version, max_version = broker_api_versions[api_key]
+ version = min(len(operation) - 1, max_version)
+ if version < min_version:
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
"No version of the '{}' Kafka protocol is supported by both the client and broker."
- .format(operation.__name__))
+ .format(operation[0].__name__))
return version
def _validate_timeout(self, timeout_ms):
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index b3dc0cc..27028ce 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -8,10 +8,13 @@ from kafka.errors import NoError
from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+# TODO: Convert to pytest / fixtures
+# Note that ACL features require broker 0.11, but other admin apis may work on
+# earlier broker versions
class TestAdminClientIntegration(KafkaIntegrationTestCase):
@classmethod
def setUpClass(cls): # noqa
- if env_kafka_version() < (0, 10):
+ if env_kafka_version() < (0, 11):
return
cls.zk = ZookeeperFixture.instance()
@@ -19,19 +22,19 @@ class TestAdminClientIntegration(KafkaIntegrationTestCase):
@classmethod
def tearDownClass(cls): # noqa
- if env_kafka_version() < (0, 10):
+ if env_kafka_version() < (0, 11):
return
cls.server.close()
cls.zk.close()
def setUp(self):
- if env_kafka_version() < (0, 10):
- self.skipTest('Admin Integration test requires KAFKA_VERSION >= 0.10')
+ if env_kafka_version() < (0, 11):
+ self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11')
super(TestAdminClientIntegration, self).setUp()
def tearDown(self):
- if env_kafka_version() < (0, 10):
+ if env_kafka_version() < (0, 11):
return
super(TestAdminClientIntegration, self).tearDown()