diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 15:18:21 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 15:18:21 -0700 |
commit | 2ac8503647003535c872ed82c311c4b335862ec5 (patch) | |
tree | 680fde51d93f3c0bd0d2a7fc9fa75ca6bdb95686 | |
parent | 96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff) | |
download | kafka-python-2ac8503647003535c872ed82c311c4b335862ec5.tar.gz |
Cleanup wakeup socketpair on close to avoid leak in KafkaClient
-rw-r--r-- | kafka/client_async.py | 12 |
1 files changed, 4 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 7079f01..e0db51a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -138,12 +138,6 @@ class KafkaClient(object): self._wake_r.setblocking(False) self._selector.register(self._wake_r, selectors.EVENT_READ) - def __del__(self): - if hasattr(self, '_wake_r'): - self._wake_r.close() - if hasattr(self, '_wake_w'): - self._wake_w.close() - def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails @@ -272,14 +266,16 @@ class KafkaClient(object): return self._conns[node_id].connected() def close(self, node_id=None): - """Closes the connection to a particular node (if there is one). + """Closes one or all broker connections. Arguments: - node_id (int): the id of the node to close + node_id (int, optional): the id of the node to close """ if node_id is None: for conn in self._conns.values(): conn.close() + self._wake_r.close() + self._wake_w.close() elif node_id in self._conns: self._conns[node_id].close() else: |