summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py32
1 files changed, 21 insertions, 11 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 6a9c200..ac8bb3d 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,7 +17,7 @@ from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
-from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -195,6 +195,7 @@ class BrokerConnection(object):
self._init_port = port
self._init_afi = afi
self.in_flight_requests = collections.deque()
+ self._api_versions = None
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -874,23 +875,31 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
- def _check_api_version_response(self, response):
+ def _handle_api_version_response(self, response):
+ error_type = Errors.for_code(response.error_code)
+ assert error_type is Errors.NoError, "API version check failed"
+ self._api_versions = dict([
+ (api_key, (min_version, max_version))
+ for api_key, min_version, max_version in response.api_versions
+ ])
+ return self._api_versions
+
+ def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
- ((0, 10, 1), MetadataRequest[2])
+ ((0, 11, 0), MetadataRequest[4]),
+ ((0, 10, 2), OffsetFetchRequest[2]),
+ ((0, 10, 1), MetadataRequest[2]),
]
- error_type = Errors.for_code(response.error_code)
- assert error_type is Errors.NoError, "API version check failed"
- max_versions = dict([
- (api_key, max_version)
- for api_key, _, max_version in response.api_versions
- ])
# Get the best match of test cases
for broker_version, struct in sorted(test_cases, reverse=True):
- if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
+ if struct.API_KEY not in api_versions:
+ continue
+ min_version, max_version = api_versions[struct.API_KEY]
+ if min_version <= struct.API_VERSION <= max_version:
return broker_version
# We know that ApiVersionResponse is only supported in 0.10+
@@ -978,7 +987,8 @@ class BrokerConnection(object):
if isinstance(request, ApiVersionRequest[0]):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
- version = self._check_api_version_response(f.value)
+ api_versions = self._handle_api_version_response(f.value)
+ version = self._infer_broker_version_from_api_versions(api_versions)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)