summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-09-24 13:33:54 -0700
committerDana Powers <dana.powers@gmail.com>2016-09-24 13:33:54 -0700
commit189ab3f197a95492eedc1dcc6b78dd159264b812 (patch)
tree1f60344b106a41a622c05cedb6b5f6c16c610a09
parent2a7aca1630b81669595d753083239ec9fbf66ff5 (diff)
downloadkafka-python-conn_check_version_max_in_flight.tar.gz
Monkeypatch max_in_flight_requests_per_connection when checking broker versionconn_check_version_max_in_flight
-rw-r--r--kafka/conn.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 9a41d90..6af0d8f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -738,11 +738,15 @@ class BrokerConnection(object):
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
"""
- # Monkeypatch the connection request timeout
- # Generally this timeout should not get triggered
- # but in case it does, we want it to be reasonably short
- stashed_request_timeout_ms = self.config['request_timeout_ms']
- self.config['request_timeout_ms'] = timeout * 1000
+ # Monkeypatch some connection configurations to avoid timeouts
+ override_config = {
+ 'request_timeout_ms': timeout * 1000,
+ 'max_in_flight_requests_per_connection': 5
+ }
+ stashed = {}
+ for key in override_config:
+ stashed[key] = self.config[key]
+ self.config[key] = override_config[key]
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
@@ -837,7 +841,8 @@ class BrokerConnection(object):
raise Errors.UnrecognizedBrokerVersion()
log.removeFilter(log_filter)
- self.config['request_timeout_ms'] = stashed_request_timeout_ms
+ for key in stashed:
+ self.config[key] = stashed[key]
return version
def __repr__(self):