diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-10-02 13:46:39 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-10-02 14:06:26 +0300 |
commit | 921a95d528de6fc7439b419565774ffb4ea9c00c (patch) | |
tree | 64e233b2e927d7b01b2584db6567e0005486244b | |
parent | 1647d3d59e958ed5f86ef75382ee8e683ca4b34c (diff) | |
download | apscheduler-921a95d528de6fc7439b419565774ffb4ea9c00c.tar.gz |
Improved scheduler and worker crash handling
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 13 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 2 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 18 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 17 |
4 files changed, 29 insertions, 21 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index b8f2e0b..a259678 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -234,6 +234,7 @@ class AsyncScheduler: task_status.started() await self._events.publish(SchedulerStarted()) + exception: Optional[BaseException] = None try: while self._state is RunState.started: schedules = await self.data_store.acquire_schedules(self.identity, 100) @@ -314,9 +315,11 @@ class AsyncScheduler: except get_cancelled_exc_class(): pass except BaseException as exc: + self.logger.exception('Scheduler crashed') + exception = exc + else: + self.logger.info('Scheduler stopped') + finally: self._state = RunState.stopped - await self._events.publish(SchedulerStopped(exception=exc)) - raise - - self._state = RunState.stopped - await self._events.publish(SchedulerStopped()) + with move_on_after(3, shield=True): + await self._events.publish(SchedulerStopped(exception=exception)) diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 748c8aa..cc77d45 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -323,10 +323,12 @@ class Scheduler: self._wakeup_event = threading.Event() except BaseException as exc: self._state = RunState.stopped + self.logger.exception('Scheduler crashed') self._events.publish(SchedulerStopped(exception=exc)) raise self._state = RunState.stopped + self.logger.info('Scheduler stopped') self._events.publish(SchedulerStopped()) # def stop(self) -> None: diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index 723b004..e59f0d9 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -92,6 +92,7 @@ class AsyncWorker: task_status.started() await self._events.publish(WorkerStarted()) + exception: Optional[BaseException] = None try: async with create_task_group() as tg: while self._state is RunState.started: @@ -107,15 +108,16 @@ class AsyncWorker: except get_cancelled_exc_class(): pass except BaseException as exc: + self.logger.exception('Worker crashed') + exception = exc + else: + self.logger.info('Worker stopped') + finally: + current_worker.reset(token) self._state = RunState.stopped - with move_on_after(1, shield=True): - await self._events.publish(WorkerStopped(exception=exc)) - - raise - - current_worker.reset(token) - self._state = RunState.stopped - await self._events.publish(WorkerStopped()) + self.logger.exception('Worker crashed') + with move_on_after(3, shield=True): + await self._events.publish(WorkerStopped(exception=exception)) async def _run_job(self, job: Job, func: Callable) -> None: try: diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 4e3eef4..6bac3ca 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -98,6 +98,7 @@ class Worker: self._events.publish(WorkerStarted()) executor = ThreadPoolExecutor(max_workers=self.max_concurrent_jobs) + exception: Optional[BaseException] = None try: while self._state is RunState.started: available_slots = self.max_concurrent_jobs - len(self._running_jobs) @@ -111,15 +112,15 @@ class Worker: self._wakeup_event.wait() self._wakeup_event = threading.Event() except BaseException as exc: - executor.shutdown(wait=False) + self.logger.exception('Worker crashed') + exception = exc + else: + self.logger.info('Worker stopped') + finally: + current_worker.reset(token) self._state = RunState.stopped - self._events.publish(WorkerStopped(exception=exc)) - raise - - executor.shutdown() - current_worker.reset(token) - self._state = RunState.stopped - self._events.publish(WorkerStopped()) + self._events.publish(WorkerStopped(exception=exception)) + executor.shutdown() def _run_job(self, job: Job, func: Callable) -> None: try: |