summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 17:14:43 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 17:14:43 -0700
commit77cb35078a7408ebb0eab4bfc2220cc11c10d3b2 (patch)
tree1470def24c0366292cb862239cb9952f04aedf3e
parent1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (diff)
downloadkafka-python-77cb35078a7408ebb0eab4bfc2220cc11c10d3b2.tar.gz
Fix socket leaks in KafkaClient (#696)
* Cleanup wakeup socketpair on close to avoid leak in KafkaClient * Cleanup unneeded bootstrap connection to avoid leak in KafkaClient * Dont warn on socket disconnections caused by KafkaClient.close()
-rw-r--r--kafka/client_async.py20
-rw-r--r--test/test_client_async.py9
2 files changed, 16 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 7079f01..62b0095 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -133,16 +133,11 @@ class KafkaClient(object):
self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._selector.register(self._wake_r, selectors.EVENT_READ)
-
- def __del__(self):
- if hasattr(self, '_wake_r'):
- self._wake_r.close()
- if hasattr(self, '_wake_w'):
- self._wake_w.close()
+ self._closed = False
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
@@ -180,6 +175,8 @@ class KafkaClient(object):
# in that case, we should keep the bootstrap connection
if not len(self.cluster.brokers()):
self._conns['bootstrap'] = bootstrap
+ else:
+ bootstrap.close()
self._bootstrap_fails = 0
break
# No bootstrap found...
@@ -230,7 +227,7 @@ class KafkaClient(object):
self._selector.unregister(conn._sock)
except KeyError:
pass
- if self._refresh_on_disconnects:
+ if self._refresh_on_disconnects and not self._closed:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -272,14 +269,17 @@ class KafkaClient(object):
return self._conns[node_id].connected()
def close(self, node_id=None):
- """Closes the connection to a particular node (if there is one).
+ """Closes one or all broker connections.
Arguments:
- node_id (int): the id of the node to close
+ node_id (int, optional): the id of the node to close
"""
if node_id is None:
+ self._closed = True
for conn in self._conns.values():
conn.close()
+ self._wake_r.close()
+ self._wake_w.close()
elif node_id in self._conns:
self._conns[node_id].close()
else:
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 605ef1a..5870501 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -183,19 +183,22 @@ def test_close(mocker, conn):
cli = KafkaClient()
mocker.patch.object(cli, '_selector')
+ # bootstrap connection should have been closed
+ assert conn.close.call_count == 1
+
# Unknown node - silent
cli.close(2)
# Single node close
cli._maybe_connect(0)
- assert not conn.close.call_count
- cli.close(0)
assert conn.close.call_count == 1
+ cli.close(0)
+ assert conn.close.call_count == 2
# All node close
cli._maybe_connect(1)
cli.close()
- assert conn.close.call_count == 3
+ assert conn.close.call_count == 4
def test_is_disconnected(conn):