summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-08-05 10:20:30 -0700
committerDana Powers <dana.powers@gmail.com>2017-08-05 10:20:30 -0700
commit8c857a692b993ec01826ae77e704b07d269a4534 (patch)
tree6601c2a772d71d0f1fb245ec890f476c2441beff
parent165b897139ae69e5935c2618759773572781ef17 (diff)
downloadkafka-python-handle_api_versions.tar.gz
Add private map of api key -> min/max versions to BrokerConnectionhandle_api_versions
-rw-r--r--kafka/conn.py32
-rw-r--r--kafka/protocol/__init__.py37
2 files changed, 58 insertions, 11 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 16eaf62..4fd03ce 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,7 +17,7 @@ from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
-from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -193,6 +193,7 @@ class BrokerConnection(object):
self._init_port = port
self._init_afi = afi
self.in_flight_requests = collections.deque()
+ self._api_versions = None
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -872,23 +873,31 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
- def _check_api_version_response(self, response):
+ def _handle_api_version_response(self, response):
+ error_type = Errors.for_code(response.error_code)
+ assert error_type is Errors.NoError, "API version check failed"
+ self._api_versions = dict([
+ (api_key, (min_version, max_version))
+ for api_key, min_version, max_version in response.api_versions
+ ])
+ return self._api_versions
+
+ def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
- ((0, 10, 1), MetadataRequest[2])
+ ((0, 11, 0), MetadataRequest[4]),
+ ((0, 10, 2), OffsetFetchRequest[2]),
+ ((0, 10, 1), MetadataRequest[2]),
]
- error_type = Errors.for_code(response.error_code)
- assert error_type is Errors.NoError, "API version check failed"
- max_versions = dict([
- (api_key, max_version)
- for api_key, _, max_version in response.api_versions
- ])
# Get the best match of test cases
for broker_version, struct in sorted(test_cases, reverse=True):
- if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
+ if struct.API_KEY not in api_versions:
+ continue
+ min_version, max_version = api_versions[struct.API_KEY]
+ if min_version <= struct.API_VERSION <= max_version:
return broker_version
# We know that ApiVersionResponse is only supported in 0.10+
@@ -976,7 +985,8 @@ class BrokerConnection(object):
if isinstance(request, ApiVersionRequest[0]):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
- version = self._check_api_version_response(f.value)
+ api_versions = self._handle_api_version_response(f.value)
+ version = self._infer_broker_version_from_api_versions(api_versions)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py
index 2a269a5..4dcf4a4 100644
--- a/kafka/protocol/__init__.py
+++ b/kafka/protocol/__init__.py
@@ -6,3 +6,40 @@ from .legacy import (
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
ATTRIBUTE_CODEC_MASK, KafkaProtocol,
)
+
+API_KEYS = {
+ 0: 'Produce',
+ 1: 'Fetch',
+ 2: 'ListOffsets',
+ 3: 'Metadata',
+ 4: 'LeaderAndIsr',
+ 5: 'StopReplica',
+ 6: 'UpdateMetadata',
+ 7: 'ControlledShutdown',
+ 8: 'OffsetCommit',
+ 9: 'OffsetFetch',
+ 10: 'FindCoordinator',
+ 11: 'JoinGroup',
+ 12: 'Heartbeat',
+ 13: 'LeaveGroup',
+ 14: 'SyncGroup',
+ 15: 'DescribeGroups',
+ 16: 'ListGroups',
+ 17: 'SaslHandshake',
+ 18: 'ApiVersions',
+ 19: 'CreateTopics',
+ 20: 'DeleteTopics',
+ 21: 'DeleteRecords',
+ 22: 'InitProducerId',
+ 23: 'OffsetForLeaderEpoch',
+ 24: 'AddPartitionsToTxn',
+ 25: 'AddOffsetsToTxn',
+ 26: 'EndTxn',
+ 27: 'WriteTxnMarkers',
+ 28: 'TxnOffsetCommit',
+ 29: 'DescribeAcls',
+ 30: 'CreateAcls',
+ 31: 'DeleteAcls',
+ 32: 'DescribeConfigs',
+ 33: 'AlterConfigs',
+}