diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 24162ad..0058cf3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -6,6 +6,7 @@ import functools import logging import random import threading +import weakref # selectors in stdlib as of py3.4 try: @@ -27,6 +28,7 @@ from .metrics import AnonMeasurable from .metrics.stats import Avg, Count, Rate from .metrics.stats.rate import TimeUnit from .protocol.metadata import MetadataRequest +from .util import Dict, WeakMethod # Although this looks unused, it actually monkey-patches socket.socketpair() # and should be left in as long as we're using socket.socketpair() in this file from .vendor import socketpair @@ -197,7 +199,7 @@ class KafkaClient(object): self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False self._selector = self.config['selector']() - self._conns = {} + self._conns = Dict() # object to support weakrefs self._connecting = set() self._refresh_on_disconnects = True self._last_bootstrap = 0 @@ -220,7 +222,7 @@ class KafkaClient(object): if self.config['metrics']: self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'], - self._conns) + weakref.proxy(self._conns)) self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) @@ -248,7 +250,7 @@ class KafkaClient(object): for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - cb = functools.partial(self._conn_state_change, 'bootstrap') + cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap') bootstrap = BrokerConnection(host, port, afi, state_change_callback=cb, node_id='bootstrap', @@ -357,7 +359,7 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) host, port, afi = get_ip_port_afi(broker.host) - cb = functools.partial(self._conn_state_change, node_id) + cb = functools.partial(WeakMethod(self._conn_state_change), node_id) conn = BrokerConnection(host, broker.port, afi, state_change_callback=cb, node_id=node_id, @@ -404,6 +406,13 @@ class KafkaClient(object): return False return self._conns[node_id].connected() + def _close(self): + if not self._closed: + self._closed = True + self._wake_r.close() + self._wake_w.close() + self._selector.close() + def close(self, node_id=None): """Close one or all broker connections. @@ -412,18 +421,18 @@ class KafkaClient(object): """ with self._lock: if node_id is None: - self._closed = True + self._close() for conn in self._conns.values(): conn.close() - self._wake_r.close() - self._wake_w.close() - self._selector.close() elif node_id in self._conns: self._conns[node_id].close() else: log.warning("Node %s not found in current connection list; skipping", node_id) return + def __del__(self): + self._close() + def is_disconnected(self, node_id): """Check whether the node connection has been disconnected or failed. |