summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 9a41d90..6af0d8f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -738,11 +738,15 @@ class BrokerConnection(object):
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
"""
- # Monkeypatch the connection request timeout
- # Generally this timeout should not get triggered
- # but in case it does, we want it to be reasonably short
- stashed_request_timeout_ms = self.config['request_timeout_ms']
- self.config['request_timeout_ms'] = timeout * 1000
+ # Monkeypatch some connection configurations to avoid timeouts
+ override_config = {
+ 'request_timeout_ms': timeout * 1000,
+ 'max_in_flight_requests_per_connection': 5
+ }
+ stashed = {}
+ for key in override_config:
+ stashed[key] = self.config[key]
+ self.config[key] = override_config[key]
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
@@ -837,7 +841,8 @@ class BrokerConnection(object):
raise Errors.UnrecognizedBrokerVersion()
log.removeFilter(log_filter)
- self.config['request_timeout_ms'] = stashed_request_timeout_ms
+ for key in stashed:
+ self.config[key] = stashed[key]
return version
def __repr__(self):