diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-08 15:40:42 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-08 15:57:47 -0700 |
commit | 3e70e17fa9e7439477ee145f2d9151c3a6ef20a9 (patch) | |
tree | e3750c77ac6bb16fe036b842a94db8e50482c0dd | |
parent | 1435356cc5688df509e96eb3fb6ee4ad95732452 (diff) | |
download | kafka-python-3e70e17fa9e7439477ee145f2d9151c3a6ef20a9.tar.gz |
Add private _refresh_on_disconnects flag to KafkaClient
-rw-r--r-- | kafka/client_async.py | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 3dee2e1..bf2f6ea 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -94,6 +94,7 @@ class KafkaClient(object): self._metadata_refresh_in_progress = False self._conns = {} self._connecting = set() + self._refresh_on_disconnects = True self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -164,10 +165,11 @@ class KafkaClient(object): # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: - log.warning("Node %s connect failed -- refreshing metadata", node_id) if node_id in self._connecting: self._connecting.remove(node_id) - self.cluster.request_update() + if self._refresh_on_disconnects: + log.warning("Node %s connect failed -- refreshing metadata", node_id) + self.cluster.request_update() def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" @@ -597,9 +599,13 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() + # We will be intentionally causing socket failures + # and should not trigger metadata refresh + self._refresh_on_disconnects = False self._maybe_connect(node_id) conn = self._conns[node_id] version = conn.check_version() + self._refresh_on_disconnects = True return version def wakeup(self): |