summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py25
1 files changed, 17 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 24162ad..0058cf3 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -6,6 +6,7 @@ import functools
import logging
import random
import threading
+import weakref
# selectors in stdlib as of py3.4
try:
@@ -27,6 +28,7 @@ from .metrics import AnonMeasurable
from .metrics.stats import Avg, Count, Rate
from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
+from .util import Dict, WeakMethod
# Although this looks unused, it actually monkey-patches socket.socketpair()
# and should be left in as long as we're using socket.socketpair() in this file
from .vendor import socketpair
@@ -197,7 +199,7 @@ class KafkaClient(object):
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
- self._conns = {}
+ self._conns = Dict() # object to support weakrefs
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
@@ -220,7 +222,7 @@ class KafkaClient(object):
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],
self.config['metric_group_prefix'],
- self._conns)
+ weakref.proxy(self._conns))
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
@@ -248,7 +250,7 @@ class KafkaClient(object):
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- cb = functools.partial(self._conn_state_change, 'bootstrap')
+ cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
node_id='bootstrap',
@@ -357,7 +359,7 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
- cb = functools.partial(self._conn_state_change, node_id)
+ cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
conn = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
node_id=node_id,
@@ -404,6 +406,13 @@ class KafkaClient(object):
return False
return self._conns[node_id].connected()
+ def _close(self):
+ if not self._closed:
+ self._closed = True
+ self._wake_r.close()
+ self._wake_w.close()
+ self._selector.close()
+
def close(self, node_id=None):
"""Close one or all broker connections.
@@ -412,18 +421,18 @@ class KafkaClient(object):
"""
with self._lock:
if node_id is None:
- self._closed = True
+ self._close()
for conn in self._conns.values():
conn.close()
- self._wake_r.close()
- self._wake_w.close()
- self._selector.close()
elif node_id in self._conns:
self._conns[node_id].close()
else:
log.warning("Node %s not found in current connection list; skipping", node_id)
return
+ def __del__(self):
+ self._close()
+
def is_disconnected(self, node_id):
"""Check whether the node connection has been disconnected or failed.