summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-01-10 15:56:47 -0800
committerGitHub <noreply@github.com>2018-01-10 15:56:47 -0800
commit0a7492443c78d4791cfdf3d6384c02f1c7757c7b (patch)
treee25e986965144b1dc5b8e6c12d58794178d0275a
parent794b695e7ceff25834616bb54e32160104040df4 (diff)
downloadkafka-python-0a7492443c78d4791cfdf3d6384c02f1c7757c7b.tar.gz
Improve KafkaConsumer cleanup (#1339)
-rw-r--r--kafka/client_async.py25
-rw-r--r--kafka/conn.py12
-rw-r--r--kafka/coordinator/base.py38
-rw-r--r--kafka/coordinator/consumer.py1
-rw-r--r--kafka/util.py8
5 files changed, 62 insertions, 22 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 24162ad..0058cf3 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -6,6 +6,7 @@ import functools
import logging
import random
import threading
+import weakref
# selectors in stdlib as of py3.4
try:
@@ -27,6 +28,7 @@ from .metrics import AnonMeasurable
from .metrics.stats import Avg, Count, Rate
from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
+from .util import Dict, WeakMethod
# Although this looks unused, it actually monkey-patches socket.socketpair()
# and should be left in as long as we're using socket.socketpair() in this file
from .vendor import socketpair
@@ -197,7 +199,7 @@ class KafkaClient(object):
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
- self._conns = {}
+ self._conns = Dict() # object to support weakrefs
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
@@ -220,7 +222,7 @@ class KafkaClient(object):
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],
self.config['metric_group_prefix'],
- self._conns)
+ weakref.proxy(self._conns))
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
@@ -248,7 +250,7 @@ class KafkaClient(object):
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- cb = functools.partial(self._conn_state_change, 'bootstrap')
+ cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
node_id='bootstrap',
@@ -357,7 +359,7 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
- cb = functools.partial(self._conn_state_change, node_id)
+ cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
conn = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
node_id=node_id,
@@ -404,6 +406,13 @@ class KafkaClient(object):
return False
return self._conns[node_id].connected()
+ def _close(self):
+ if not self._closed:
+ self._closed = True
+ self._wake_r.close()
+ self._wake_w.close()
+ self._selector.close()
+
def close(self, node_id=None):
"""Close one or all broker connections.
@@ -412,18 +421,18 @@ class KafkaClient(object):
"""
with self._lock:
if node_id is None:
- self._closed = True
+ self._close()
for conn in self._conns.values():
conn.close()
- self._wake_r.close()
- self._wake_w.close()
- self._selector.close()
elif node_id in self._conns:
self._conns[node_id].close()
else:
log.warning("Node %s not found in current connection list; skipping", node_id)
return
+ def __del__(self):
+ self._close()
+
def is_disconnected(self, node_id):
"""Check whether the node connection has been disconnected or failed.
diff --git a/kafka/conn.py b/kafka/conn.py
index 23edf22..1e6770f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -628,6 +628,14 @@ class BrokerConnection(object):
self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
+ def _close_socket(self):
+ if self._sock:
+ self._sock.close()
+ self._sock = None
+
+ def __del__(self):
+ self._close_socket()
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
@@ -641,9 +649,7 @@ class BrokerConnection(object):
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
self._update_reconnect_backoff()
- if self._sock:
- self._sock.close()
- self._sock = None
+ self._close_socket()
self.state = ConnectionStates.DISCONNECTED
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b16c1e1..30b9c40 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -356,10 +356,7 @@ class BaseCoordinator(object):
self.rejoining = True
if self._heartbeat_thread is None:
- log.debug('Starting new heartbeat thread')
- self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
- self._heartbeat_thread.daemon = True
- self._heartbeat_thread.start()
+ self._start_heartbeat_thread()
while self.need_rejoin():
self.ensure_coordinator_ready()
@@ -712,13 +709,30 @@ class BaseCoordinator(object):
def request_rejoin(self):
self.rejoin_needed = True
+ def _start_heartbeat_thread(self):
+ if self._heartbeat_thread is None:
+ log.info('Starting new heartbeat thread')
+ self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+
+ def _close_heartbeat_thread(self):
+ if self._heartbeat_thread is not None:
+ log.info('Stopping heartbeat thread')
+ try:
+ self._heartbeat_thread.close()
+ except ReferenceError:
+ pass
+ self._heartbeat_thread = None
+
+ def __del__(self):
+ self._close_heartbeat_thread()
+
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
with self._lock:
- if self._heartbeat_thread is not None:
- self._heartbeat_thread.close()
- self._heartbeat_thread = None
+ self._close_heartbeat_thread()
self.maybe_leave_group()
def maybe_leave_group(self):
@@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread):
self.coordinator._lock.notify()
def disable(self):
- with self.coordinator._lock:
- self.enabled = False
+ self.enabled = False
def close(self):
+ self.closed = True
with self.coordinator._lock:
- self.closed = True
self.coordinator._lock.notify()
def run(self):
@@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread):
while not self.closed:
self._run_once()
- log.debug('Heartbeat closed!')
+ log.debug('Heartbeat thread closed')
+
+ except ReferenceError:
+ log.debug('Heartbeat thread closed due to coordinator gc')
except RuntimeError as e:
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 48dcad4..ab30883 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -125,6 +125,7 @@ class ConsumerCoordinator(BaseCoordinator):
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+ super(ConsumerCoordinator, self).__del__()
def protocol_type(self):
return ConsumerProtocol.PROTOCOL_TYPE
diff --git a/kafka/util.py b/kafka/util.py
index 181f67f..75538dd 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -167,6 +167,14 @@ class WeakMethod(object):
return self._target_id == other._target_id and self._method_id == other._method_id
+class Dict(dict):
+ """Utility class to support passing weakrefs to dicts
+
+ See: https://docs.python.org/2/library/weakref.html
+ """
+ pass
+
+
def try_method_on_system_exit(obj, method, *args, **kwargs):
def wrapper(_obj, _meth, *args, **kwargs):
try: