summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2022-09-03 21:32:21 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2022-09-03 21:34:40 +0300
commitefc7b924734ea0f393d0daf16b000a79f690dfd2 (patch)
tree602a5cc040212138f326dad6a5138d23755582c8
parent87d6efbda4c1b5c5a0d502bdd37bf72189054892 (diff)
downloadapscheduler-efc7b924734ea0f393d0daf16b000a79f690dfd2.tar.gz
Don't report cancellations as crashes on Python 3.7
-rw-r--r--docs/versionhistory.rst1
-rw-r--r--src/apscheduler/eventbrokers/async_redis.py11
-rw-r--r--src/apscheduler/schedulers/async_.py9
-rw-r--r--src/apscheduler/workers/async_.py16
-rw-r--r--src/apscheduler/workers/sync.py20
5 files changed, 41 insertions, 16 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 7f997f1..77919ca 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -13,6 +13,7 @@ APScheduler, see the :doc:`migration section <migration>`.
- Changed ``from_async_sqla_engine()`` in asyncpg event broker to only copy the
connection options instead of directly using the engine
- Simplified the MQTT event broker by providing a default ``client`` instance if omitted
+- Fixed ``CancelledError`` being reported as a crash on Python 3.7
**4.0.0a1**
diff --git a/src/apscheduler/eventbrokers/async_redis.py b/src/apscheduler/eventbrokers/async_redis.py
index faa5438..00d526f 100644
--- a/src/apscheduler/eventbrokers/async_redis.py
+++ b/src/apscheduler/eventbrokers/async_redis.py
@@ -1,5 +1,7 @@
from __future__ import annotations
+from asyncio import CancelledError
+
import anyio
import attrs
import tenacity
@@ -105,8 +107,13 @@ class AsyncRedisEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
event = self.reconstitute_event(msg["data"])
if event is not None:
await self.publish_local(event)
- except Exception:
- self._logger.exception(f"{self.__class__.__name__} listener crashed")
+ except Exception as exc:
+ # CancelledError is a subclass of Exception in Python 3.7
+ if not isinstance(exc, CancelledError):
+ self._logger.exception(
+ f"{self.__class__.__name__} listener crashed"
+ )
+
await pubsub.close()
raise
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 48afe88..2851f6e 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -3,6 +3,7 @@ from __future__ import annotations
import os
import platform
import random
+from asyncio import CancelledError
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
@@ -500,14 +501,16 @@ class AsyncScheduler:
raise
finally:
self._state = RunState.stopped
- if isinstance(exception, Exception):
+
+ # CancelledError is a subclass of Exception in Python 3.7
+ if not exception or isinstance(exception, CancelledError):
+ self.logger.info("Scheduler stopped")
+ elif isinstance(exception, Exception):
self.logger.exception("Scheduler crashed")
elif exception:
self.logger.info(
f"Scheduler stopped due to {exception.__class__.__name__}"
)
- else:
- self.logger.info("Scheduler stopped")
with move_on_after(3, shield=True):
await self.event_broker.publish_local(
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index ec374cb..4ef1bff 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import os
import platform
+from asyncio import CancelledError
from contextlib import AsyncExitStack
from datetime import datetime, timezone
from inspect import isawaitable
@@ -136,12 +137,21 @@ class AsyncWorker:
except get_cancelled_exc_class():
pass
except BaseException as exc:
- self.logger.exception("Worker crashed")
exception = exc
- else:
- self.logger.info("Worker stopped")
+ raise
finally:
self._state = RunState.stopped
+
+ # CancelledError is a subclass of Exception in Python 3.7
+ if not exception or isinstance(exception, CancelledError):
+ self.logger.info("Worker stopped")
+ elif isinstance(exception, Exception):
+ self.logger.exception("Worker crashed")
+ elif exception:
+ self.logger.info(
+ f"Worker stopped due to {exception.__class__.__name__}"
+ )
+
with move_on_after(3, shield=True):
await self.event_broker.publish_local(
WorkerStopped(exception=exception)
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 26e4df6..41279d9 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -167,6 +167,7 @@ class Worker:
if start_future:
start_future.set_result(None)
+ exception: BaseException | None = None
try:
while self._state is RunState.started:
available_slots = self.max_concurrent_jobs - len(self._running_jobs)
@@ -184,17 +185,20 @@ class Worker:
self._wakeup_event.wait()
self._wakeup_event = threading.Event()
except BaseException as exc:
+ exception = exc
+ raise
+ finally:
self._state = RunState.stopped
- if isinstance(exc, Exception):
+ if not exception:
+ self.logger.info("Worker stopped")
+ elif isinstance(exception, Exception):
self.logger.exception("Worker crashed")
- else:
- self.logger.info(f"Worker stopped due to {exc.__class__.__name__}")
+ elif exception:
+ self.logger.info(
+ f"Worker stopped due to {exception.__class__.__name__}"
+ )
- self.event_broker.publish_local(WorkerStopped(exception=exc))
- else:
- self._state = RunState.stopped
- self.logger.info("Worker stopped")
- self.event_broker.publish_local(WorkerStopped())
+ self.event_broker.publish_local(WorkerStopped(exception=exception))
def _run_job(self, job: Job, func: Callable) -> None:
try: