From b67a411a7ca020fb9dcdc47781208520454ce905 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 29 Dec 2019 08:25:11 -0800 Subject: Reset conn configs on exception in conn.check_version() --- kafka/conn.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index d4c5464..c2b38b5 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1151,6 +1151,10 @@ class BrokerConnection(object): stashed[key] = self.config[key] self.config[key] = override_config[key] + def reset_override_configs(): + for key in stashed: + self.config[key] = stashed[key] + # kafka kills the connection when it doesn't recognize an API request # so we can send a test request and then follow immediately with a # vanilla MetadataRequest. If the server did not recognize the first @@ -1170,6 +1174,7 @@ class BrokerConnection(object): for version, request in test_cases: if not self.connect_blocking(timeout_at - time.time()): + reset_override_configs() raise Errors.NodeNotReadyError() f = self.send(request) # HACK: sleeping to wait for socket to send bytes @@ -1226,10 +1231,10 @@ class BrokerConnection(object): log.info("Broker is not v%s -- it did not recognize %s", version, request.__class__.__name__) else: + reset_override_configs() raise Errors.UnrecognizedBrokerVersion() - for key in stashed: - self.config[key] = stashed[key] + reset_override_configs() return version def __str__(self): -- cgit v1.2.1