summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 15:19:17 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 16:11:22 -0700
commit412ebe44968a52cb03dcab4366972784d01e1655 (patch)
tree617c853c9e5c5692879ff69420dec991268bfa77
parent2ac8503647003535c872ed82c311c4b335862ec5 (diff)
downloadkafka-python-412ebe44968a52cb03dcab4366972784d01e1655.tar.gz
Cleanup unneeded bootstrap connection to avoid leak in KafkaClient
-rw-r--r--kafka/client_async.py4
-rw-r--r--test/test_client_async.py9
2 files changed, 9 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e0db51a..7a55a08 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -133,10 +133,10 @@ class KafkaClient(object):
self._delayed_tasks = DelayedTaskQueue()
self._last_bootstrap = 0
self._bootstrap_fails = 0
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._selector.register(self._wake_r, selectors.EVENT_READ)
+ self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
@@ -174,6 +174,8 @@ class KafkaClient(object):
# in that case, we should keep the bootstrap connection
if not len(self.cluster.brokers()):
self._conns['bootstrap'] = bootstrap
+ else:
+ bootstrap.close()
self._bootstrap_fails = 0
break
# No bootstrap found...
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 605ef1a..5870501 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -183,19 +183,22 @@ def test_close(mocker, conn):
cli = KafkaClient()
mocker.patch.object(cli, '_selector')
+ # bootstrap connection should have been closed
+ assert conn.close.call_count == 1
+
# Unknown node - silent
cli.close(2)
# Single node close
cli._maybe_connect(0)
- assert not conn.close.call_count
- cli.close(0)
assert conn.close.call_count == 1
+ cli.close(0)
+ assert conn.close.call_count == 2
# All node close
cli._maybe_connect(1)
cli.close()
- assert conn.close.call_count == 3
+ assert conn.close.call_count == 4
def test_is_disconnected(conn):