author    Alex Grönholm <alex.gronholm@nextday.fi>  2022-09-04 01:29:43 +0300
committer Alex Grönholm <alex.gronholm@nextday.fi>  2022-09-04 01:29:43 +0300
commit    5f0627be932d561c92bedc684ab1b8c5646520d4 (patch)
tree      078ae657f60dd1bbcdbbdf300d1c7942e958e64b
parent    90a9675f4444809c72c25a450a40243be8110b68 (diff)
download  apscheduler-5f0627be932d561c92bedc684ab1b8c5646520d4.tar.gz
Changed the scheduler API to require an explicit start call
-rw-r--r--  docs/versionhistory.rst                      |  3
-rw-r--r--  examples/separate_worker/async_scheduler.py  |  2
-rw-r--r--  examples/separate_worker/sync_scheduler.py   |  2
-rw-r--r--  examples/standalone/async_memory.py          |  2
-rw-r--r--  examples/standalone/async_mysql.py           |  2
-rw-r--r--  examples/standalone/async_postgres.py        |  2
-rw-r--r--  examples/standalone/sync_memory.py           |  2
-rw-r--r--  examples/web/asgi_fastapi.py                 |  1
-rw-r--r--  examples/web/asgi_noframework.py             |  1
-rw-r--r--  examples/web/asgi_starlette.py               |  1
-rw-r--r--  pyproject.toml                               |  1
-rw-r--r--  src/apscheduler/schedulers/async_.py         | 73
-rw-r--r--  src/apscheduler/schedulers/sync.py           | 35
-rw-r--r--  tests/test_schedulers.py                     | 44
14 files changed, 131 insertions(+), 40 deletions(-)
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index e847918..103daf6 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -6,6 +6,9 @@ APScheduler, see the :doc:`migration section <migration>`.
**UNRELEASED**
+- **BREAKING** Changed the scheduler API to always require a call to either
+ ``run_until_stopped()`` or ``start_in_background()`` to start the scheduler (using it
+ as a context manager is no longer enough)
- Added an async Redis event broker
- Added automatic reconnection to the Redis event brokers (sync and async)
- Added automatic reconnection to the asyncpg event broker
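
For readers skimming the changelog entry above, here is a minimal sketch of the new contract, modeled on this commit's standalone examples; the trigger and import paths are taken from those examples, and the in-memory defaults are assumed rather than documented here:

    from asyncio import run
    from datetime import datetime

    from apscheduler.schedulers.async_ import AsyncScheduler
    from apscheduler.triggers.interval import IntervalTrigger


    def tick() -> None:
        print("Hello, the time is", datetime.now())


    async def main() -> None:
        async with AsyncScheduler() as scheduler:
            await scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
            # Before this commit, entering the context manager started the
            # scheduler and wait_until_stopped() merely blocked. Now one of
            # these calls is required for schedules to actually be processed:
            await scheduler.run_until_stopped()
            # ...or, to keep the current task free for other work:
            # await scheduler.start_in_background()


    run(main())
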
diff --git a/examples/separate_worker/async_scheduler.py b/examples/separate_worker/async_scheduler.py
index 27eb37a..6ffdbcd 100644
--- a/examples/separate_worker/async_scheduler.py
+++ b/examples/separate_worker/async_scheduler.py
@@ -40,7 +40,7 @@ async def main():
data_store, event_broker, start_worker=False
) as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
- await scheduler.wait_until_stopped()
+ await scheduler.run_until_stopped()
logging.basicConfig(level=logging.INFO)
diff --git a/examples/separate_worker/sync_scheduler.py b/examples/separate_worker/sync_scheduler.py
index 9c40032..83d2d06 100644
--- a/examples/separate_worker/sync_scheduler.py
+++ b/examples/separate_worker/sync_scheduler.py
@@ -34,4 +34,4 @@ event_broker = RedisEventBroker.from_url("redis://localhost")
with Scheduler(data_store, event_broker, start_worker=False) as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
- scheduler.wait_until_stopped()
+ scheduler.run_until_stopped()
diff --git a/examples/standalone/async_memory.py b/examples/standalone/async_memory.py
index b3db775..8fd4e2d 100644
--- a/examples/standalone/async_memory.py
+++ b/examples/standalone/async_memory.py
@@ -22,7 +22,7 @@ def tick():
async def main():
async with AsyncScheduler() as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
- await scheduler.wait_until_stopped()
+ await scheduler.run_until_stopped()
run(main())
diff --git a/examples/standalone/async_mysql.py b/examples/standalone/async_mysql.py
index 3f6f95f..6e8d801 100644
--- a/examples/standalone/async_mysql.py
+++ b/examples/standalone/async_mysql.py
@@ -32,7 +32,7 @@ async def main():
data_store = AsyncSQLAlchemyDataStore(engine)
async with AsyncScheduler(data_store) as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
- await scheduler.wait_until_stopped()
+ await scheduler.run_until_stopped()
run(main())
diff --git a/examples/standalone/async_postgres.py b/examples/standalone/async_postgres.py
index e6083f7..e9c519a 100644
--- a/examples/standalone/async_postgres.py
+++ b/examples/standalone/async_postgres.py
@@ -32,7 +32,7 @@ async def main():
data_store = AsyncSQLAlchemyDataStore(engine)
async with AsyncScheduler(data_store) as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
- await scheduler.wait_until_stopped()
+ await scheduler.run_until_stopped()
run(main())
diff --git a/examples/standalone/sync_memory.py b/examples/standalone/sync_memory.py
index 6d804f7..13b22a7 100644
--- a/examples/standalone/sync_memory.py
+++ b/examples/standalone/sync_memory.py
@@ -20,4 +20,4 @@ def tick():
with Scheduler() as scheduler:
scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
- scheduler.wait_until_stopped()
+ scheduler.run_until_stopped()
diff --git a/examples/web/asgi_fastapi.py b/examples/web/asgi_fastapi.py
index 2cfa5c4..ffa79cb 100644
--- a/examples/web/asgi_fastapi.py
+++ b/examples/web/asgi_fastapi.py
@@ -45,6 +45,7 @@ class SchedulerMiddleware:
await self.scheduler.add_schedule(
tick, IntervalTrigger(seconds=1), id="tick"
)
+ await self.scheduler.start_in_background()
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)
diff --git a/examples/web/asgi_noframework.py b/examples/web/asgi_noframework.py
index c3ac184..4b57c30 100644
--- a/examples/web/asgi_noframework.py
+++ b/examples/web/asgi_noframework.py
@@ -64,6 +64,7 @@ async def scheduler_middleware(scope, receive, send):
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
async with AsyncScheduler(data_store, event_broker) as scheduler:
await scheduler.add_schedule(tick, IntervalTrigger(seconds=1), id="tick")
+ await scheduler.start_in_background()
await original_app(scope, receive, send)
else:
await original_app(scope, receive, send)
diff --git a/examples/web/asgi_starlette.py b/examples/web/asgi_starlette.py
index edc070c..4288bfb 100644
--- a/examples/web/asgi_starlette.py
+++ b/examples/web/asgi_starlette.py
@@ -46,6 +46,7 @@ class SchedulerMiddleware:
await self.scheduler.add_schedule(
tick, IntervalTrigger(seconds=1), id="tick"
)
+ await self.scheduler.start_in_background()
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)
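
To put the three ASGI example changes above in context, a hedged sketch of the middleware pattern they share; the class scaffolding and the lifespan check are assumed from the surrounding examples, and only the added start_in_background() call is taken verbatim from this diff:

    from apscheduler.schedulers.async_ import AsyncScheduler
    from apscheduler.triggers.interval import IntervalTrigger


    def tick() -> None:
        print("tick")


    class SchedulerMiddleware:
        """Wraps an ASGI app and runs the scheduler for the app's lifetime."""

        def __init__(self, app, scheduler: AsyncScheduler) -> None:
            self.app = app
            self.scheduler = scheduler

        async def __call__(self, scope, receive, send) -> None:
            if scope["type"] == "lifespan":
                # Enter the scheduler's context for the app's lifespan,
                # register the schedule, then explicitly start the scheduler
                # in the background (the new requirement) before delegating.
                async with self.scheduler:
                    await self.scheduler.add_schedule(
                        tick, IntervalTrigger(seconds=1), id="tick"
                    )
                    await self.scheduler.start_in_background()
                    await self.app(scope, receive, send)
            else:
                await self.app(scope, receive, send)
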
diff --git a/pyproject.toml b/pyproject.toml
index e9eb5fb..b400d39 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,6 +31,7 @@ dependencies = [
"attrs >= 21.3",
"tenacity ~= 8.0",
"tzlocal >= 3.0",
+ "typing_extensions; python_version < '3.11'"
]
dynamic = ["version"]
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 2851f6e..75972b5 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -3,10 +3,12 @@ from __future__ import annotations
import os
import platform
import random
+import sys
from asyncio import CancelledError
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
+from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, cast
from uuid import UUID, uuid4
@@ -39,6 +41,11 @@ from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..marshalling import callable_to_ref
from ..workers.async_ import AsyncWorker
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
_microsecond_delta = timedelta(microseconds=1)
_zero_timedelta = timedelta()
@@ -59,6 +66,8 @@ class AsyncScheduler:
_state: RunState = attrs.field(init=False, default=RunState.stopped)
_task_group: TaskGroup | None = attrs.field(init=False, default=None)
+ _exit_stack: AsyncExitStack | None = attrs.field(init=False, default=None)
+ _services_initialized: bool = attrs.field(init=False, default=False)
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_schedule_added_subscription: Subscription = attrs.field(init=False)
@@ -67,17 +76,43 @@ class AsyncScheduler:
if not self.identity:
self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
- async def __aenter__(self):
- self._task_group = create_task_group()
- await self._task_group.__aenter__()
- await self._task_group.start(self._run)
+ async def __aenter__(self: Self) -> Self:
+ self._exit_stack = AsyncExitStack()
+ await self._exit_stack.__aenter__()
+ await self._ensure_services_ready(self._exit_stack)
+ self._task_group = await self._exit_stack.enter_async_context(
+ create_task_group()
+ )
return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException],
+ exc_val: BaseException,
+ exc_tb: TracebackType,
+ ) -> None:
await self.stop()
- await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
+ await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
self._task_group = None
+ async def _ensure_services_ready(self, exit_stack: AsyncExitStack) -> None:
+ """Ensure that the data store and event broker have been initialized."""
+ if not self._services_initialized:
+ self._services_initialized = True
+ exit_stack.callback(setattr, self, "_services_initialized", False)
+
+ # Initialize the event broker
+ await self.event_broker.start()
+ exit_stack.push_async_exit(
+ lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None)
+ )
+
+ # Initialize the data store
+ await self.data_store.start(self.event_broker)
+ exit_stack.push_async_exit(
+ lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None)
+ )
+
def _schedule_added_or_modified(self, event: Event) -> None:
event_ = cast("ScheduleAdded | ScheduleUpdated", event)
if not self._wakeup_deadline or (
@@ -342,7 +377,18 @@ class AsyncScheduler:
):
await event.wait()
- async def _run(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
+ async def start_in_background(self) -> None:
+ if self._task_group is None:
+ raise RuntimeError(
+ "The scheduler must be used as an async context manager (async with "
+ "...) in order to be startable in the background"
+ )
+
+ await self._task_group.start(self.run_until_stopped)
+
+ async def run_until_stopped(
+ self, *, task_status: TaskStatus = TASK_STATUS_IGNORED
+ ) -> None:
if self._state is not RunState.stopped:
raise RuntimeError(
f'Cannot start the scheduler when it is in the "{self._state}" '
@@ -352,18 +398,7 @@ class AsyncScheduler:
self._state = RunState.starting
async with AsyncExitStack() as exit_stack:
self._wakeup_event = anyio.Event()
-
- # Initialize the event broker
- await self.event_broker.start()
- exit_stack.push_async_exit(
- lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None)
- )
-
- # Initialize the data store
- await self.data_store.start(self.event_broker)
- exit_stack.push_async_exit(
- lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None)
- )
+ await self._ensure_services_ready(exit_stack)
# Wake up the scheduler if the data store emits a significant schedule event
exit_stack.enter_context(
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index c821a6a..d98161a 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -4,6 +4,7 @@ import atexit
import os
import platform
import random
+import sys
import threading
from concurrent.futures import Future
from contextlib import ExitStack
@@ -38,6 +39,11 @@ from ..eventbrokers.local import LocalEventBroker
from ..marshalling import callable_to_ref
from ..workers.sync import Worker
+if sys.version_info >= (3, 11):
+ from typing import Self
+else:
+ from typing_extensions import Self
+
_microsecond_delta = timedelta(microseconds=1)
_zero_timedelta = timedelta()
@@ -57,15 +63,16 @@ class Scheduler:
_wakeup_event: threading.Event = attrs.field(init=False, factory=threading.Event)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
_services_initialized: bool = attrs.field(init=False, default=False)
- _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack)
+ _exit_stack: ExitStack | None = attrs.field(init=False, default=None)
_lock: threading.RLock = attrs.field(init=False, factory=threading.RLock)
def __attrs_post_init__(self) -> None:
if not self.identity:
self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
- def __enter__(self) -> Scheduler:
- self.start_in_background()
+ def __enter__(self: Self) -> Self:
+ self._exit_stack = ExitStack()
+ self._ensure_services_ready(self._exit_stack)
return self
def __exit__(
@@ -75,16 +82,24 @@ class Scheduler:
exc_tb: TracebackType,
) -> None:
self.stop()
- self._join_thread()
+ self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
def _ensure_services_ready(self, exit_stack: ExitStack | None = None) -> None:
"""Ensure that the data store and event broker have been initialized."""
- stack = exit_stack or self._exit_stack
with self._lock:
if not self._services_initialized:
+ if exit_stack is None:
+ if self._exit_stack is None:
+ self._exit_stack = exit_stack = ExitStack()
+ atexit.register(self._exit_stack.close)
+ else:
+ exit_stack = self._exit_stack
+
self._services_initialized = True
+ exit_stack.callback(setattr, self, "_services_initialized", False)
+
self.event_broker.start()
- stack.push(
+ exit_stack.push(
lambda *exc_info: self.event_broker.stop(
force=exc_info[0] is not None
)
@@ -92,13 +107,11 @@ class Scheduler:
# Initialize the data store
self.data_store.start(self.event_broker)
- stack.push(
+ exit_stack.push(
lambda *exc_info: self.data_store.stop(
force=exc_info[0] is not None
)
)
- if not exit_stack:
- atexit.register(self._exit_stack.close)
def _schedule_added_or_modified(self, event: Event) -> None:
event_ = cast("ScheduleAdded | ScheduleUpdated", event)
@@ -377,8 +390,8 @@ class Scheduler:
self._thread = None
raise
- atexit.register(self._join_thread)
- atexit.register(self.stop)
+ self._exit_stack.callback(self._join_thread)
+ self._exit_stack.callback(self.stop)
def stop(self) -> None:
"""
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index dea5e83..32c1c29 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -22,6 +22,7 @@ from apscheduler import (
ScheduleAdded,
ScheduleLookupError,
ScheduleRemoved,
+ SchedulerStarted,
SchedulerStopped,
Task,
TaskAdded,
@@ -72,6 +73,7 @@ class TestAsyncScheduler:
async with AsyncScheduler(start_worker=False) as scheduler:
scheduler.event_broker.subscribe(listener)
await scheduler.add_schedule(dummy_async_job, trigger, id="foo")
+ await scheduler.start_in_background()
with fail_after(3):
await event.wait()
@@ -86,6 +88,10 @@ class TestAsyncScheduler:
assert received_event.schedule_id == "foo"
# assert received_event.task_id == 'task_id'
+ # Then the scheduler was started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStarted)
+
# Then that schedule was processed and a job was added for it
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
@@ -166,6 +172,7 @@ class TestAsyncScheduler:
assert schedule.max_jitter == timedelta(seconds=max_jitter)
# Wait for the job to be added
+ await scheduler.start_in_background()
with fail_after(3):
await job_added_event.wait()
@@ -185,7 +192,10 @@ class TestAsyncScheduler:
job_id = await scheduler.add_job(
dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5
)
- result = await scheduler.get_job_result(job_id)
+ await scheduler.start_in_background()
+ with fail_after(3):
+ result = await scheduler.get_job_result(job_id)
+
assert result.job_id == job_id
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
@@ -197,6 +207,7 @@ class TestAsyncScheduler:
lambda evt: event.set(), {JobReleased}, one_shot=True
)
job_id = await scheduler.add_job(dummy_async_job)
+ await scheduler.start_in_background()
with fail_after(3):
await event.wait()
@@ -210,7 +221,10 @@ class TestAsyncScheduler:
kwargs={"delay": 0.2, "fail": True},
result_expiration_time=5,
)
- result = await scheduler.get_job_result(job_id)
+ await scheduler.start_in_background()
+ with fail_after(3):
+ result = await scheduler.get_job_result(job_id)
+
assert result.job_id == job_id
assert result.outcome is JobOutcome.error
assert isinstance(result.exception, RuntimeError)
@@ -221,6 +235,7 @@ class TestAsyncScheduler:
async with AsyncScheduler() as scheduler:
scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True)
job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ await scheduler.start_in_background()
with fail_after(3):
await event.wait()
@@ -231,15 +246,18 @@ class TestAsyncScheduler:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
with pytest.raises(JobLookupError):
- await scheduler.get_job_result(job_id, wait=False)
+ with fail_after(3):
+ await scheduler.get_job_result(job_id, wait=False)
async def test_run_job_success(self) -> None:
async with AsyncScheduler() as scheduler:
+ await scheduler.start_in_background()
return_value = await scheduler.run_job(dummy_async_job)
assert return_value == "returnvalue"
async def test_run_job_failure(self) -> None:
async with AsyncScheduler() as scheduler:
+ await scheduler.start_in_background()
with pytest.raises(RuntimeError, match="failing as requested"):
await scheduler.run_job(dummy_async_job, kwargs={"fail": True})
@@ -271,7 +289,10 @@ class TestAsyncScheduler:
result_expiration_time=timedelta(seconds=10),
)
await scheduler.data_store.add_job(job)
- result = await scheduler.get_job_result(job.id)
+ await scheduler.start_in_background()
+ with fail_after(3):
+ result = await scheduler.get_job_result(job.id)
+
if result.outcome is JobOutcome.error:
raise result.exception
else:
@@ -302,6 +323,7 @@ class TestSyncScheduler:
with Scheduler(start_worker=False) as scheduler:
scheduler.event_broker.subscribe(listener)
scheduler.add_schedule(dummy_sync_job, trigger, id="foo")
+ scheduler.start_in_background()
event.wait(3)
# First, a task was added
@@ -314,6 +336,10 @@ class TestSyncScheduler:
assert isinstance(received_event, ScheduleAdded)
assert received_event.schedule_id == "foo"
+ # Then the scheduler was started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStarted)
+
# Then that schedule was processed and a job was added for it
received_event = received_events.pop(0)
assert isinstance(received_event, JobAdded)
@@ -389,6 +415,7 @@ class TestSyncScheduler:
assert schedule.max_jitter == timedelta(seconds=max_jitter)
# Wait for the job to be added
+ scheduler.start_in_background()
job_added_event.wait(3)
fake_uniform.assert_called_once_with(0, expected_upper_bound)
@@ -405,6 +432,7 @@ class TestSyncScheduler:
def test_get_job_result_success(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5)
+ scheduler.start_in_background()
result = scheduler.get_job_result(job_id)
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
@@ -416,6 +444,7 @@ class TestSyncScheduler:
lambda evt: event.set(), {JobReleased}, one_shot=True
):
job_id = scheduler.add_job(dummy_sync_job)
+ scheduler.start_in_background()
event.wait(3)
with pytest.raises(JobLookupError):
@@ -426,6 +455,7 @@ class TestSyncScheduler:
job_id = scheduler.add_job(
dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5
)
+ scheduler.start_in_background()
result = scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.error
@@ -438,6 +468,7 @@ class TestSyncScheduler:
lambda evt: event.set(), one_shot=True
):
job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ scheduler.start_in_background()
event.wait(3)
with pytest.raises(JobLookupError):
scheduler.get_job_result(job_id, wait=False)
@@ -445,16 +476,19 @@ class TestSyncScheduler:
def test_get_job_result_nowait_not_yet_ready(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2})
+ scheduler.start_in_background()
with pytest.raises(JobLookupError):
scheduler.get_job_result(job_id, wait=False)
def test_run_job_success(self) -> None:
with Scheduler() as scheduler:
+ scheduler.start_in_background()
return_value = scheduler.run_job(dummy_sync_job)
assert return_value == "returnvalue"
def test_run_job_failure(self) -> None:
with Scheduler() as scheduler:
+ scheduler.start_in_background()
with pytest.raises(RuntimeError, match="failing as requested"):
scheduler.run_job(dummy_sync_job, kwargs={"fail": True})
@@ -484,6 +518,7 @@ class TestSyncScheduler:
result_expiration_time=timedelta(seconds=10),
)
scheduler.data_store.add_job(job)
+ scheduler.start_in_background()
result = scheduler.get_job_result(job.id)
if result.outcome is JobOutcome.error:
raise result.exception
@@ -496,6 +531,7 @@ class TestSyncScheduler:
datetime.now(timezone.utc) + timedelta(milliseconds=100)
)
scheduler.add_schedule(scheduler.stop, trigger)
+ scheduler.start_in_background()
scheduler.wait_until_stopped()
# This should be a no-op