From da25df6d3c6380e27bf638f3620613d05ac9fd03 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 6 Aug 2017 17:41:13 -0700 Subject: Add private map of api key -> min/max versions to BrokerConnection (#1169) --- kafka/conn.py | 32 +++++++++++++++++++++----------- kafka/protocol/__init__.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 6a9c200..ac8bb3d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -17,7 +17,7 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.api import RequestHeader from kafka.protocol.admin import SaslHandShakeRequest -from kafka.protocol.commit import GroupCoordinatorResponse +from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -195,6 +195,7 @@ class BrokerConnection(object): self._init_port = port self._init_afi = afi self.in_flight_requests = collections.deque() + self._api_versions = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -874,23 +875,31 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id - def _check_api_version_response(self, response): + def _handle_api_version_response(self, response): + error_type = Errors.for_code(response.error_code) + assert error_type is Errors.NoError, "API version check failed" + self._api_versions = dict([ + (api_key, (min_version, max_version)) + for api_key, min_version, max_version in response.api_versions + ]) + return self._api_versions + + def _infer_broker_version_from_api_versions(self, api_versions): # The logic here is to check the list of supported request versions # in descending order. As soon as we find one that works, return it test_cases = [ # format (, ) - ((0, 10, 1), MetadataRequest[2]) + ((0, 11, 0), MetadataRequest[4]), + ((0, 10, 2), OffsetFetchRequest[2]), + ((0, 10, 1), MetadataRequest[2]), ] - error_type = Errors.for_code(response.error_code) - assert error_type is Errors.NoError, "API version check failed" - max_versions = dict([ - (api_key, max_version) - for api_key, _, max_version in response.api_versions - ]) # Get the best match of test cases for broker_version, struct in sorted(test_cases, reverse=True): - if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION: + if struct.API_KEY not in api_versions: + continue + min_version, max_version = api_versions[struct.API_KEY] + if min_version <= struct.API_VERSION <= max_version: return broker_version # We know that ApiVersionResponse is only supported in 0.10+ @@ -978,7 +987,8 @@ class BrokerConnection(object): if isinstance(request, ApiVersionRequest[0]): # Starting from 0.10 kafka broker we determine version # by looking at ApiVersionResponse - version = self._check_api_version_response(f.value) + api_versions = self._handle_api_version_response(f.value) + version = self._infer_broker_version_from_api_versions(api_versions) log.info('Broker version identifed as %s', '.'.join(map(str, version))) log.info('Set configuration api_version=%s to skip auto' ' check_version requests on startup', version) diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py index 2a269a5..4dcf4a4 100644 --- a/kafka/protocol/__init__.py +++ b/kafka/protocol/__init__.py @@ -6,3 +6,40 @@ from .legacy import ( CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS, ATTRIBUTE_CODEC_MASK, KafkaProtocol, ) + +API_KEYS = { + 0: 'Produce', + 1: 'Fetch', + 2: 'ListOffsets', + 3: 'Metadata', + 4: 'LeaderAndIsr', + 5: 'StopReplica', + 6: 'UpdateMetadata', + 7: 'ControlledShutdown', + 8: 'OffsetCommit', + 9: 'OffsetFetch', + 10: 'FindCoordinator', + 11: 'JoinGroup', + 12: 'Heartbeat', + 13: 'LeaveGroup', + 14: 'SyncGroup', + 15: 'DescribeGroups', + 16: 'ListGroups', + 17: 'SaslHandshake', + 18: 'ApiVersions', + 19: 'CreateTopics', + 20: 'DeleteTopics', + 21: 'DeleteRecords', + 22: 'InitProducerId', + 23: 'OffsetForLeaderEpoch', + 24: 'AddPartitionsToTxn', + 25: 'AddOffsetsToTxn', + 26: 'EndTxn', + 27: 'WriteTxnMarkers', + 28: 'TxnOffsetCommit', + 29: 'DescribeAcls', + 30: 'CreateAcls', + 31: 'DeleteAcls', + 32: 'DescribeConfigs', + 33: 'AlterConfigs', +} -- cgit v1.2.1