diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-12-29 15:59:57 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-29 15:59:57 -0800 |
commit | 3aada777e9c3bbb5751a15b615d6fbe4693cc6f0 (patch) | |
tree | 9da8d0cba353433b53cbfc62bf96344e69da0211 | |
parent | 41d9f1c032f8e055685c6f8353e23c2e735211ca (diff) | |
download | kafka-python-3aada777e9c3bbb5751a15b615d6fbe4693cc6f0.tar.gz |
Reset conn configs on exception in conn.check_version() (#1977)
-rw-r--r-- | kafka/conn.py | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index dfb8d78..c383123 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1203,6 +1203,10 @@ class BrokerConnection(object): stashed[key] = self.config[key] self.config[key] = override_config[key] + def reset_override_configs(): + for key in stashed: + self.config[key] = stashed[key] + # kafka kills the connection when it doesn't recognize an API request # so we can send a test request and then follow immediately with a # vanilla MetadataRequest. If the server did not recognize the first @@ -1222,6 +1226,7 @@ class BrokerConnection(object): for version, request in test_cases: if not self.connect_blocking(timeout_at - time.time()): + reset_override_configs() raise Errors.NodeNotReadyError() f = self.send(request) # HACK: sleeping to wait for socket to send bytes @@ -1278,10 +1283,10 @@ class BrokerConnection(object): log.info("Broker is not v%s -- it did not recognize %s", version, request.__class__.__name__) else: + reset_override_configs() raise Errors.UnrecognizedBrokerVersion() - for key in stashed: - self.config[key] = stashed[key] + reset_override_configs() return version def __str__(self): |