diff options
Diffstat (limited to 'kafka/coordinator')
-rw-r--r-- | kafka/coordinator/base.py | 38 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 1 |
2 files changed, 28 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b16c1e1..30b9c40 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -356,10 +356,7 @@ class BaseCoordinator(object): self.rejoining = True if self._heartbeat_thread is None: - log.debug('Starting new heartbeat thread') - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() + self._start_heartbeat_thread() while self.need_rejoin(): self.ensure_coordinator_ready() @@ -712,13 +709,30 @@ class BaseCoordinator(object): def request_rejoin(self): self.rejoin_needed = True + def _start_heartbeat_thread(self): + if self._heartbeat_thread is None: + log.info('Starting new heartbeat thread') + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + + def _close_heartbeat_thread(self): + if self._heartbeat_thread is not None: + log.info('Stopping heartbeat thread') + try: + self._heartbeat_thread.close() + except ReferenceError: + pass + self._heartbeat_thread = None + + def __del__(self): + self._close_heartbeat_thread() + def close(self): """Close the coordinator, leave the current group, and reset local generation / member_id""" with self._lock: - if self._heartbeat_thread is not None: - self._heartbeat_thread.close() - self._heartbeat_thread = None + self._close_heartbeat_thread() self.maybe_leave_group() def maybe_leave_group(self): @@ -877,12 +891,11 @@ class HeartbeatThread(threading.Thread): self.coordinator._lock.notify() def disable(self): - with self.coordinator._lock: - self.enabled = False + self.enabled = False def close(self): + self.closed = True with self.coordinator._lock: - self.closed = True self.coordinator._lock.notify() def run(self): @@ -890,7 +903,10 @@ class HeartbeatThread(threading.Thread): while not self.closed: self._run_once() - log.debug('Heartbeat closed!') + log.debug('Heartbeat thread closed') + + except ReferenceError: + log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: log.error("Heartbeat thread for group %s failed due to unexpected error: %s", diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 48dcad4..ab30883 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -125,6 +125,7 @@ class ConsumerCoordinator(BaseCoordinator): def __del__(self): if hasattr(self, '_cluster') and self._cluster: self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) + super(ConsumerCoordinator, self).__del__() def protocol_type(self): return ConsumerProtocol.PROTOCOL_TYPE |