diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-04 01:29:43 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-04 01:29:43 +0300 |
commit | 5f0627be932d561c92bedc684ab1b8c5646520d4 (patch) | |
tree | 078ae657f60dd1bbcdbbdf300d1c7942e958e64b | |
parent | 90a9675f4444809c72c25a450a40243be8110b68 (diff) | |
download | apscheduler-5f0627be932d561c92bedc684ab1b8c5646520d4.tar.gz |
Changed the scheduler API to require an explicit start call
-rw-r--r-- | docs/versionhistory.rst | 3 | ||||
-rw-r--r-- | examples/separate_worker/async_scheduler.py | 2 | ||||
-rw-r--r-- | examples/separate_worker/sync_scheduler.py | 2 | ||||
-rw-r--r-- | examples/standalone/async_memory.py | 2 | ||||
-rw-r--r-- | examples/standalone/async_mysql.py | 2 | ||||
-rw-r--r-- | examples/standalone/async_postgres.py | 2 | ||||
-rw-r--r-- | examples/standalone/sync_memory.py | 2 | ||||
-rw-r--r-- | examples/web/asgi_fastapi.py | 1 | ||||
-rw-r--r-- | examples/web/asgi_noframework.py | 1 | ||||
-rw-r--r-- | examples/web/asgi_starlette.py | 1 | ||||
-rw-r--r-- | pyproject.toml | 1 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 73 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 35 | ||||
-rw-r--r-- | tests/test_schedulers.py | 44 |
14 files changed, 131 insertions, 40 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index e847918..103daf6 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -6,6 +6,9 @@ APScheduler, see the :doc:`migration section <migration>`. **UNRELEASED** +- **BREAKING** Changed the scheduler API to always require a call to either + ``run_until_stopped()`` or ``start_in_background()`` to start the scheduler (using it + as a context manager is no longer enough) - Added an async Redis event broker - Added automatic reconnection to the Redis event brokers (sync and async) - Added automatic reconnection to the asyncpg event broker diff --git a/examples/separate_worker/async_scheduler.py b/examples/separate_worker/async_scheduler.py index 27eb37a..6ffdbcd 100644 --- a/examples/separate_worker/async_scheduler.py +++ b/examples/separate_worker/async_scheduler.py @@ -40,7 +40,7 @@ async def main(): data_store, event_broker, start_worker=False ) as scheduler: await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") - await scheduler.wait_until_stopped() + await scheduler.run_until_stopped() logging.basicConfig(level=logging.INFO) diff --git a/examples/separate_worker/sync_scheduler.py b/examples/separate_worker/sync_scheduler.py index 9c40032..83d2d06 100644 --- a/examples/separate_worker/sync_scheduler.py +++ b/examples/separate_worker/sync_scheduler.py @@ -34,4 +34,4 @@ event_broker = RedisEventBroker.from_url("redis://localhost") with Scheduler(data_store, event_broker, start_worker=False) as scheduler: scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") - scheduler.wait_until_stopped() + scheduler.run_until_stopped() diff --git a/examples/standalone/async_memory.py b/examples/standalone/async_memory.py index b3db775..8fd4e2d 100644 --- a/examples/standalone/async_memory.py +++ b/examples/standalone/async_memory.py @@ -22,7 +22,7 @@ def tick(): async def main(): async with AsyncScheduler() as scheduler: await scheduler.add_schedule(tick, IntervalTrigger(seconds=1)) - await scheduler.wait_until_stopped() + await scheduler.run_until_stopped() run(main()) diff --git a/examples/standalone/async_mysql.py b/examples/standalone/async_mysql.py index 3f6f95f..6e8d801 100644 --- a/examples/standalone/async_mysql.py +++ b/examples/standalone/async_mysql.py @@ -32,7 +32,7 @@ async def main(): data_store = AsyncSQLAlchemyDataStore(engine) async with AsyncScheduler(data_store) as scheduler: await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") - await scheduler.wait_until_stopped() + await scheduler.run_until_stopped() run(main()) diff --git a/examples/standalone/async_postgres.py b/examples/standalone/async_postgres.py index e6083f7..e9c519a 100644 --- a/examples/standalone/async_postgres.py +++ b/examples/standalone/async_postgres.py @@ -32,7 +32,7 @@ async def main(): data_store = AsyncSQLAlchemyDataStore(engine) async with AsyncScheduler(data_store) as scheduler: await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") - await scheduler.wait_until_stopped() + await scheduler.run_until_stopped() run(main()) diff --git a/examples/standalone/sync_memory.py b/examples/standalone/sync_memory.py index 6d804f7..13b22a7 100644 --- a/examples/standalone/sync_memory.py +++ b/examples/standalone/sync_memory.py @@ -20,4 +20,4 @@ def tick(): with Scheduler() as scheduler: scheduler.add_schedule(tick, IntervalTrigger(seconds=1)) - scheduler.wait_until_stopped() + scheduler.run_until_stopped() diff --git a/examples/web/asgi_fastapi.py b/examples/web/asgi_fastapi.py index 2cfa5c4..ffa79cb 100644 --- a/examples/web/asgi_fastapi.py +++ b/examples/web/asgi_fastapi.py @@ -45,6 +45,7 @@ class SchedulerMiddleware: await self.scheduler.add_schedule( tick, IntervalTrigger(seconds=1), id="tick" ) + await self.scheduler.start_in_background() await self.app(scope, receive, send) else: await self.app(scope, receive, send) diff --git a/examples/web/asgi_noframework.py b/examples/web/asgi_noframework.py index c3ac184..4b57c30 100644 --- a/examples/web/asgi_noframework.py +++ b/examples/web/asgi_noframework.py @@ -64,6 +64,7 @@ async def scheduler_middleware(scope, receive, send): event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine) async with AsyncScheduler(data_store, event_broker) as scheduler: await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick") + await scheduler.start_in_background() await original_app(scope, receive, send) else: await original_app(scope, receive, send) diff --git a/examples/web/asgi_starlette.py b/examples/web/asgi_starlette.py index edc070c..4288bfb 100644 --- a/examples/web/asgi_starlette.py +++ b/examples/web/asgi_starlette.py @@ -46,6 +46,7 @@ class SchedulerMiddleware: await self.scheduler.add_schedule( tick, IntervalTrigger(seconds=1), id="tick" ) + await self.scheduler.start_in_background() await self.app(scope, receive, send) else: await self.app(scope, receive, send) diff --git a/pyproject.toml b/pyproject.toml index e9eb5fb..b400d39 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "attrs >= 21.3", "tenacity ~= 8.0", "tzlocal >= 3.0", + "typing_extensions; python_version < '3.11'" ] dynamic = ["version"] diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 2851f6e..75972b5 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -3,10 +3,12 @@ from __future__ import annotations import os import platform import random +import sys from asyncio import CancelledError from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger +from types import TracebackType from typing import Any, Callable, Iterable, Mapping, cast from uuid import UUID, uuid4 @@ -39,6 +41,11 @@ from ..eventbrokers.async_local import LocalAsyncEventBroker from ..marshalling import callable_to_ref from ..workers.async_ import AsyncWorker +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + _microsecond_delta = timedelta(microseconds=1) _zero_timedelta = timedelta() @@ -59,6 +66,8 @@ class AsyncScheduler: _state: RunState = attrs.field(init=False, default=RunState.stopped) _task_group: TaskGroup | None = attrs.field(init=False, default=None) + _exit_stack: AsyncExitStack | None = attrs.field(init=False, default=None) + _services_initialized: bool = attrs.field(init=False, default=False) _wakeup_event: anyio.Event = attrs.field(init=False) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _schedule_added_subscription: Subscription = attrs.field(init=False) @@ -67,17 +76,43 @@ class AsyncScheduler: if not self.identity: self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" - async def __aenter__(self): - self._task_group = create_task_group() - await self._task_group.__aenter__() - await self._task_group.start(self._run) + async def __aenter__(self: Self) -> Self: + self._exit_stack = AsyncExitStack() + await self._exit_stack.__aenter__() + await self._ensure_services_ready(self._exit_stack) + self._task_group = await self._exit_stack.enter_async_context( + create_task_group() + ) return self - async def __aexit__(self, exc_type, exc_val, exc_tb): + async def __aexit__( + self, + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: await self.stop() - await self._task_group.__aexit__(exc_type, exc_val, exc_tb) + await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) self._task_group = None + async def _ensure_services_ready(self, exit_stack: AsyncExitStack) -> None: + """Ensure that the data store and event broker have been initialized.""" + if not self._services_initialized: + self._services_initialized = True + exit_stack.callback(setattr, self, "_services_initialized", False) + + # Initialize the event broker + await self.event_broker.start() + exit_stack.push_async_exit( + lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None) + ) + + # Initialize the data store + await self.data_store.start(self.event_broker) + exit_stack.push_async_exit( + lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None) + ) + def _schedule_added_or_modified(self, event: Event) -> None: event_ = cast("ScheduleAdded | ScheduleUpdated", event) if not self._wakeup_deadline or ( @@ -342,7 +377,18 @@ class AsyncScheduler: ): await event.wait() - async def _run(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: + async def start_in_background(self) -> None: + if self._task_group is None: + raise RuntimeError( + "The scheduler must be used as an async context manager (async with " + "...) in order to be startable in the background" + ) + + await self._task_group.start(self.run_until_stopped) + + async def run_until_stopped( + self, *, task_status: TaskStatus = TASK_STATUS_IGNORED + ) -> None: if self._state is not RunState.stopped: raise RuntimeError( f'Cannot start the scheduler when it is in the "{self._state}" ' @@ -352,18 +398,7 @@ class AsyncScheduler: self._state = RunState.starting async with AsyncExitStack() as exit_stack: self._wakeup_event = anyio.Event() - - # Initialize the event broker - await self.event_broker.start() - exit_stack.push_async_exit( - lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None) - ) - - # Initialize the data store - await self.data_store.start(self.event_broker) - exit_stack.push_async_exit( - lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None) - ) + await self._ensure_services_ready(exit_stack) # Wake up the scheduler if the data store emits a significant schedule event exit_stack.enter_context( diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index c821a6a..d98161a 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -4,6 +4,7 @@ import atexit import os import platform import random +import sys import threading from concurrent.futures import Future from contextlib import ExitStack @@ -38,6 +39,11 @@ from ..eventbrokers.local import LocalEventBroker from ..marshalling import callable_to_ref from ..workers.sync import Worker +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + _microsecond_delta = timedelta(microseconds=1) _zero_timedelta = timedelta() @@ -57,15 +63,16 @@ class Scheduler: _wakeup_event: threading.Event = attrs.field(init=False, factory=threading.Event) _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) _services_initialized: bool = attrs.field(init=False, default=False) - _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack) + _exit_stack: ExitStack | None = attrs.field(init=False, default=None) _lock: threading.RLock = attrs.field(init=False, factory=threading.RLock) def __attrs_post_init__(self) -> None: if not self.identity: self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}" - def __enter__(self) -> Scheduler: - self.start_in_background() + def __enter__(self: Self) -> Self: + self._exit_stack = ExitStack() + self._ensure_services_ready(self._exit_stack) return self def __exit__( @@ -75,16 +82,24 @@ class Scheduler: exc_tb: TracebackType, ) -> None: self.stop() - self._join_thread() + self._exit_stack.__exit__(exc_type, exc_val, exc_tb) def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None: """Ensure that the data store and event broker have been initialized.""" - stack = exit_stack or self._exit_stack with self._lock: if not self._services_initialized: + if exit_stack is None: + if self._exit_stack is None: + self._exit_stack = exit_stack = ExitStack() + atexit.register(self._exit_stack.close) + else: + exit_stack = self._exit_stack + self._services_initialized = True + exit_stack.callback(setattr, self, "_services_initialized", False) + self.event_broker.start() - stack.push( + exit_stack.push( lambda *exc_info: self.event_broker.stop( force=exc_info[0] is not None ) @@ -92,13 +107,11 @@ class Scheduler: # Initialize the data store self.data_store.start(self.event_broker) - stack.push( + exit_stack.push( lambda *exc_info: self.data_store.stop( force=exc_info[0] is not None ) ) - if not exit_stack: - atexit.register(self._exit_stack.close) def _schedule_added_or_modified(self, event: Event) -> None: event_ = cast("ScheduleAdded | ScheduleUpdated", event) @@ -377,8 +390,8 @@ class Scheduler: self._thread = None raise - atexit.register(self._join_thread) - atexit.register(self.stop) + self._exit_stack.callback(self._join_thread) + self._exit_stack.callback(self.stop) def stop(self) -> None: """ diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index dea5e83..32c1c29 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -22,6 +22,7 @@ from apscheduler import ( ScheduleAdded, ScheduleLookupError, ScheduleRemoved, + SchedulerStarted, SchedulerStopped, Task, TaskAdded, @@ -72,6 +73,7 @@ class TestAsyncScheduler: async with AsyncScheduler(start_worker=False) as scheduler: scheduler.event_broker.subscribe(listener) await scheduler.add_schedule(dummy_async_job, trigger, id="foo") + await scheduler.start_in_background() with fail_after(3): await event.wait() @@ -86,6 +88,10 @@ class TestAsyncScheduler: assert received_event.schedule_id == "foo" # assert received_event.task_id == 'task_id' + # Then the scheduler was started + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStarted) + # Then that schedule was processed and a job was added for it received_event = received_events.pop(0) assert isinstance(received_event, JobAdded) @@ -166,6 +172,7 @@ class TestAsyncScheduler: assert schedule.max_jitter == timedelta(seconds=max_jitter) # Wait for the job to be added + await scheduler.start_in_background() with fail_after(3): await job_added_event.wait() @@ -185,7 +192,10 @@ class TestAsyncScheduler: job_id = await scheduler.add_job( dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5 ) - result = await scheduler.get_job_result(job_id) + await scheduler.start_in_background() + with fail_after(3): + result = await scheduler.get_job_result(job_id) + assert result.job_id == job_id assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" @@ -197,6 +207,7 @@ class TestAsyncScheduler: lambda evt: event.set(), {JobReleased}, one_shot=True ) job_id = await scheduler.add_job(dummy_async_job) + await scheduler.start_in_background() with fail_after(3): await event.wait() @@ -210,7 +221,10 @@ class TestAsyncScheduler: kwargs={"delay": 0.2, "fail": True}, result_expiration_time=5, ) - result = await scheduler.get_job_result(job_id) + await scheduler.start_in_background() + with fail_after(3): + result = await scheduler.get_job_result(job_id) + assert result.job_id == job_id assert result.outcome is JobOutcome.error assert isinstance(result.exception, RuntimeError) @@ -221,6 +235,7 @@ class TestAsyncScheduler: async with AsyncScheduler() as scheduler: scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True) job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + await scheduler.start_in_background() with fail_after(3): await event.wait() @@ -231,15 +246,18 @@ class TestAsyncScheduler: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) with pytest.raises(JobLookupError): - await scheduler.get_job_result(job_id, wait=False) + with fail_after(3): + await scheduler.get_job_result(job_id, wait=False) async def test_run_job_success(self) -> None: async with AsyncScheduler() as scheduler: + await scheduler.start_in_background() return_value = await scheduler.run_job(dummy_async_job) assert return_value == "returnvalue" async def test_run_job_failure(self) -> None: async with AsyncScheduler() as scheduler: + await scheduler.start_in_background() with pytest.raises(RuntimeError, match="failing as requested"): await scheduler.run_job(dummy_async_job, kwargs={"fail": True}) @@ -271,7 +289,10 @@ class TestAsyncScheduler: result_expiration_time=timedelta(seconds=10), ) await scheduler.data_store.add_job(job) - result = await scheduler.get_job_result(job.id) + await scheduler.start_in_background() + with fail_after(3): + result = await scheduler.get_job_result(job.id) + if result.outcome is JobOutcome.error: raise result.exception else: @@ -302,6 +323,7 @@ class TestSyncScheduler: with Scheduler(start_worker=False) as scheduler: scheduler.event_broker.subscribe(listener) scheduler.add_schedule(dummy_sync_job, trigger, id="foo") + scheduler.start_in_background() event.wait(3) # First, a task was added @@ -314,6 +336,10 @@ class TestSyncScheduler: assert isinstance(received_event, ScheduleAdded) assert received_event.schedule_id == "foo" + # Then the scheduler was started + received_event = received_events.pop(0) + assert isinstance(received_event, SchedulerStarted) + # Then that schedule was processed and a job was added for it received_event = received_events.pop(0) assert isinstance(received_event, JobAdded) @@ -389,6 +415,7 @@ class TestSyncScheduler: assert schedule.max_jitter == timedelta(seconds=max_jitter) # Wait for the job to be added + scheduler.start_in_background() job_added_event.wait(3) fake_uniform.assert_called_once_with(0, expected_upper_bound) @@ -405,6 +432,7 @@ class TestSyncScheduler: def test_get_job_result_success(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5) + scheduler.start_in_background() result = scheduler.get_job_result(job_id) assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" @@ -416,6 +444,7 @@ class TestSyncScheduler: lambda evt: event.set(), {JobReleased}, one_shot=True ): job_id = scheduler.add_job(dummy_sync_job) + scheduler.start_in_background() event.wait(3) with pytest.raises(JobLookupError): @@ -426,6 +455,7 @@ class TestSyncScheduler: job_id = scheduler.add_job( dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5 ) + scheduler.start_in_background() result = scheduler.get_job_result(job_id) assert result.job_id == job_id assert result.outcome is JobOutcome.error @@ -438,6 +468,7 @@ class TestSyncScheduler: lambda evt: event.set(), one_shot=True ): job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + scheduler.start_in_background() event.wait(3) with pytest.raises(JobLookupError): scheduler.get_job_result(job_id, wait=False) @@ -445,16 +476,19 @@ class TestSyncScheduler: def test_get_job_result_nowait_not_yet_ready(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2}) + scheduler.start_in_background() with pytest.raises(JobLookupError): scheduler.get_job_result(job_id, wait=False) def test_run_job_success(self) -> None: with Scheduler() as scheduler: + scheduler.start_in_background() return_value = scheduler.run_job(dummy_sync_job) assert return_value == "returnvalue" def test_run_job_failure(self) -> None: with Scheduler() as scheduler: + scheduler.start_in_background() with pytest.raises(RuntimeError, match="failing as requested"): scheduler.run_job(dummy_sync_job, kwargs={"fail": True}) @@ -484,6 +518,7 @@ class TestSyncScheduler: result_expiration_time=timedelta(seconds=10), ) scheduler.data_store.add_job(job) + scheduler.start_in_background() result = scheduler.get_job_result(job.id) if result.outcome is JobOutcome.error: raise result.exception @@ -496,6 +531,7 @@ class TestSyncScheduler: datetime.now(timezone.utc) + timedelta(milliseconds=100) ) scheduler.add_schedule(scheduler.stop, trigger) + scheduler.start_in_background() scheduler.wait_until_stopped() # This should be a no-op |