diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-03 21:32:21 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-03 21:34:40 +0300 |
commit | efc7b924734ea0f393d0daf16b000a79f690dfd2 (patch) | |
tree | 602a5cc040212138f326dad6a5138d23755582c8 | |
parent | 87d6efbda4c1b5c5a0d502bdd37bf72189054892 (diff) | |
download | apscheduler-efc7b924734ea0f393d0daf16b000a79f690dfd2.tar.gz |
Don't report cancellations as crashes on Python 3.7
-rw-r--r-- | docs/versionhistory.rst | 1 | ||||
-rw-r--r-- | src/apscheduler/eventbrokers/async_redis.py | 11 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 9 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 16 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 20 |
5 files changed, 41 insertions, 16 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 7f997f1..77919ca 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -13,6 +13,7 @@ APScheduler, see the :doc:`migration section <migration>`. - Changed ``from_async_sqla_engine()`` in asyncpg event broker to only copy the connection options instead of directly using the engine - Simplified the MQTT event broker by providing a default ``client`` instance if omitted +- Fixed ``CancelledError`` being reported as a crash on Python 3.7 **4.0.0a1** diff --git a/src/apscheduler/eventbrokers/async_redis.py b/src/apscheduler/eventbrokers/async_redis.py index faa5438..00d526f 100644 --- a/src/apscheduler/eventbrokers/async_redis.py +++ b/src/apscheduler/eventbrokers/async_redis.py @@ -1,5 +1,7 @@ from __future__ import annotations +from asyncio import CancelledError + import anyio import attrs import tenacity @@ -105,8 +107,13 @@ class AsyncRedisEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin): event = self.reconstitute_event(msg["data"]) if event is not None: await self.publish_local(event) - except Exception: - self._logger.exception(f"{self.__class__.__name__} listener crashed") + except Exception as exc: + # CancelledError is a subclass of Exception in Python 3.7 + if not isinstance(exc, CancelledError): + self._logger.exception( + f"{self.__class__.__name__} listener crashed" + ) + await pubsub.close() raise diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 48afe88..2851f6e 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -3,6 +3,7 @@ from __future__ import annotations import os import platform import random +from asyncio import CancelledError from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger @@ -500,14 +501,16 @@ class AsyncScheduler: raise finally: self._state = RunState.stopped - if isinstance(exception, Exception): + + # CancelledError is a subclass of Exception in Python 3.7 + if not exception or isinstance(exception, CancelledError): + self.logger.info("Scheduler stopped") + elif isinstance(exception, Exception): self.logger.exception("Scheduler crashed") elif exception: self.logger.info( f"Scheduler stopped due to {exception.__class__.__name__}" ) - else: - self.logger.info("Scheduler stopped") with move_on_after(3, shield=True): await self.event_broker.publish_local( diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index ec374cb..4ef1bff 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -2,6 +2,7 @@ from __future__ import annotations import os import platform +from asyncio import CancelledError from contextlib import AsyncExitStack from datetime import datetime, timezone from inspect import isawaitable @@ -136,12 +137,21 @@ class AsyncWorker: except get_cancelled_exc_class(): pass except BaseException as exc: - self.logger.exception("Worker crashed") exception = exc - else: - self.logger.info("Worker stopped") + raise finally: self._state = RunState.stopped + + # CancelledError is a subclass of Exception in Python 3.7 + if not exception or isinstance(exception, CancelledError): + self.logger.info("Worker stopped") + elif isinstance(exception, Exception): + self.logger.exception("Worker crashed") + elif exception: + self.logger.info( + f"Worker stopped due to {exception.__class__.__name__}" + ) + with move_on_after(3, shield=True): await self.event_broker.publish_local( WorkerStopped(exception=exception) diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 26e4df6..41279d9 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -167,6 +167,7 @@ class Worker: if start_future: start_future.set_result(None) + exception: BaseException | None = None try: while self._state is RunState.started: available_slots = self.max_concurrent_jobs - len(self._running_jobs) @@ -184,17 +185,20 @@ class Worker: self._wakeup_event.wait() self._wakeup_event = threading.Event() except BaseException as exc: + exception = exc + raise + finally: self._state = RunState.stopped - if isinstance(exc, Exception): + if not exception: + self.logger.info("Worker stopped") + elif isinstance(exception, Exception): self.logger.exception("Worker crashed") - else: - self.logger.info(f"Worker stopped due to {exc.__class__.__name__}") + elif exception: + self.logger.info( + f"Worker stopped due to {exception.__class__.__name__}" + ) - self.event_broker.publish_local(WorkerStopped(exception=exc)) - else: - self._state = RunState.stopped - self.logger.info("Worker stopped") - self.event_broker.publish_local(WorkerStopped()) + self.event_broker.publish_local(WorkerStopped(exception=exception)) def _run_job(self, job: Job, func: Callable) -> None: try: |