summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 15:18:21 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 15:18:21 -0700
commit2ac8503647003535c872ed82c311c4b335862ec5 (patch)
tree680fde51d93f3c0bd0d2a7fc9fa75ca6bdb95686
parent96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff)
downloadkafka-python-2ac8503647003535c872ed82c311c4b335862ec5.tar.gz
Cleanup wakeup socketpair on close to avoid leak in KafkaClient
-rw-r--r--kafka/client_async.py12
1 files changed, 4 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 7079f01..e0db51a 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -138,12 +138,6 @@ class KafkaClient(object):
self._wake_r.setblocking(False)
self._selector.register(self._wake_r, selectors.EVENT_READ)
- def __del__(self):
- if hasattr(self, '_wake_r'):
- self._wake_r.close()
- if hasattr(self, '_wake_w'):
- self._wake_w.close()
-
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
@@ -272,14 +266,16 @@ class KafkaClient(object):
return self._conns[node_id].connected()
def close(self, node_id=None):
- """Closes the connection to a particular node (if there is one).
+ """Closes one or all broker connections.
Arguments:
- node_id (int): the id of the node to close
+ node_id (int, optional): the id of the node to close
"""
if node_id is None:
for conn in self._conns.values():
conn.close()
+ self._wake_r.close()
+ self._wake_w.close()
elif node_id in self._conns:
self._conns[node_id].close()
else: