From 02a3034e95a06c9de00eb6add616b9138d9bcf24 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jan 2018 13:36:41 -0800 Subject: Improve KafkaConsumer cleanup --- kafka/coordinator/base.py | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) (limited to 'kafka/coordinator/base.py') diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b16c1e1..30b9c40 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -356,10 +356,7 @@ class BaseCoordinator(object): self.rejoining = True if self._heartbeat_thread is None: - log.debug('Starting new heartbeat thread') - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() + self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() @@ -712,13 +709,30 @@ class BaseCoordinator(object): def request_rejoin(self): self.rejoin_needed = True + def _start_heartbeat_thread(self): + if self._heartbeat_thread is None: + log.info('Starting new heartbeat thread') + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + + def _close_heartbeat_thread(self): + if self._heartbeat_thread is not None: + log.info('Stopping heartbeat thread') + try: + self._heartbeat_thread.close() + except ReferenceError: + pass + self._heartbeat_thread = None + + def __del__(self): + self._close_heartbeat_thread() + def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" with self._lock: - if self._heartbeat_thread is not None: - self._heartbeat_thread.close() - self._heartbeat_thread = None + self._close_heartbeat_thread() self.maybe_leave_group() def maybe_leave_group(self): @@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread): self.coordinator._lock.notify() def disable(self): - with self.coordinator._lock: - self.enabled = False + self.enabled = False def close(self): + self.closed = True with self.coordinator._lock: - self.closed = True self.coordinator._lock.notify() def run(self): @@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread): while not self.closed: self._run_once() - log.debug('Heartbeat closed!') + log.debug('Heartbeat thread closed') + + except ReferenceError: + log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: log.error("Heartbeat thread for group %s failed due to unexpected error: %s", -- cgit v1.2.1