diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-01-10 15:56:47 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-10 15:56:47 -0800 |
commit | 0a7492443c78d4791cfdf3d6384c02f1c7757c7b (patch) | |
tree | e25e986965144b1dc5b8e6c12d58794178d0275a | |
parent | 794b695e7ceff25834616bb54e32160104040df4 (diff) | |
download | kafka-python-0a7492443c78d4791cfdf3d6384c02f1c7757c7b.tar.gz |
Improve KafkaConsumer cleanup (#1339)
-rw-r--r-- | kafka/client_async.py | 25 | ||||
-rw-r--r-- | kafka/conn.py | 12 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 38 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 1 | ||||
-rw-r--r-- | kafka/util.py | 8 |
5 files changed, 62 insertions, 22 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 24162ad..0058cf3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -6,6 +6,7 @@ import functools import logging import random import threading +import weakref # selectors in stdlib as of py3.4 try: @@ -27,6 +28,7 @@ from .metrics import AnonMeasurable from .metrics.stats import Avg, Count, Rate from .metrics.stats.rate import TimeUnit from .protocol.metadata import MetadataRequest +from .util import Dict, WeakMethod # Although this looks unused, it actually monkey-patches socket.socketpair() # and should be left in as long as we're using socket.socketpair() in this file from .vendor import socketpair @@ -197,7 +199,7 @@ class KafkaClient(object): self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False self._selector = self.config['selector']() - self._conns = {} + self._conns = Dict() # object to support weakrefs self._connecting = set() self._refresh_on_disconnects = True self._last_bootstrap = 0 @@ -220,7 +222,7 @@ class KafkaClient(object): if self.config['metrics']: self._sensors = KafkaClientMetrics(self.config['metrics'], self.config['metric_group_prefix'], - self._conns) + weakref.proxy(self._conns)) self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) @@ -248,7 +250,7 @@ class KafkaClient(object): for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) - cb = functools.partial(self._conn_state_change, 'bootstrap') + cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap') bootstrap = BrokerConnection(host, port, afi, state_change_callback=cb, node_id='bootstrap', @@ -357,7 +359,7 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) host, port, afi = get_ip_port_afi(broker.host) - cb = functools.partial(self._conn_state_change, node_id) + cb = functools.partial(WeakMethod(self._conn_state_change), node_id) conn = BrokerConnection(host, broker.port, afi, state_change_callback=cb, node_id=node_id, @@ -404,6 +406,13 @@ class KafkaClient(object): return False return self._conns[node_id].connected() + def _close(self): + if not self._closed: + self._closed = True + self._wake_r.close() + self._wake_w.close() + self._selector.close() + def close(self, node_id=None): """Close one or all broker connections. @@ -412,18 +421,18 @@ class KafkaClient(object): """ with self._lock: if node_id is None: - self._closed = True + self._close() for conn in self._conns.values(): conn.close() - self._wake_r.close() - self._wake_w.close() - self._selector.close() elif node_id in self._conns: self._conns[node_id].close() else: log.warning("Node %s not found in current connection list; skipping", node_id) return + def __del__(self): + self._close() + def is_disconnected(self, node_id): """Check whether the node connection has been disconnected or failed. diff --git a/kafka/conn.py b/kafka/conn.py index 23edf22..1e6770f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -628,6 +628,14 @@ class BrokerConnection(object): self._reconnect_backoff /= 1000.0 log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures) + def _close_socket(self): + if self._sock: + self._sock.close() + self._sock = None + + def __del__(self): + self._close_socket() + def close(self, error=None): """Close socket and fail all in-flight-requests. @@ -641,9 +649,7 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTING self.config['state_change_callback'](self) self._update_reconnect_backoff() - if self._sock: - self._sock.close() - self._sock = None + self._close_socket() self.state = ConnectionStates.DISCONNECTED self._sasl_auth_future = None self._protocol = KafkaProtocol( diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b16c1e1..30b9c40 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -356,10 +356,7 @@ class BaseCoordinator(object): self.rejoining = True if self._heartbeat_thread is None: - log.debug('Starting new heartbeat thread') - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() + self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() @@ -712,13 +709,30 @@ class BaseCoordinator(object): def request_rejoin(self): self.rejoin_needed = True + def _start_heartbeat_thread(self): + if self._heartbeat_thread is None: + log.info('Starting new heartbeat thread') + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + + def _close_heartbeat_thread(self): + if self._heartbeat_thread is not None: + log.info('Stopping heartbeat thread') + try: + self._heartbeat_thread.close() + except ReferenceError: + pass + self._heartbeat_thread = None + + def __del__(self): + self._close_heartbeat_thread() + def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" with self._lock: - if self._heartbeat_thread is not None: - self._heartbeat_thread.close() - self._heartbeat_thread = None + self._close_heartbeat_thread() self.maybe_leave_group() def maybe_leave_group(self): @@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread): self.coordinator._lock.notify() def disable(self): - with self.coordinator._lock: - self.enabled = False + self.enabled = False def close(self): + self.closed = True with self.coordinator._lock: - self.closed = True self.coordinator._lock.notify() def run(self): @@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread): while not self.closed: self._run_once() - log.debug('Heartbeat closed!') + log.debug('Heartbeat thread closed') + + except ReferenceError: + log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: log.error("Heartbeat thread for group %s failed due to unexpected error: %s", diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 48dcad4..ab30883 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -125,6 +125,7 @@ class ConsumerCoordinator(BaseCoordinator): def __del__(self): if hasattr(self, '_cluster') and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) + super(ConsumerCoordinator, self).__del__() def protocol_type(self): return ConsumerProtocol.PROTOCOL_TYPE diff --git a/kafka/util.py b/kafka/util.py index 181f67f..75538dd 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -167,6 +167,14 @@ class WeakMethod(object): return self._target_id == other._target_id and self._method_id == other._method_id +class Dict(dict): + """Utility class to support passing weakrefs to dicts + + See: https://docs.python.org/2/library/weakref.html + """ + pass + + def try_method_on_system_exit(obj, method, *args, **kwargs): def wrapper(_obj, _meth, *args, **kwargs): try: |