diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 15:19:17 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 16:11:22 -0700 |
commit | 412ebe44968a52cb03dcab4366972784d01e1655 (patch) | |
tree | 617c853c9e5c5692879ff69420dec991268bfa77 | |
parent | 2ac8503647003535c872ed82c311c4b335862ec5 (diff) | |
download | kafka-python-412ebe44968a52cb03dcab4366972784d01e1655.tar.gz |
Cleanup unneeded bootstrap connection to avoid leak in KafkaClient
-rw-r--r-- | kafka/client_async.py | 4 | ||||
-rw-r--r-- | test/test_client_async.py | 9 |
2 files changed, 9 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e0db51a..7a55a08 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -133,10 +133,10 @@ class KafkaClient(object): self._delayed_tasks = DelayedTaskQueue() self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._selector.register(self._wake_r, selectors.EVENT_READ) + self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails @@ -174,6 +174,8 @@ class KafkaClient(object): # in that case, we should keep the bootstrap connection if not len(self.cluster.brokers()): self._conns['bootstrap'] = bootstrap + else: + bootstrap.close() self._bootstrap_fails = 0 break # No bootstrap found... diff --git a/test/test_client_async.py b/test/test_client_async.py index 605ef1a..5870501 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -183,19 +183,22 @@ def test_close(mocker, conn): cli = KafkaClient() mocker.patch.object(cli, '_selector') + # bootstrap connection should have been closed + assert conn.close.call_count == 1 + # Unknown node - silent cli.close(2) # Single node close cli._maybe_connect(0) - assert not conn.close.call_count - cli.close(0) assert conn.close.call_count == 1 + cli.close(0) + assert conn.close.call_count == 2 # All node close cli._maybe_connect(1) cli.close() - assert conn.close.call_count == 3 + assert conn.close.call_count == 4 def test_is_disconnected(conn): |