summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 14:28:01 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 14:28:01 -0700
commit3666b66a21776d620f68d2f7ff2fed1bc18b94e5 (patch)
treeadd9a5d0f74597fdeaa186e5498638f9b0a32fe6
parent20f4c95289c694f81a60228a9820601eb57402f4 (diff)
downloadkafka-python-3666b66a21776d620f68d2f7ff2fed1bc18b94e5.tar.gz
#761 Follow-up: use api_version tuples in BrokerConnection.check_version
-rw-r--r--kafka/conn.py21
1 files changed, 10 insertions, 11 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 6028867..38829c6 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -547,7 +547,6 @@ class BrokerConnection(object):
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
"""
-
# Monkeypatch the connection request timeout
# Generally this timeout should not get triggered
# but in case it does, we want it to be reasonably short
@@ -575,11 +574,11 @@ class BrokerConnection(object):
log.addFilter(log_filter)
test_cases = [
- ('0.10', ApiVersionRequest[0]()),
- ('0.9', ListGroupsRequest[0]()),
- ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
- ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
- ('0.8.0', MetadataRequest[0]([])),
+ ((0, 10), ApiVersionRequest[0]()),
+ ((0, 9), ListGroupsRequest[0]()),
+ ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
+ ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
+ ((0, 8, 0), MetadataRequest[0]([])),
]
def connect():
@@ -615,9 +614,9 @@ class BrokerConnection(object):
self._sock.setblocking(False)
if f.succeeded():
- log.info('Broker version identifed as %s', version)
- log.info("Set configuration api_version='%s' to skip auto"
- " check_version requests on startup", version)
+ log.info('Broker version identifed as %s', '.'.join(map(str, version)))
+ log.info('Set configuration api_version=%s to skip auto'
+ ' check_version requests on startup', version)
break
# Only enable strict checking to verify that we understand failure
@@ -634,7 +633,7 @@ class BrokerConnection(object):
# requests (bug...). In this case we expect to see a correlation
# id mismatch
elif (isinstance(f.exception, Errors.CorrelationIdError) and
- version == '0.10'):
+ version == (0, 10)):
pass
elif six.PY2:
assert isinstance(f.exception.args[0], socket.error)
@@ -648,7 +647,7 @@ class BrokerConnection(object):
log.removeFilter(log_filter)
self.config['request_timeout_ms'] = stashed_request_timeout_ms
- return tuple(map(int, version.split('.')))
+ return version
def __repr__(self):
return "<BrokerConnection host=%s/%s port=%d>" % (self.hostname, self.host,