summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 15:19:51 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 16:11:22 -0700
commit36794db6e7677c86dc0146e64d39ce2c2eb9a781 (patch)
tree7b483a4268cf36cdca8253e6af8a9e3df2531970
parent412ebe44968a52cb03dcab4366972784d01e1655 (diff)
downloadkafka-python-fix_leaks.tar.gz
Dont warn on socket disconnections caused by KafkaClient.close()fix_leaks
-rw-r--r--kafka/client_async.py4
1 files changed, 3 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 7a55a08..62b0095 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -136,6 +136,7 @@ class KafkaClient(object):
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._selector.register(self._wake_r, selectors.EVENT_READ)
+ self._closed = False
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
def _bootstrap(self, hosts):
@@ -226,7 +227,7 @@ class KafkaClient(object):
self._selector.unregister(conn._sock)
except KeyError:
pass
- if self._refresh_on_disconnects:
+ if self._refresh_on_disconnects and not self._closed:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -274,6 +275,7 @@ class KafkaClient(object):
node_id (int, optional): the id of the node to close
"""
if node_id is None:
+ self._closed = True
for conn in self._conns.values():
conn.close()
self._wake_r.close()