diff options
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b16c1e1..30b9c40 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -356,10 +356,7 @@ class BaseCoordinator(object): self.rejoining = True if self._heartbeat_thread is None: - log.debug('Starting new heartbeat thread') - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() + self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() @@ -712,13 +709,30 @@ class BaseCoordinator(object): def request_rejoin(self): self.rejoin_needed = True + def _start_heartbeat_thread(self): + if self._heartbeat_thread is None: + log.info('Starting new heartbeat thread') + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + + def _close_heartbeat_thread(self): + if self._heartbeat_thread is not None: + log.info('Stopping heartbeat thread') + try: + self._heartbeat_thread.close() + except ReferenceError: + pass + self._heartbeat_thread = None + + def __del__(self): + self._close_heartbeat_thread() + def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" with self._lock: - if self._heartbeat_thread is not None: - self._heartbeat_thread.close() - self._heartbeat_thread = None + self._close_heartbeat_thread() self.maybe_leave_group() def maybe_leave_group(self): @@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread): self.coordinator._lock.notify() def disable(self): - with self.coordinator._lock: - self.enabled = False + self.enabled = False def close(self): + self.closed = True with self.coordinator._lock: - self.closed = True self.coordinator._lock.notify() def run(self): @@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread): while not self.closed: self._run_once() - log.debug('Heartbeat closed!') + log.debug('Heartbeat thread closed') + + except ReferenceError: + log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: log.error("Heartbeat thread for group %s failed due to unexpected error: %s", |