diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-01-10 13:36:41 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-01-10 13:38:12 -0800 |
commit | 02a3034e95a06c9de00eb6add616b9138d9bcf24 (patch) | |
tree | e25e986965144b1dc5b8e6c12d58794178d0275a /kafka/coordinator/base.py | |
parent | 794b695e7ceff25834616bb54e32160104040df4 (diff) | |
download | kafka-python-reference_cleanup_close.tar.gz |
Improve KafkaConsumer cleanupreference_cleanup_close
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b16c1e1..30b9c40 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -356,10 +356,7 @@ class BaseCoordinator(object): self.rejoining = True if self._heartbeat_thread is None: - log.debug('Starting new heartbeat thread') - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() + self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() @@ -712,13 +709,30 @@ class BaseCoordinator(object): def request_rejoin(self): self.rejoin_needed = True + def _start_heartbeat_thread(self): + if self._heartbeat_thread is None: + log.info('Starting new heartbeat thread') + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + + def _close_heartbeat_thread(self): + if self._heartbeat_thread is not None: + log.info('Stopping heartbeat thread') + try: + self._heartbeat_thread.close() + except ReferenceError: + pass + self._heartbeat_thread = None + + def __del__(self): + self._close_heartbeat_thread() + def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" with self._lock: - if self._heartbeat_thread is not None: - self._heartbeat_thread.close() - self._heartbeat_thread = None + self._close_heartbeat_thread() self.maybe_leave_group() def maybe_leave_group(self): @@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread): self.coordinator._lock.notify() def disable(self): - with self.coordinator._lock: - self.enabled = False + self.enabled = False def close(self): + self.closed = True with self.coordinator._lock: - self.closed = True self.coordinator._lock.notify() def run(self): @@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread): while not self.closed: self._run_once() - log.debug('Heartbeat closed!') + log.debug('Heartbeat thread closed') + + except ReferenceError: + log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: log.error("Heartbeat thread for group %s failed due to unexpected error: %s", |