summaryrefslogtreecommitdiff
path: root/kafka/coordinator
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--kafka/coordinator/base.py38
-rw-r--r--kafka/coordinator/consumer.py1
2 files changed, 28 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b16c1e1..30b9c40 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -356,10 +356,7 @@ class BaseCoordinator(object):
self.rejoining = True
if self._heartbeat_thread is None:
- log.debug('Starting new heartbeat thread')
- self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
- self._heartbeat_thread.daemon = True
- self._heartbeat_thread.start()
+ self._start_heartbeat_thread()
while self.need_rejoin():
self.ensure_coordinator_ready()
@@ -712,13 +709,30 @@ class BaseCoordinator(object):
def request_rejoin(self):
self.rejoin_needed = True
+ def _start_heartbeat_thread(self):
+ if self._heartbeat_thread is None:
+ log.info('Starting new heartbeat thread')
+ self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
+ self._heartbeat_thread.daemon = True
+ self._heartbeat_thread.start()
+
+ def _close_heartbeat_thread(self):
+ if self._heartbeat_thread is not None:
+ log.info('Stopping heartbeat thread')
+ try:
+ self._heartbeat_thread.close()
+ except ReferenceError:
+ pass
+ self._heartbeat_thread = None
+
+ def __del__(self):
+ self._close_heartbeat_thread()
+
def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
with self._lock:
- if self._heartbeat_thread is not None:
- self._heartbeat_thread.close()
- self._heartbeat_thread = None
+ self._close_heartbeat_thread()
self.maybe_leave_group()
def maybe_leave_group(self):
@@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread):
self.coordinator._lock.notify()
def disable(self):
- with self.coordinator._lock:
- self.enabled = False
+ self.enabled = False
def close(self):
+ self.closed = True
with self.coordinator._lock:
- self.closed = True
self.coordinator._lock.notify()
def run(self):
@@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread):
while not self.closed:
self._run_once()
- log.debug('Heartbeat closed!')
+ log.debug('Heartbeat thread closed')
+
+ except ReferenceError:
+ log.debug('Heartbeat thread closed due to coordinator gc')
except RuntimeError as e:
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 48dcad4..ab30883 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -125,6 +125,7 @@ class ConsumerCoordinator(BaseCoordinator):
def __del__(self):
if hasattr(self, '_cluster') and self._cluster:
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+ super(ConsumerCoordinator, self).__del__()
def protocol_type(self):
return ConsumerProtocol.PROTOCOL_TYPE