summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-10-02 13:46:39 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-10-02 14:06:26 +0300
commit921a95d528de6fc7439b419565774ffb4ea9c00c (patch)
tree64e233b2e927d7b01b2584db6567e0005486244b
parent1647d3d59e958ed5f86ef75382ee8e683ca4b34c (diff)
downloadapscheduler-921a95d528de6fc7439b419565774ffb4ea9c00c.tar.gz
Improved scheduler and worker crash handling
-rw-r--r--src/apscheduler/schedulers/async_.py13
-rw-r--r--src/apscheduler/schedulers/sync.py2
-rw-r--r--src/apscheduler/workers/async_.py18
-rw-r--r--src/apscheduler/workers/sync.py17
4 files changed, 29 insertions, 21 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index b8f2e0b..a259678 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -234,6 +234,7 @@ class AsyncScheduler:
task_status.started()
await self._events.publish(SchedulerStarted())
+ exception: Optional[BaseException] = None
try:
while self._state is RunState.started:
schedules = await self.data_store.acquire_schedules(self.identity, 100)
@@ -314,9 +315,11 @@ class AsyncScheduler:
except get_cancelled_exc_class():
pass
except BaseException as exc:
+ self.logger.exception('Scheduler crashed')
+ exception = exc
+ else:
+ self.logger.info('Scheduler stopped')
+ finally:
self._state = RunState.stopped
- await self._events.publish(SchedulerStopped(exception=exc))
- raise
-
- self._state = RunState.stopped
- await self._events.publish(SchedulerStopped())
+ with move_on_after(3, shield=True):
+ await self._events.publish(SchedulerStopped(exception=exception))
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 748c8aa..cc77d45 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -323,10 +323,12 @@ class Scheduler:
self._wakeup_event = threading.Event()
except BaseException as exc:
self._state = RunState.stopped
+ self.logger.exception('Scheduler crashed')
self._events.publish(SchedulerStopped(exception=exc))
raise
self._state = RunState.stopped
+ self.logger.info('Scheduler stopped')
self._events.publish(SchedulerStopped())
# def stop(self) -> None:
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index 723b004..e59f0d9 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -92,6 +92,7 @@ class AsyncWorker:
task_status.started()
await self._events.publish(WorkerStarted())
+ exception: Optional[BaseException] = None
try:
async with create_task_group() as tg:
while self._state is RunState.started:
@@ -107,15 +108,16 @@ class AsyncWorker:
except get_cancelled_exc_class():
pass
except BaseException as exc:
+ self.logger.exception('Worker crashed')
+ exception = exc
+ else:
+ self.logger.info('Worker stopped')
+ finally:
+ current_worker.reset(token)
self._state = RunState.stopped
- with move_on_after(1, shield=True):
- await self._events.publish(WorkerStopped(exception=exc))
-
- raise
-
- current_worker.reset(token)
- self._state = RunState.stopped
- await self._events.publish(WorkerStopped())
+ self.logger.exception('Worker crashed')
+ with move_on_after(3, shield=True):
+ await self._events.publish(WorkerStopped(exception=exception))
async def _run_job(self, job: Job, func: Callable) -> None:
try:
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 4e3eef4..6bac3ca 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -98,6 +98,7 @@ class Worker:
self._events.publish(WorkerStarted())
executor = ThreadPoolExecutor(max_workers=self.max_concurrent_jobs)
+ exception: Optional[BaseException] = None
try:
while self._state is RunState.started:
available_slots = self.max_concurrent_jobs - len(self._running_jobs)
@@ -111,15 +112,15 @@ class Worker:
self._wakeup_event.wait()
self._wakeup_event = threading.Event()
except BaseException as exc:
- executor.shutdown(wait=False)
+ self.logger.exception('Worker crashed')
+ exception = exc
+ else:
+ self.logger.info('Worker stopped')
+ finally:
+ current_worker.reset(token)
self._state = RunState.stopped
- self._events.publish(WorkerStopped(exception=exc))
- raise
-
- executor.shutdown()
- current_worker.reset(token)
- self._state = RunState.stopped
- self._events.publish(WorkerStopped())
+ self._events.publish(WorkerStopped(exception=exception))
+ executor.shutdown()
def _run_job(self, job: Job, func: Callable) -> None:
try: