diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:43 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:43 -0700 |
commit | 77cb35078a7408ebb0eab4bfc2220cc11c10d3b2 (patch) | |
tree | 1470def24c0366292cb862239cb9952f04aedf3e | |
parent | 1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (diff) | |
download | kafka-python-77cb35078a7408ebb0eab4bfc2220cc11c10d3b2.tar.gz |
Fix socket leaks in KafkaClient (#696)
* Cleanup wakeup socketpair on close to avoid leak in KafkaClient
* Cleanup unneeded bootstrap connection to avoid leak in KafkaClient
* Dont warn on socket disconnections caused by KafkaClient.close()
-rw-r--r-- | kafka/client_async.py | 20 | ||||
-rw-r--r-- | test/test_client_async.py | 9 |
2 files changed, 16 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 7079f01..62b0095 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -133,16 +133,11 @@ class KafkaClient(object): self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._selector.register(self._wake_r, selectors.EVENT_READ) - - def __del__(self): - if hasattr(self, '_wake_r'): - self._wake_r.close() - if hasattr(self, '_wake_w'): - self._wake_w.close() + self._closed = False + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails @@ -180,6 +175,8 @@ class KafkaClient(object): # in that case, we should keep the bootstrap connection if not len(self.cluster.brokers()): self._conns['bootstrap'] = bootstrap + else: + bootstrap.close() self._bootstrap_fails = 0 break # No bootstrap found... @@ -230,7 +227,7 @@ class KafkaClient(object): self._selector.unregister(conn._sock) except KeyError: pass - if self._refresh_on_disconnects: + if self._refresh_on_disconnects and not self._closed: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() @@ -272,14 +269,17 @@ class KafkaClient(object): return self._conns[node_id].connected() def close(self, node_id=None): - """Closes the connection to a particular node (if there is one). + """Closes one or all broker connections. Arguments: - node_id (int): the id of the node to close + node_id (int, optional): the id of the node to close """ if node_id is None: + self._closed = True for conn in self._conns.values(): conn.close() + self._wake_r.close() + self._wake_w.close() elif node_id in self._conns: self._conns[node_id].close() else: diff --git a/test/test_client_async.py b/test/test_client_async.py index 605ef1a..5870501 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -183,19 +183,22 @@ def test_close(mocker, conn): cli = KafkaClient() mocker.patch.object(cli, '_selector') + # bootstrap connection should have been closed + assert conn.close.call_count == 1 + # Unknown node - silent cli.close(2) # Single node close cli._maybe_connect(0) - assert not conn.close.call_count - cli.close(0) assert conn.close.call_count == 1 + cli.close(0) + assert conn.close.call_count == 2 # All node close cli._maybe_connect(1) cli.close() - assert conn.close.call_count == 3 + assert conn.close.call_count == 4 def test_is_disconnected(conn): |