author    | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 01:56:32 +0300
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-09-21 02:40:02 +0300
commit    | e8055bce55bb004168b5787f89091057cf1f36c7 (patch)
tree      | c8c377a4a8301e643efc939cea86138b8d72e95d
parent    | c5727432736b55b7d76753307f14efdb962c2edf (diff)
download  | apscheduler-e8055bce55bb004168b5787f89091057cf1f36c7.tar.gz
Merged the Worker class into AsyncScheduler
-rw-r--r-- | docs/versionhistory.rst                  |   8
-rw-r--r-- | src/apscheduler/__init__.py              |   9
-rw-r--r-- | src/apscheduler/_enums.py                |  16
-rw-r--r-- | src/apscheduler/_events.py               |  30
-rw-r--r-- | src/apscheduler/_validators.py           |   6
-rw-r--r-- | src/apscheduler/_worker.py               | 189
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py |   3
-rw-r--r-- | src/apscheduler/schedulers/async_.py     | 507
-rw-r--r-- | src/apscheduler/schedulers/sync.py       |  16
-rw-r--r-- | tests/test_schedulers.py                 |  22
10 files changed, 372 insertions, 434 deletions
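
Before the patch body, a brief usage sketch (not part of the commit) of the role-based configuration this change introduces, based on the changelog entry and tests in the diff below. Import paths follow the files touched here; splitting the two roles across processes would additionally assume a shared data store and event broker rather than the in-memory defaults.

```python
# Illustrative sketch only -- not part of this commit. Assumes a build of
# APScheduler that includes this change.
import asyncio

from apscheduler import SchedulerRole
from apscheduler.schedulers.async_ import AsyncScheduler


async def scheduler_only() -> None:
    # Replaces the removed ``process_jobs=False`` / ``start_worker=False``:
    # this instance processes due schedules but does not run jobs.
    async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler:
        await scheduler.run_until_stopped()


async def worker_only() -> None:
    # Replaces the removed standalone ``Worker``: this instance only runs
    # due jobs and leaves schedule processing to scheduler-role instances.
    async with AsyncScheduler(role=SchedulerRole.worker) as scheduler:
        await scheduler.run_until_stopped()


if __name__ == "__main__":
    asyncio.run(scheduler_only())
```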
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 559ee36..e86d486 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -6,9 +6,11 @@ APScheduler, see the :doc:`migration section <migration>`. **UNRELEASED** -- **BREAKING** Workers can no longer be run independently. Instead, you can run a - scheduler that only starts a worker but does not process schedules by passing - ``process_schedules=False`` to the scheduler +- **BREAKING** Workers were merged into schedulers. As the ``Worker`` and + ``AsyncWorker`` classes have been removed, you now need to pass + ``role=SchedulerRole.scheduler`` to the scheduler to prevent it from processing due + jobs. The worker event classes (``WorkerEvent``, ``WorkerStarted``, ``WorkerStopped``) + have also been removed. - **BREAKING** The synchronous interfaces for event brokers and data stores have been removed. Synchronous libraries can still be used to implement these services through the use of ``anyio.to_thread.run_sync()``. diff --git a/src/apscheduler/__init__.py b/src/apscheduler/__init__.py index 844a056..ddf34d9 100644 --- a/src/apscheduler/__init__.py +++ b/src/apscheduler/__init__.py @@ -31,6 +31,7 @@ __all__ = [ "ScheduleRemoved", "ScheduleDeserializationFailed", "SchedulerEvent", + "SchedulerRole", "SchedulerStarted", "SchedulerStopped", "Task", @@ -38,9 +39,6 @@ __all__ = [ "TaskLookupError", "TaskUpdated", "TaskRemoved", - "WorkerEvent", - "WorkerStarted", - "WorkerStopped", "current_async_scheduler", "current_scheduler", "current_job", @@ -49,7 +47,7 @@ __all__ = [ from typing import Any from ._context import current_async_scheduler, current_job, current_scheduler -from ._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState +from ._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole from ._events import ( DataStoreEvent, Event, @@ -68,9 +66,6 @@ from ._events import ( TaskAdded, TaskRemoved, TaskUpdated, - WorkerEvent, - WorkerStarted, - WorkerStopped, ) from ._exceptions import ( ConflictingIdError, diff --git a/src/apscheduler/_enums.py b/src/apscheduler/_enums.py index d8e706d..0125f2a 100644 --- a/src/apscheduler/_enums.py +++ b/src/apscheduler/_enums.py @@ -3,6 +3,22 @@ from __future__ import annotations from enum import Enum, auto +class SchedulerRole(Enum): + """ + Specifies what the scheduler should be doing when it's running. + + Values: + + * ``scheduler``: processes due schedules, but won't run jobs + * ``worker``: runs due jobs, but won't process schedules + * ``both``: processes schedules and runs due jobs + """ + + scheduler = auto() + worker = auto() + both = auto() + + class RunState(Enum): """ Used to track the running state of schedulers and workers. diff --git a/src/apscheduler/_events.py b/src/apscheduler/_events.py index a3bd297..9d0278d 100644 --- a/src/apscheduler/_events.py +++ b/src/apscheduler/_events.py @@ -206,34 +206,8 @@ class SchedulerStopped(SchedulerEvent): exception: BaseException | None = None -# -# Worker events -# - - -@attrs.define(kw_only=True, frozen=True) -class WorkerEvent(Event): - """Base class for events originating from a worker.""" - - -@attrs.define(kw_only=True, frozen=True) -class WorkerStarted(WorkerEvent): - """Signals that a worker has started.""" - - -@attrs.define(kw_only=True, frozen=True) -class WorkerStopped(WorkerEvent): - """ - Signals that a worker has stopped. 
- - :ivar exception: the exception that caused the worker to stop, if any - """ - - exception: BaseException | None = None - - @attrs.define(kw_only=True, frozen=True) -class JobAcquired(WorkerEvent): +class JobAcquired(SchedulerEvent): """ Signals that a worker has acquired a job for processing. @@ -246,7 +220,7 @@ class JobAcquired(WorkerEvent): @attrs.define(kw_only=True, frozen=True) -class JobReleased(WorkerEvent): +class JobReleased(SchedulerEvent): """ Signals that a worker has finished processing of a job. diff --git a/src/apscheduler/_validators.py b/src/apscheduler/_validators.py index 4197edf..eebb3c0 100644 --- a/src/apscheduler/_validators.py +++ b/src/apscheduler/_validators.py @@ -4,7 +4,6 @@ import sys from datetime import date, datetime, timedelta, timezone, tzinfo from typing import Any -import attrs from attrs import Attribute from tzlocal import get_localzone @@ -179,8 +178,3 @@ def require_state_version( raise DeserializationError( 'Missing "version" key in the serialized state' ) from exc - - -def positive_integer(inst, field: attrs.Attribute, value) -> None: - if value <= 0: - raise ValueError(f"{field} must be a positive integer") diff --git a/src/apscheduler/_worker.py b/src/apscheduler/_worker.py deleted file mode 100644 index 8c95d16..0000000 --- a/src/apscheduler/_worker.py +++ /dev/null @@ -1,189 +0,0 @@ -from __future__ import annotations - -from collections.abc import Mapping -from contextlib import AsyncExitStack -from datetime import datetime, timezone -from logging import Logger, getLogger -from typing import Callable -from uuid import UUID - -import anyio -import attrs -from anyio import create_task_group, get_cancelled_exc_class, move_on_after -from anyio.abc import CancelScope - -from ._context import current_job -from ._enums import JobOutcome, RunState -from ._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped -from ._structures import Job, JobInfo, JobResult -from ._validators import positive_integer -from .abc import DataStore, EventBroker, JobExecutor - - -@attrs.define(eq=False, kw_only=True) -class Worker: - """ - Runs jobs locally in a task group. 
- - :param max_concurrent_jobs: Maximum number of jobs the worker will run at once - """ - - job_executors: Mapping[str, JobExecutor] = attrs.field(kw_only=True) - max_concurrent_jobs: int = attrs.field( - kw_only=True, validator=positive_integer, default=100 - ) - logger: Logger = attrs.field(kw_only=True, default=getLogger(__name__)) - - _data_store: DataStore = attrs.field(init=False) - _event_broker: EventBroker = attrs.field(init=False) - _identity: str = attrs.field(init=False) - _state: RunState = attrs.field(init=False, default=RunState.stopped) - _wakeup_event: anyio.Event = attrs.field(init=False) - _acquired_jobs: set[Job] = attrs.field(init=False, factory=set) - _running_jobs: set[UUID] = attrs.field(init=False, factory=set) - - async def start( - self, - exit_stack: AsyncExitStack, - data_store: DataStore, - event_broker: EventBroker, - identity: str, - ) -> None: - self._data_store = data_store - self._event_broker = event_broker - self._identity = identity - self._state = RunState.started - self._wakeup_event = anyio.Event() - - # Start the job executors - for job_executor in self.job_executors.values(): - await job_executor.start(exit_stack) - - # Start the worker in a background task - task_group = await exit_stack.enter_async_context(create_task_group()) - task_group.start_soon(self._run) - - # Stop the worker when the exit stack unwinds - exit_stack.callback(lambda: self._wakeup_event.set()) - exit_stack.callback(setattr, self, "_state", RunState.stopped) - - # Wake up the worker if the data store emits a significant job event - exit_stack.enter_context( - self._event_broker.subscribe( - lambda event: self._wakeup_event.set(), {JobAdded} - ) - ) - - # Signal that the worker has started - await self._event_broker.publish_local(WorkerStarted()) - - async def _run(self) -> None: - """Run the worker until it is explicitly stopped.""" - exception: BaseException | None = None - try: - async with create_task_group() as tg: - while self._state is RunState.started: - limit = self.max_concurrent_jobs - len(self._running_jobs) - jobs = await self._data_store.acquire_jobs(self._identity, limit) - for job in jobs: - task = await self._data_store.get_task(job.task_id) - self._running_jobs.add(job.id) - tg.start_soon(self._run_job, job, task.func, task.executor) - - await self._wakeup_event.wait() - self._wakeup_event = anyio.Event() - except get_cancelled_exc_class(): - pass - except BaseException as exc: - exception = exc - raise - finally: - if not exception: - self.logger.info("Worker stopped") - elif isinstance(exception, Exception): - self.logger.exception("Worker crashed") - elif exception: - self.logger.info( - f"Worker stopped due to {exception.__class__.__name__}" - ) - - with move_on_after(3, shield=True): - await self._event_broker.publish_local( - WorkerStopped(exception=exception) - ) - - async def _run_job(self, job: Job, func: Callable, executor: str) -> None: - try: - # Check if the job started before the deadline - start_time = datetime.now(timezone.utc) - if job.start_deadline is not None and start_time > job.start_deadline: - result = JobResult.from_job( - job, - outcome=JobOutcome.missed_start_deadline, - finished_at=start_time, - ) - await self._data_store.release_job(self._identity, job.task_id, result) - await self._event_broker.publish( - JobReleased.from_result(result, self._identity) - ) - return - - try: - job_executor = self.job_executors[executor] - except KeyError: - return - - token = current_job.set(JobInfo.from_job(job)) - try: - retval = await 
job_executor.run_job(func, job) - except get_cancelled_exc_class(): - self.logger.info("Job %s was cancelled", job.id) - with CancelScope(shield=True): - result = JobResult.from_job( - job, - outcome=JobOutcome.cancelled, - ) - await self._data_store.release_job( - self._identity, job.task_id, result - ) - await self._event_broker.publish( - JobReleased.from_result(result, self._identity) - ) - except BaseException as exc: - if isinstance(exc, Exception): - self.logger.exception("Job %s raised an exception", job.id) - else: - self.logger.error( - "Job %s was aborted due to %s", job.id, exc.__class__.__name__ - ) - - result = JobResult.from_job( - job, - JobOutcome.error, - exception=exc, - ) - await self._data_store.release_job( - self._identity, - job.task_id, - result, - ) - await self._event_broker.publish( - JobReleased.from_result(result, self._identity) - ) - if not isinstance(exc, Exception): - raise - else: - self.logger.info("Job %s completed successfully", job.id) - result = JobResult.from_job( - job, - JobOutcome.success, - return_value=retval, - ) - await self._data_store.release_job(self._identity, job.task_id, result) - await self._event_broker.publish( - JobReleased.from_result(result, self._identity) - ) - finally: - current_job.reset(token) - finally: - self._running_jobs.remove(job.id) diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 00d06aa..2322205 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -33,6 +33,7 @@ from sqlalchemy.engine import URL, Dialect, Result from sqlalchemy.exc import CompileError, IntegrityError, InterfaceError from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine from sqlalchemy.future import Connection, Engine +from sqlalchemy.sql import ClauseElement, Executable from sqlalchemy.sql.ddl import DropTable from sqlalchemy.sql.elements import BindParameter, literal @@ -197,7 +198,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore): async def _execute( self, conn: Connection | AsyncConnection, - statement, + statement: ClauseElement | Executable, parameters: Sequence | None = None, ): if isinstance(conn, AsyncConnection): diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 44fee27..11a1c67 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -4,10 +4,10 @@ import os import platform import random import sys -from asyncio import CancelledError from collections.abc import MutableMapping from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone +from inspect import isclass from logging import Logger, getLogger from types import TracebackType from typing import Any, Callable, Iterable, Mapping, cast @@ -15,12 +15,19 @@ from uuid import UUID, uuid4 import anyio import attrs -from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after +from anyio import ( + TASK_STATUS_IGNORED, + CancelScope, + create_task_group, + get_cancelled_exc_class, + move_on_after, +) from anyio.abc import TaskGroup, TaskStatus from attr.validators import instance_of -from .._context import current_async_scheduler -from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState +from .. 
import JobAdded +from .._context import current_async_scheduler, current_job +from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole from .._events import ( Event, JobReleased, @@ -35,8 +42,8 @@ from .._exceptions import ( JobLookupError, ScheduleLookupError, ) -from .._structures import Job, JobResult, Schedule, Task -from .._worker import Worker +from .._structures import Job, JobInfo, JobResult, Schedule, Task +from .._validators import non_negative_number from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger from ..datastores.memory import MemoryDataStore from ..eventbrokers.local import LocalEventBroker @@ -54,32 +61,37 @@ _microsecond_delta = timedelta(microseconds=1) _zero_timedelta = timedelta() -@attrs.define(eq=False) +@attrs.define(eq=False, kw_only=True) class AsyncScheduler: - """An asynchronous (AnyIO based) scheduler implementation.""" + """ + An asynchronous (AnyIO based) scheduler implementation. + + :param data_store: the data store for tasks, schedules and jobs + :param event_broker: the event broker to use for publishing an subscribing events + :param max_concurrent_jobs: Maximum number of jobs the worker will run at once + :param role: specifies what the scheduler should be doing when running + :param process_schedules: ``True`` to process due schedules in this scheduler + """ data_store: DataStore = attrs.field( - validator=instance_of(DataStore), factory=MemoryDataStore + kw_only=False, validator=instance_of(DataStore), factory=MemoryDataStore ) event_broker: EventBroker = attrs.field( - validator=instance_of(EventBroker), factory=LocalEventBroker - ) - identity: str = attrs.field(kw_only=True, default=None) - process_jobs: bool = attrs.field(kw_only=True, default=True) - job_executors: MutableMapping[str, JobExecutor] | None = attrs.field( - kw_only=True, default=None + kw_only=False, validator=instance_of(EventBroker), factory=LocalEventBroker ) - default_job_executor: str | None = attrs.field(kw_only=True, default=None) - process_schedules: bool = attrs.field(kw_only=True, default=True) - logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__)) + identity: str = attrs.field(default=None) + role: SchedulerRole = attrs.field(default=SchedulerRole.both) + max_concurrent_jobs: int = attrs.field(validator=non_negative_number, default=100) + job_executors: MutableMapping[str, JobExecutor] | None = attrs.field(default=None) + default_job_executor: str | None = attrs.field(default=None) + logger: Logger | None = attrs.field(default=getLogger(__name__)) _state: RunState = attrs.field(init=False, default=RunState.stopped) - _task_group: TaskGroup | None = attrs.field(init=False, default=None) + _services_task_group: TaskGroup | None = attrs.field(init=False, default=None) _exit_stack: AsyncExitStack = attrs.field(init=False, factory=AsyncExitStack) _services_initialized: bool = attrs.field(init=False, default=False) - _wakeup_event: anyio.Event = attrs.field(init=False) - _wakeup_deadline: datetime | None = attrs.field(init=False, default=None) - _schedule_added_subscription: Subscription = attrs.field(init=False) + _scheduler_cancel_scope: CancelScope | None = attrs.field(init=False, default=None) + _running_jobs: set[Job] = attrs.field(init=False, factory=set) def __attrs_post_init__(self) -> None: if not self.identity: @@ -103,10 +115,10 @@ class AsyncScheduler: await self._exit_stack.__aenter__() try: await self._ensure_services_initialized(self._exit_stack) - self._task_group = await 
self._exit_stack.enter_async_context( + self._services_task_group = await self._exit_stack.enter_async_context( create_task_group() ) - self._exit_stack.callback(setattr, self, "_task_group", None) + self._exit_stack.callback(setattr, self, "_services_task_group", None) except BaseException as exc: await self._exit_stack.__aexit__(type(exc), exc, exc.__traceback__) raise @@ -123,7 +135,10 @@ class AsyncScheduler: await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) async def _ensure_services_initialized(self, exit_stack: AsyncExitStack) -> None: - """Ensure that the data store and event broker have been initialized.""" + """ + Initialize the data store and event broker if this hasn't already been done. + + """ if not self._services_initialized: self._services_initialized = True exit_stack.callback(setattr, self, "_services_initialized", False) @@ -132,6 +147,7 @@ class AsyncScheduler: await self.data_store.start(exit_stack, self.event_broker) def _check_initialized(self) -> None: + """Raise RuntimeError if the services have not been initialized yet.""" if not self._services_initialized: raise RuntimeError( "The scheduler has not been initialized yet. Use the scheduler as an " @@ -139,16 +155,6 @@ class AsyncScheduler: "than run_until_complete()." ) - async def _schedule_added_or_modified(self, event: Event) -> None: - event_ = cast("ScheduleAdded | ScheduleUpdated", event) - if not self._wakeup_deadline or ( - event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline - ): - self.logger.debug( - "Detected a %s event – waking up the scheduler", type(event).__name__ - ) - self._wakeup_event.set() - @property def state(self) -> RunState: """The current running state of the scheduler.""" @@ -157,7 +163,7 @@ class AsyncScheduler: def subscribe( self, callback: Callable[[Event], Any], - event_types: Iterable[type[Event]] | None = None, + event_types: type[Event] | Iterable[type[Event]] | None = None, *, one_shot: bool = False, is_async: bool = True, @@ -170,19 +176,43 @@ class AsyncScheduler: :param callback: callable to be called with the event object when an event is published - :param event_types: an iterable of concrete Event classes to subscribe to + :param event_types: an event class or an iterable event classes to subscribe to :param one_shot: if ``True``, automatically unsubscribe after the first matching event :param is_async: ``True`` if the (synchronous) callback should be called on the event loop thread, ``False`` if it should be called in a worker thread. - If the callback is a coroutine function, this flag is ignored. + If ``callback`` is a coroutine function, this flag is ignored. """ self._check_initialized() + if isclass(event_types): + event_types = {event_types} + return self.event_broker.subscribe( callback, event_types, is_async=is_async, one_shot=one_shot ) + async def get_next_event( + self, event_types: type[Event] | Iterable[type[Event]] + ) -> Event: + """ + Wait until the next event matching one of the given types arrives. + + :param event_types: an event class or an iterable event classes to subscribe to + + """ + received_event: Event + + def receive_event(ev: Event) -> None: + nonlocal received_event + received_event = ev + event.set() + + event = anyio.Event() + with self.subscribe(receive_event, event_types, one_shot=True): + await event.wait() + return received_event + async def add_schedule( self, func_or_task_id: str | Callable, @@ -433,9 +463,9 @@ class AsyncScheduler: For that, see :meth:`wait_until_stopped`. 
""" - if self._state is RunState.started: + if self._state is RunState.started and self._scheduler_cancel_scope: self._state = RunState.stopping - self._wakeup_event.set() + self._scheduler_cancel_scope.cancel() async def wait_until_stopped(self) -> None: """ @@ -446,22 +476,17 @@ class AsyncScheduler: ``SchedulerStopped`` event. """ - if self._state in (RunState.stopped, RunState.stopping): - return - - event = anyio.Event() - with self.event_broker.subscribe( - lambda ev: event.set(), {SchedulerStopped}, one_shot=True - ): - await event.wait() + if self._state not in (RunState.stopped, RunState.stopping): + await self.get_next_event(SchedulerStopped) async def start_in_background(self) -> None: self._check_initialized() - await self._task_group.start(self.run_until_stopped) + await self._services_task_group.start(self.run_until_stopped) async def run_until_stopped( self, *, task_status: TaskStatus = TASK_STATUS_IGNORED ) -> None: + """Run the scheduler until explicitly stopped.""" if self._state is not RunState.stopped: raise RuntimeError( f'Cannot start the scheduler when it is in the "{self._state}" ' @@ -470,150 +495,38 @@ class AsyncScheduler: self._state = RunState.starting async with AsyncExitStack() as exit_stack: - self._wakeup_event = anyio.Event() await self._ensure_services_initialized(exit_stack) - # Wake up the scheduler if the data store emits a significant schedule event - exit_stack.enter_context( - self.event_broker.subscribe( - self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} - ) - ) - # Set this scheduler as the current scheduler token = current_async_scheduler.set(self) exit_stack.callback(current_async_scheduler.reset, token) - # Start the built-in worker, if configured to do so - if self.process_jobs: - worker = Worker(job_executors=self.job_executors) - await worker.start( - exit_stack, self.data_store, self.event_broker, self.identity - ) - - # Signal that the scheduler has started - self._state = RunState.started - task_status.started() - await self.event_broker.publish_local(SchedulerStarted()) - exception: BaseException | None = None try: - while self._state is RunState.started: - schedules = await self.data_store.acquire_schedules( - self.identity, 100 - ) - now = datetime.now(timezone.utc) - for schedule in schedules: - # Calculate a next fire time for the schedule, if possible - fire_times = [schedule.next_fire_time] - calculate_next = schedule.trigger.next - while True: - try: - fire_time = calculate_next() - except Exception: - self.logger.exception( - "Error computing next fire time for schedule %r of " - "task %r – removing schedule", - schedule.id, - schedule.task_id, - ) - break - - # Stop if the calculated fire time is in the future - if fire_time is None or fire_time > now: - schedule.next_fire_time = fire_time - break - - # Only keep all the fire times if coalesce policy = "all" - if schedule.coalesce is CoalescePolicy.all: - fire_times.append(fire_time) - elif schedule.coalesce is CoalescePolicy.latest: - fire_times[0] = fire_time - - # Add one or more jobs to the job queue - max_jitter = ( - schedule.max_jitter.total_seconds() - if schedule.max_jitter - else 0 - ) - for i, fire_time in enumerate(fire_times): - # Calculate a jitter if max_jitter > 0 - jitter = _zero_timedelta - if max_jitter: - if i + 1 < len(fire_times): - next_fire_time = fire_times[i + 1] - else: - next_fire_time = schedule.next_fire_time - - if next_fire_time is not None: - # Jitter must never be so high that it would cause a - # fire time to equal or exceed 
the next fire time - jitter_s = min( - [ - max_jitter, - ( - next_fire_time - - fire_time - - _microsecond_delta - ).total_seconds(), - ] - ) - jitter = timedelta( - seconds=random.uniform(0, jitter_s) - ) - fire_time += jitter - - schedule.last_fire_time = fire_time - job = Job( - task_id=schedule.task_id, - args=schedule.args, - kwargs=schedule.kwargs, - schedule_id=schedule.id, - scheduled_fire_time=fire_time, - jitter=jitter, - start_deadline=schedule.next_deadline, - tags=schedule.tags, - ) - await self.data_store.add_job(job) - - # Update the schedules (and release the scheduler's claim on them) - await self.data_store.release_schedules(self.identity, schedules) - - # If we received fewer schedules than the maximum amount, sleep - # until the next schedule is due or the scheduler is explicitly - # woken up - wait_time = None - if len(schedules) < 100: - self._wakeup_deadline = ( - await self.data_store.get_next_schedule_run_time() - ) - if self._wakeup_deadline: - wait_time = ( - self._wakeup_deadline - datetime.now(timezone.utc) - ).total_seconds() - self.logger.debug( - "Sleeping %.3f seconds until the next fire time (%s)", - wait_time, - self._wakeup_deadline, - ) - else: - self.logger.debug("Waiting for any due schedules to appear") - - with move_on_after(wait_time): - await self._wakeup_event.wait() - self._wakeup_event = anyio.Event() - else: - self.logger.debug( - "Processing more schedules on the next iteration" - ) + async with create_task_group() as task_group: + self._scheduler_cancel_scope = task_group.cancel_scope + exit_stack.callback(setattr, self, "_scheduler_cancel_scope", None) + + # Start processing due schedules, if configured to do so + if self.role in (SchedulerRole.scheduler, SchedulerRole.both): + await task_group.start(self._process_schedules) + + # Start processing due jobs, if configured to do so + if self.role in (SchedulerRole.worker, SchedulerRole.both): + await task_group.start(self._process_jobs) + + # Signal that the scheduler has started + self._state = RunState.started + self.logger.info("Scheduler started") + task_status.started() + await self.event_broker.publish_local(SchedulerStarted()) except BaseException as exc: exception = exc raise finally: self._state = RunState.stopped - # CancelledError is a subclass of Exception in Python 3.7 - if not exception or isinstance(exception, CancelledError): + if not exception or isinstance(exception, get_cancelled_exc_class()): self.logger.info("Scheduler stopped") elif isinstance(exception, Exception): self.logger.exception("Scheduler crashed") @@ -626,3 +539,239 @@ class AsyncScheduler: await self.event_broker.publish_local( SchedulerStopped(exception=exception) ) + + async def _process_schedules(self, *, task_status: TaskStatus) -> None: + wakeup_event = anyio.Event() + wakeup_deadline: datetime | None = None + + async def schedule_added_or_modified(self, event: Event) -> None: + event_ = cast("ScheduleAdded | ScheduleUpdated", event) + if not wakeup_deadline or ( + event_.next_fire_time and event_.next_fire_time < wakeup_deadline + ): + self.logger.debug( + "Detected a %s event – waking up the scheduler to process " + "schedules", + type(event).__name__, + ) + wakeup_event.set() + + subscription = self.event_broker.subscribe( + schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated} + ) + with subscription: + # Signal that we are ready, and wait for the scheduler start event + task_status.started() + await self.get_next_event(SchedulerStarted) + + while self._state is RunState.started: + schedules = 
await self.data_store.acquire_schedules(self.identity, 100) + now = datetime.now(timezone.utc) + for schedule in schedules: + # Calculate a next fire time for the schedule, if possible + fire_times = [schedule.next_fire_time] + calculate_next = schedule.trigger.next + while True: + try: + fire_time = calculate_next() + except Exception: + self.logger.exception( + "Error computing next fire time for schedule %r of " + "task %r – removing schedule", + schedule.id, + schedule.task_id, + ) + break + + # Stop if the calculated fire time is in the future + if fire_time is None or fire_time > now: + schedule.next_fire_time = fire_time + break + + # Only keep all the fire times if coalesce policy = "all" + if schedule.coalesce is CoalescePolicy.all: + fire_times.append(fire_time) + elif schedule.coalesce is CoalescePolicy.latest: + fire_times[0] = fire_time + + # Add one or more jobs to the job queue + max_jitter = ( + schedule.max_jitter.total_seconds() + if schedule.max_jitter + else 0 + ) + for i, fire_time in enumerate(fire_times): + # Calculate a jitter if max_jitter > 0 + jitter = _zero_timedelta + if max_jitter: + if i + 1 < len(fire_times): + next_fire_time = fire_times[i + 1] + else: + next_fire_time = schedule.next_fire_time + + if next_fire_time is not None: + # Jitter must never be so high that it would cause a + # fire time to equal or exceed the next fire time + jitter_s = min( + [ + max_jitter, + ( + next_fire_time + - fire_time + - _microsecond_delta + ).total_seconds(), + ] + ) + jitter = timedelta(seconds=random.uniform(0, jitter_s)) + fire_time += jitter + + schedule.last_fire_time = fire_time + job = Job( + task_id=schedule.task_id, + args=schedule.args, + kwargs=schedule.kwargs, + schedule_id=schedule.id, + scheduled_fire_time=fire_time, + jitter=jitter, + start_deadline=schedule.next_deadline, + tags=schedule.tags, + ) + await self.data_store.add_job(job) + + # Update the schedules (and release the scheduler's claim on them) + await self.data_store.release_schedules(self.identity, schedules) + + # If we received fewer schedules than the maximum amount, sleep + # until the next schedule is due or the scheduler is explicitly + # woken up + wait_time = None + if len(schedules) < 100: + wakeup_deadline = await self.data_store.get_next_schedule_run_time() + if wakeup_deadline: + wait_time = ( + wakeup_deadline - datetime.now(timezone.utc) + ).total_seconds() + self.logger.debug( + "Sleeping %.3f seconds until the next fire time (%s)", + wait_time, + wakeup_deadline, + ) + else: + self.logger.debug("Waiting for any due schedules to appear") + + with move_on_after(wait_time): + await wakeup_event.wait() + wakeup_event = anyio.Event() + else: + self.logger.debug("Processing more schedules on the next iteration") + + async def _process_jobs(self, *, task_status: TaskStatus) -> None: + wakeup_event = anyio.Event() + + async def job_added(event: Event) -> None: + if len(self._running_jobs) < self.max_concurrent_jobs: + wakeup_event.set() + + async with AsyncExitStack() as exit_stack: + # Start the job executors + for job_executor in self.job_executors.values(): + await job_executor.start(exit_stack) + + task_group = await exit_stack.enter_async_context(create_task_group()) + + # Fetch new jobs every time + exit_stack.enter_context(self.event_broker.subscribe(job_added, {JobAdded})) + + # Signal that we are ready, and wait for the scheduler start event + task_status.started() + await self.get_next_event(SchedulerStarted) + + while self._state is RunState.started: + limit = 
self.max_concurrent_jobs - len(self._running_jobs) + if limit > 0: + jobs = await self.data_store.acquire_jobs(self.identity, limit) + for job in jobs: + task = await self.data_store.get_task(job.task_id) + self._running_jobs.add(job.id) + task_group.start_soon( + self._run_job, job, task.func, task.executor + ) + + await wakeup_event.wait() + wakeup_event = anyio.Event() + + async def _run_job(self, job: Job, func: Callable, executor: str) -> None: + try: + # Check if the job started before the deadline + start_time = datetime.now(timezone.utc) + if job.start_deadline is not None and start_time > job.start_deadline: + result = JobResult.from_job( + job, + outcome=JobOutcome.missed_start_deadline, + finished_at=start_time, + ) + await self.data_store.release_job(self.identity, job.task_id, result) + await self.event_broker.publish( + JobReleased.from_result(result, self.identity) + ) + return + + try: + job_executor = self.job_executors[executor] + except KeyError: + return + + token = current_job.set(JobInfo.from_job(job)) + try: + retval = await job_executor.run_job(func, job) + except get_cancelled_exc_class(): + self.logger.info("Job %s was cancelled", job.id) + with CancelScope(shield=True): + result = JobResult.from_job( + job, + outcome=JobOutcome.cancelled, + ) + await self.data_store.release_job( + self.identity, job.task_id, result + ) + await self.event_broker.publish( + JobReleased.from_result(result, self.identity) + ) + except BaseException as exc: + if isinstance(exc, Exception): + self.logger.exception("Job %s raised an exception", job.id) + else: + self.logger.error( + "Job %s was aborted due to %s", job.id, exc.__class__.__name__ + ) + + result = JobResult.from_job( + job, + JobOutcome.error, + exception=exc, + ) + await self.data_store.release_job( + self.identity, + job.task_id, + result, + ) + await self.event_broker.publish( + JobReleased.from_result(result, self.identity) + ) + if not isinstance(exc, Exception): + raise + else: + self.logger.info("Job %s completed successfully", job.id) + result = JobResult.from_job( + job, + JobOutcome.success, + return_value=retval, + ) + await self.data_store.release_job(self.identity, job.task_id, result) + await self.event_broker.publish( + JobReleased.from_result(result, self.identity) + ) + finally: + current_job.reset(token) + finally: + self._running_jobs.remove(job.id) diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 3a812a4..c80099f 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -17,7 +17,7 @@ from anyio import start_blocking_portal from anyio.from_thread import BlockingPortal from .. 
import Event, current_scheduler -from .._enums import CoalescePolicy, ConflictPolicy, RunState +from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole from .._structures import JobResult, Schedule from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger from .async_ import AsyncScheduler @@ -37,8 +37,7 @@ class Scheduler: event_broker: EventBroker | None = None, *, identity: str | None = None, - process_schedules: bool = True, - start_worker: bool = True, + role: SchedulerRole = SchedulerRole.both, job_executors: Mapping[str, JobExecutor] | None = None, default_job_executor: str | None = None, logger: Logger | None = None, @@ -54,8 +53,7 @@ class Scheduler: self._async_scheduler = AsyncScheduler( identity=identity, - process_schedules=process_schedules, - process_jobs=start_worker, + role=role, job_executors=job_executors, default_job_executor=default_job_executor, logger=logger or logging.getLogger(__name__), @@ -78,12 +76,8 @@ class Scheduler: return self._async_scheduler.identity @property - def process_schedules(self) -> bool: - return self._async_scheduler.process_schedules - - @property - def start_worker(self) -> bool: - return self._async_scheduler.process_jobs + def role(self) -> SchedulerRole: + return self._async_scheduler.role @property def job_executors(self) -> MutableMapping[str, JobExecutor]: diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 30c8dc9..58a1eae 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -30,6 +30,7 @@ from apscheduler import ( current_job, current_scheduler, ) +from apscheduler._enums import SchedulerRole from apscheduler.schedulers.async_ import AsyncScheduler from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger @@ -62,6 +63,7 @@ def dummy_sync_job(delay: float = 0, fail: bool = False) -> str: class TestAsyncScheduler: async def test_schedule_job(self) -> None: def listener(received_event: Event) -> None: + print(received_event) received_events.append(received_event) if isinstance(received_event, ScheduleRemoved): event.set() @@ -69,7 +71,7 @@ class TestAsyncScheduler: received_events: list[Event] = [] event = anyio.Event() trigger = DateTrigger(datetime.now(timezone.utc)) - async with AsyncScheduler(process_jobs=False) as scheduler: + async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler: scheduler.event_broker.subscribe(listener) await scheduler.add_schedule(dummy_async_job, trigger, id="foo") await scheduler.start_in_background() @@ -110,7 +112,7 @@ class TestAsyncScheduler: assert not received_events async def test_add_get_schedule(self) -> None: - async with AsyncScheduler(process_jobs=False) as scheduler: + async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler: with pytest.raises(ScheduleLookupError): await scheduler.get_schedule("dummyid") @@ -120,7 +122,7 @@ class TestAsyncScheduler: assert isinstance(schedule, Schedule) async def test_add_get_schedules(self) -> None: - async with AsyncScheduler(process_jobs=False) as scheduler: + async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler: assert await scheduler.get_schedules() == [] schedule1_id = await scheduler.add_schedule( @@ -160,7 +162,7 @@ class TestAsyncScheduler: orig_start_time = datetime.now(timezone) - timedelta(seconds=1) fake_uniform = mocker.patch("random.uniform") fake_uniform.configure_mock(side_effect=lambda a, b: jitter) - async with AsyncScheduler(process_jobs=False) as scheduler: + async with 
AsyncScheduler(role=SchedulerRole.scheduler) as scheduler: trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) job_added_event = anyio.Event() scheduler.event_broker.subscribe(job_added_listener, {JobAdded}) @@ -315,7 +317,7 @@ class TestSyncScheduler: received_events: list[Event] = [] event = threading.Event() trigger = DateTrigger(datetime.now(timezone.utc)) - with Scheduler(start_worker=False) as scheduler: + with Scheduler(role=SchedulerRole.scheduler) as scheduler: scheduler.event_broker.subscribe(listener) scheduler.add_schedule(dummy_sync_job, trigger, id="foo") scheduler.start_in_background() @@ -351,17 +353,17 @@ class TestSyncScheduler: assert isinstance(received_event, SchedulerStopped) def test_add_get_schedule(self) -> None: - with Scheduler(start_worker=False) as scheduler: + with Scheduler(role=SchedulerRole.scheduler) as scheduler: with pytest.raises(ScheduleLookupError): scheduler.get_schedule("dummyid") trigger = DateTrigger(datetime.now(timezone.utc)) - scheduler.add_schedule(dummy_async_job, trigger, id="dummyid") + scheduler.add_schedule(dummy_sync_job, trigger, id="dummyid") schedule = scheduler.get_schedule("dummyid") assert isinstance(schedule, Schedule) def test_add_get_schedules(self) -> None: - with Scheduler(start_worker=False) as scheduler: + with Scheduler(role=SchedulerRole.scheduler) as scheduler: assert scheduler.get_schedules() == [] schedule1_id = scheduler.add_schedule( @@ -399,12 +401,12 @@ class TestSyncScheduler: orig_start_time = datetime.now(timezone) - timedelta(seconds=1) fake_uniform = mocker.patch("random.uniform") fake_uniform.configure_mock(side_effect=lambda a, b: jitter) - with Scheduler(start_worker=False) as scheduler: + with Scheduler(role=SchedulerRole.scheduler) as scheduler: trigger = IntervalTrigger(seconds=3, start_time=orig_start_time) job_added_event = threading.Event() scheduler.event_broker.subscribe(job_added_listener, {JobAdded}) schedule_id = scheduler.add_schedule( - dummy_async_job, trigger, max_jitter=max_jitter + dummy_sync_job, trigger, max_jitter=max_jitter ) schedule = scheduler.get_schedule(schedule_id) assert schedule.max_jitter == timedelta(seconds=max_jitter) |
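
The patch also lets ``AsyncScheduler.subscribe()`` accept a single event class (in addition to an iterable of classes) and adds ``get_next_event()``. A minimal sketch of using both, illustrative only; it assumes the scheduler has already been entered as an async context manager so its services are initialized.

```python
# Illustrative sketch only -- not part of this commit.
from apscheduler import JobAdded, SchedulerStarted
from apscheduler.schedulers.async_ import AsyncScheduler


async def observe(scheduler: AsyncScheduler) -> None:
    # subscribe() now accepts a single event class instead of an iterable
    scheduler.subscribe(lambda event: print("job queued:", event), JobAdded)

    # get_next_event() waits until the next event of the given type(s) arrives
    await scheduler.get_next_event(SchedulerStarted)
```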