author     Alex Grönholm <alex.gronholm@nextday.fi>  2022-09-21 01:56:32 +0300
committer  Alex Grönholm <alex.gronholm@nextday.fi>  2022-09-21 02:40:02 +0300
commit     e8055bce55bb004168b5787f89091057cf1f36c7 (patch)
tree       c8c377a4a8301e643efc939cea86138b8d72e95d
parent     c5727432736b55b7d76753307f14efdb962c2edf (diff)
download   apscheduler-e8055bce55bb004168b5787f89091057cf1f36c7.tar.gz
Merged the Worker class into AsyncScheduler
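The scheduler now runs jobs itself, so a separate worker object no longer needs
to be created or started alongside it. A minimal sketch of the resulting usage
(``example_tick`` and the one-second interval are illustrative, not part of
this patch):

    import asyncio

    from apscheduler import SchedulerRole
    from apscheduler.schedulers.async_ import AsyncScheduler
    from apscheduler.triggers.interval import IntervalTrigger


    def example_tick() -> None:
        print("tick")


    async def main() -> None:
        # role defaults to SchedulerRole.both: process due schedules AND run jobs
        async with AsyncScheduler(role=SchedulerRole.both) as scheduler:
            await scheduler.add_schedule(example_tick, IntervalTrigger(seconds=1))
            await scheduler.run_until_stopped()


    asyncio.run(main())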
 docs/versionhistory.rst                  |   8
 src/apscheduler/__init__.py              |   9
 src/apscheduler/_enums.py                |  16
 src/apscheduler/_events.py               |  30
 src/apscheduler/_validators.py           |   6
 src/apscheduler/_worker.py               | 189
 src/apscheduler/datastores/sqlalchemy.py |   3
 src/apscheduler/schedulers/async_.py     | 507
 src/apscheduler/schedulers/sync.py       |  16
 tests/test_schedulers.py                 |  22
 10 files changed, 372 insertions(+), 434 deletions(-)
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 559ee36..e86d486 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -6,9 +6,11 @@ APScheduler, see the :doc:`migration section <migration>`.
**UNRELEASED**
-- **BREAKING** Workers can no longer be run independently. Instead, you can run a
- scheduler that only starts a worker but does not process schedules by passing
- ``process_schedules=False`` to the scheduler
+- **BREAKING** Workers were merged into schedulers. As the ``Worker`` and
+ ``AsyncWorker`` classes have been removed, you now need to pass
+ ``role=SchedulerRole.scheduler`` to the scheduler to prevent it from processing due
+ jobs. The worker event classes (``WorkerEvent``, ``WorkerStarted``, ``WorkerStopped``)
+ have also been removed.
- **BREAKING** The synchronous interfaces for event brokers and data stores have been
removed. Synchronous libraries can still be used to implement these services through
the use of ``anyio.to_thread.run_sync()``.
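For reference, a sketch of the migration implied by the entry above (the old
keyword arguments on the commented lines were removed in this commit):

    from apscheduler import SchedulerRole
    from apscheduler.schedulers.async_ import AsyncScheduler

    # Previously: AsyncScheduler(process_jobs=False)
    scheduler_only = AsyncScheduler(role=SchedulerRole.scheduler)

    # Previously: AsyncScheduler(process_schedules=False)
    worker_only = AsyncScheduler(role=SchedulerRole.worker)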
diff --git a/src/apscheduler/__init__.py b/src/apscheduler/__init__.py
index 844a056..ddf34d9 100644
--- a/src/apscheduler/__init__.py
+++ b/src/apscheduler/__init__.py
@@ -31,6 +31,7 @@ __all__ = [
"ScheduleRemoved",
"ScheduleDeserializationFailed",
"SchedulerEvent",
+ "SchedulerRole",
"SchedulerStarted",
"SchedulerStopped",
"Task",
@@ -38,9 +39,6 @@ __all__ = [
"TaskLookupError",
"TaskUpdated",
"TaskRemoved",
- "WorkerEvent",
- "WorkerStarted",
- "WorkerStopped",
"current_async_scheduler",
"current_scheduler",
"current_job",
@@ -49,7 +47,7 @@ __all__ = [
from typing import Any
from ._context import current_async_scheduler, current_job, current_scheduler
-from ._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
+from ._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole
from ._events import (
DataStoreEvent,
Event,
@@ -68,9 +66,6 @@ from ._events import (
TaskAdded,
TaskRemoved,
TaskUpdated,
- WorkerEvent,
- WorkerStarted,
- WorkerStopped,
)
from ._exceptions import (
ConflictingIdError,
diff --git a/src/apscheduler/_enums.py b/src/apscheduler/_enums.py
index d8e706d..0125f2a 100644
--- a/src/apscheduler/_enums.py
+++ b/src/apscheduler/_enums.py
@@ -3,6 +3,22 @@ from __future__ import annotations
from enum import Enum, auto
+class SchedulerRole(Enum):
+ """
+ Specifies what the scheduler should be doing when it's running.
+
+ Values:
+
+ * ``scheduler``: processes due schedules, but won't run jobs
+ * ``worker``: runs due jobs, but won't process schedules
+ * ``both``: processes schedules and runs due jobs
+ """
+
+ scheduler = auto()
+ worker = auto()
+ both = auto()
+
+
class RunState(Enum):
"""
Used to track the running state of schedulers and workers.
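For illustration, the role values map onto the scheduler's two processing
loops like this (the same membership tests appear in run_until_stopped()
further down in this diff):

    from apscheduler import SchedulerRole

    role = SchedulerRole.worker
    process_schedules = role in (SchedulerRole.scheduler, SchedulerRole.both)
    run_jobs = role in (SchedulerRole.worker, SchedulerRole.both)
    assert (process_schedules, run_jobs) == (False, True)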
diff --git a/src/apscheduler/_events.py b/src/apscheduler/_events.py
index a3bd297..9d0278d 100644
--- a/src/apscheduler/_events.py
+++ b/src/apscheduler/_events.py
@@ -206,34 +206,8 @@ class SchedulerStopped(SchedulerEvent):
exception: BaseException | None = None
-#
-# Worker events
-#
-
-
-@attrs.define(kw_only=True, frozen=True)
-class WorkerEvent(Event):
- """Base class for events originating from a worker."""
-
-
-@attrs.define(kw_only=True, frozen=True)
-class WorkerStarted(WorkerEvent):
- """Signals that a worker has started."""
-
-
-@attrs.define(kw_only=True, frozen=True)
-class WorkerStopped(WorkerEvent):
- """
- Signals that a worker has stopped.
-
- :ivar exception: the exception that caused the worker to stop, if any
- """
-
- exception: BaseException | None = None
-
-
@attrs.define(kw_only=True, frozen=True)
-class JobAcquired(WorkerEvent):
+class JobAcquired(SchedulerEvent):
"""
Signals that a worker has acquired a job for processing.
@@ -246,7 +220,7 @@ class JobAcquired(WorkerEvent):
@attrs.define(kw_only=True, frozen=True)
-class JobReleased(WorkerEvent):
+class JobReleased(SchedulerEvent):
"""
Signals that a worker has finished processing a job.
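With the worker event classes gone, code that listened for ``WorkerEvent`` can
subscribe to the job events directly; a sketch, assuming ``JobAcquired`` and
``JobReleased`` are exported from the package root like the other event types:

    import asyncio

    from apscheduler import JobAcquired, JobReleased
    from apscheduler.schedulers.async_ import AsyncScheduler


    def on_job_event(event) -> None:
        # Both event types now derive from SchedulerEvent
        print(f"received: {event}")


    async def main() -> None:
        async with AsyncScheduler() as scheduler:
            scheduler.subscribe(on_job_event, {JobAcquired, JobReleased})
            # ... add schedules and run the scheduler here ...


    asyncio.run(main())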
diff --git a/src/apscheduler/_validators.py b/src/apscheduler/_validators.py
index 4197edf..eebb3c0 100644
--- a/src/apscheduler/_validators.py
+++ b/src/apscheduler/_validators.py
@@ -4,7 +4,6 @@ import sys
from datetime import date, datetime, timedelta, timezone, tzinfo
from typing import Any
-import attrs
from attrs import Attribute
from tzlocal import get_localzone
@@ -179,8 +178,3 @@ def require_state_version(
raise DeserializationError(
'Missing "version" key in the serialized state'
) from exc
-
-
-def positive_integer(inst, field: attrs.Attribute, value) -> None:
- if value <= 0:
- raise ValueError(f"{field} must be a positive integer")
diff --git a/src/apscheduler/_worker.py b/src/apscheduler/_worker.py
deleted file mode 100644
index 8c95d16..0000000
--- a/src/apscheduler/_worker.py
+++ /dev/null
@@ -1,189 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Mapping
-from contextlib import AsyncExitStack
-from datetime import datetime, timezone
-from logging import Logger, getLogger
-from typing import Callable
-from uuid import UUID
-
-import anyio
-import attrs
-from anyio import create_task_group, get_cancelled_exc_class, move_on_after
-from anyio.abc import CancelScope
-
-from ._context import current_job
-from ._enums import JobOutcome, RunState
-from ._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped
-from ._structures import Job, JobInfo, JobResult
-from ._validators import positive_integer
-from .abc import DataStore, EventBroker, JobExecutor
-
-
-@attrs.define(eq=False, kw_only=True)
-class Worker:
- """
- Runs jobs locally in a task group.
-
- :param max_concurrent_jobs: Maximum number of jobs the worker will run at once
- """
-
- job_executors: Mapping[str, JobExecutor] = attrs.field(kw_only=True)
- max_concurrent_jobs: int = attrs.field(
- kw_only=True, validator=positive_integer, default=100
- )
- logger: Logger = attrs.field(kw_only=True, default=getLogger(__name__))
-
- _data_store: DataStore = attrs.field(init=False)
- _event_broker: EventBroker = attrs.field(init=False)
- _identity: str = attrs.field(init=False)
- _state: RunState = attrs.field(init=False, default=RunState.stopped)
- _wakeup_event: anyio.Event = attrs.field(init=False)
- _acquired_jobs: set[Job] = attrs.field(init=False, factory=set)
- _running_jobs: set[UUID] = attrs.field(init=False, factory=set)
-
- async def start(
- self,
- exit_stack: AsyncExitStack,
- data_store: DataStore,
- event_broker: EventBroker,
- identity: str,
- ) -> None:
- self._data_store = data_store
- self._event_broker = event_broker
- self._identity = identity
- self._state = RunState.started
- self._wakeup_event = anyio.Event()
-
- # Start the job executors
- for job_executor in self.job_executors.values():
- await job_executor.start(exit_stack)
-
- # Start the worker in a background task
- task_group = await exit_stack.enter_async_context(create_task_group())
- task_group.start_soon(self._run)
-
- # Stop the worker when the exit stack unwinds
- exit_stack.callback(lambda: self._wakeup_event.set())
- exit_stack.callback(setattr, self, "_state", RunState.stopped)
-
- # Wake up the worker if the data store emits a significant job event
- exit_stack.enter_context(
- self._event_broker.subscribe(
- lambda event: self._wakeup_event.set(), {JobAdded}
- )
- )
-
- # Signal that the worker has started
- await self._event_broker.publish_local(WorkerStarted())
-
- async def _run(self) -> None:
- """Run the worker until it is explicitly stopped."""
- exception: BaseException | None = None
- try:
- async with create_task_group() as tg:
- while self._state is RunState.started:
- limit = self.max_concurrent_jobs - len(self._running_jobs)
- jobs = await self._data_store.acquire_jobs(self._identity, limit)
- for job in jobs:
- task = await self._data_store.get_task(job.task_id)
- self._running_jobs.add(job.id)
- tg.start_soon(self._run_job, job, task.func, task.executor)
-
- await self._wakeup_event.wait()
- self._wakeup_event = anyio.Event()
- except get_cancelled_exc_class():
- pass
- except BaseException as exc:
- exception = exc
- raise
- finally:
- if not exception:
- self.logger.info("Worker stopped")
- elif isinstance(exception, Exception):
- self.logger.exception("Worker crashed")
- elif exception:
- self.logger.info(
- f"Worker stopped due to {exception.__class__.__name__}"
- )
-
- with move_on_after(3, shield=True):
- await self._event_broker.publish_local(
- WorkerStopped(exception=exception)
- )
-
- async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
- try:
- # Check if the job started before the deadline
- start_time = datetime.now(timezone.utc)
- if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult.from_job(
- job,
- outcome=JobOutcome.missed_start_deadline,
- finished_at=start_time,
- )
- await self._data_store.release_job(self._identity, job.task_id, result)
- await self._event_broker.publish(
- JobReleased.from_result(result, self._identity)
- )
- return
-
- try:
- job_executor = self.job_executors[executor]
- except KeyError:
- return
-
- token = current_job.set(JobInfo.from_job(job))
- try:
- retval = await job_executor.run_job(func, job)
- except get_cancelled_exc_class():
- self.logger.info("Job %s was cancelled", job.id)
- with CancelScope(shield=True):
- result = JobResult.from_job(
- job,
- outcome=JobOutcome.cancelled,
- )
- await self._data_store.release_job(
- self._identity, job.task_id, result
- )
- await self._event_broker.publish(
- JobReleased.from_result(result, self._identity)
- )
- except BaseException as exc:
- if isinstance(exc, Exception):
- self.logger.exception("Job %s raised an exception", job.id)
- else:
- self.logger.error(
- "Job %s was aborted due to %s", job.id, exc.__class__.__name__
- )
-
- result = JobResult.from_job(
- job,
- JobOutcome.error,
- exception=exc,
- )
- await self._data_store.release_job(
- self._identity,
- job.task_id,
- result,
- )
- await self._event_broker.publish(
- JobReleased.from_result(result, self._identity)
- )
- if not isinstance(exc, Exception):
- raise
- else:
- self.logger.info("Job %s completed successfully", job.id)
- result = JobResult.from_job(
- job,
- JobOutcome.success,
- return_value=retval,
- )
- await self._data_store.release_job(self._identity, job.task_id, result)
- await self._event_broker.publish(
- JobReleased.from_result(result, self._identity)
- )
- finally:
- current_job.reset(token)
- finally:
- self._running_jobs.remove(job.id)
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 00d06aa..2322205 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -33,6 +33,7 @@ from sqlalchemy.engine import URL, Dialect, Result
from sqlalchemy.exc import CompileError, IntegrityError, InterfaceError
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
from sqlalchemy.future import Connection, Engine
+from sqlalchemy.sql import ClauseElement, Executable
from sqlalchemy.sql.ddl import DropTable
from sqlalchemy.sql.elements import BindParameter, literal
@@ -197,7 +198,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
async def _execute(
self,
conn: Connection | AsyncConnection,
- statement,
+ statement: ClauseElement | Executable,
parameters: Sequence | None = None,
):
if isinstance(conn, AsyncConnection):
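For context, ``_execute()`` dispatches on the connection type so the same data
store code can serve both sync and async engines; a condensed, standalone
sketch of that pattern (the body is abbreviated from the real method):

    from collections.abc import Sequence

    import anyio.to_thread
    from sqlalchemy.ext.asyncio import AsyncConnection
    from sqlalchemy.future import Connection
    from sqlalchemy.sql import ClauseElement, Executable


    async def execute(
        conn: Connection | AsyncConnection,
        statement: ClauseElement | Executable,
        parameters: Sequence | None = None,
    ):
        if isinstance(conn, AsyncConnection):
            return await conn.execute(statement, parameters)

        # Synchronous connections are driven in a worker thread
        return await anyio.to_thread.run_sync(conn.execute, statement, parameters)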
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 44fee27..11a1c67 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -4,10 +4,10 @@ import os
import platform
import random
import sys
-from asyncio import CancelledError
from collections.abc import MutableMapping
from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
+from inspect import isclass
from logging import Logger, getLogger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, cast
@@ -15,12 +15,19 @@ from uuid import UUID, uuid4
import anyio
import attrs
-from anyio import TASK_STATUS_IGNORED, create_task_group, move_on_after
+from anyio import (
+ TASK_STATUS_IGNORED,
+ CancelScope,
+ create_task_group,
+ get_cancelled_exc_class,
+ move_on_after,
+)
from anyio.abc import TaskGroup, TaskStatus
from attr.validators import instance_of
-from .._context import current_async_scheduler
-from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
+from .. import JobAdded
+from .._context import current_async_scheduler, current_job
+from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole
from .._events import (
Event,
JobReleased,
@@ -35,8 +42,8 @@ from .._exceptions import (
JobLookupError,
ScheduleLookupError,
)
-from .._structures import Job, JobResult, Schedule, Task
-from .._worker import Worker
+from .._structures import Job, JobInfo, JobResult, Schedule, Task
+from .._validators import non_negative_number
from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger
from ..datastores.memory import MemoryDataStore
from ..eventbrokers.local import LocalEventBroker
@@ -54,32 +61,37 @@ _microsecond_delta = timedelta(microseconds=1)
_zero_timedelta = timedelta()
-@attrs.define(eq=False)
+@attrs.define(eq=False, kw_only=True)
class AsyncScheduler:
- """An asynchronous (AnyIO based) scheduler implementation."""
+ """
+ An asynchronous (AnyIO based) scheduler implementation.
+
+ :param data_store: the data store for tasks, schedules and jobs
+ :param event_broker: the event broker to use for publishing and subscribing to events
+ :param max_concurrent_jobs: the maximum number of jobs the scheduler will run at once
+ :param role: specifies what the scheduler should be doing when running
+ """
data_store: DataStore = attrs.field(
- validator=instance_of(DataStore), factory=MemoryDataStore
+ kw_only=False, validator=instance_of(DataStore), factory=MemoryDataStore
)
event_broker: EventBroker = attrs.field(
- validator=instance_of(EventBroker), factory=LocalEventBroker
- )
- identity: str = attrs.field(kw_only=True, default=None)
- process_jobs: bool = attrs.field(kw_only=True, default=True)
- job_executors: MutableMapping[str, JobExecutor] | None = attrs.field(
- kw_only=True, default=None
+ kw_only=False, validator=instance_of(EventBroker), factory=LocalEventBroker
)
- default_job_executor: str | None = attrs.field(kw_only=True, default=None)
- process_schedules: bool = attrs.field(kw_only=True, default=True)
- logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
+ identity: str = attrs.field(default=None)
+ role: SchedulerRole = attrs.field(default=SchedulerRole.both)
+ max_concurrent_jobs: int = attrs.field(validator=non_negative_number, default=100)
+ job_executors: MutableMapping[str, JobExecutor] | None = attrs.field(default=None)
+ default_job_executor: str | None = attrs.field(default=None)
+ logger: Logger | None = attrs.field(default=getLogger(__name__))
_state: RunState = attrs.field(init=False, default=RunState.stopped)
- _task_group: TaskGroup | None = attrs.field(init=False, default=None)
+ _services_task_group: TaskGroup | None = attrs.field(init=False, default=None)
_exit_stack: AsyncExitStack = attrs.field(init=False, factory=AsyncExitStack)
_services_initialized: bool = attrs.field(init=False, default=False)
- _wakeup_event: anyio.Event = attrs.field(init=False)
- _wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
- _schedule_added_subscription: Subscription = attrs.field(init=False)
+ _scheduler_cancel_scope: CancelScope | None = attrs.field(init=False, default=None)
+ _running_jobs: set[UUID] = attrs.field(init=False, factory=set)
def __attrs_post_init__(self) -> None:
if not self.identity:
@@ -103,10 +115,10 @@ class AsyncScheduler:
await self._exit_stack.__aenter__()
try:
await self._ensure_services_initialized(self._exit_stack)
- self._task_group = await self._exit_stack.enter_async_context(
+ self._services_task_group = await self._exit_stack.enter_async_context(
create_task_group()
)
- self._exit_stack.callback(setattr, self, "_task_group", None)
+ self._exit_stack.callback(setattr, self, "_services_task_group", None)
except BaseException as exc:
await self._exit_stack.__aexit__(type(exc), exc, exc.__traceback__)
raise
@@ -123,7 +135,10 @@ class AsyncScheduler:
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
async def _ensure_services_initialized(self, exit_stack: AsyncExitStack) -> None:
- """Ensure that the data store and event broker have been initialized."""
+ """
+ Initialize the data store and event broker if this hasn't already been done.
+
+ """
if not self._services_initialized:
self._services_initialized = True
exit_stack.callback(setattr, self, "_services_initialized", False)
@@ -132,6 +147,7 @@ class AsyncScheduler:
await self.data_store.start(exit_stack, self.event_broker)
def _check_initialized(self) -> None:
+ """Raise RuntimeError if the services have not been initialized yet."""
if not self._services_initialized:
raise RuntimeError(
"The scheduler has not been initialized yet. Use the scheduler as an "
@@ -139,16 +155,6 @@ class AsyncScheduler:
"than run_until_complete()."
)
- async def _schedule_added_or_modified(self, event: Event) -> None:
- event_ = cast("ScheduleAdded | ScheduleUpdated", event)
- if not self._wakeup_deadline or (
- event_.next_fire_time and event_.next_fire_time < self._wakeup_deadline
- ):
- self.logger.debug(
- "Detected a %s event – waking up the scheduler", type(event).__name__
- )
- self._wakeup_event.set()
-
@property
def state(self) -> RunState:
"""The current running state of the scheduler."""
@@ -157,7 +163,7 @@ class AsyncScheduler:
def subscribe(
self,
callback: Callable[[Event], Any],
- event_types: Iterable[type[Event]] | None = None,
+ event_types: type[Event] | Iterable[type[Event]] | None = None,
*,
one_shot: bool = False,
is_async: bool = True,
@@ -170,19 +176,43 @@ class AsyncScheduler:
:param callback: callable to be called with the event object when an event is
published
- :param event_types: an iterable of concrete Event classes to subscribe to
+ :param event_types: an event class or an iterable of event classes to subscribe to
:param one_shot: if ``True``, automatically unsubscribe after the first matching
event
:param is_async: ``True`` if the (synchronous) callback should be called on the
event loop thread, ``False`` if it should be called in a worker thread.
- If the callback is a coroutine function, this flag is ignored.
+ If ``callback`` is a coroutine function, this flag is ignored.
"""
self._check_initialized()
+ if isclass(event_types):
+ event_types = {event_types}
+
return self.event_broker.subscribe(
callback, event_types, is_async=is_async, one_shot=one_shot
)
+ async def get_next_event(
+ self, event_types: type[Event] | Iterable[type[Event]]
+ ) -> Event:
+ """
+ Wait until the next event matching one of the given types arrives.
+
+ :param event_types: an event class or an iterable of event classes to subscribe to
+
+ """
+ received_event: Event
+
+ def receive_event(ev: Event) -> None:
+ nonlocal received_event
+ received_event = ev
+ event.set()
+
+ event = anyio.Event()
+ with self.subscribe(receive_event, event_types, one_shot=True):
+ await event.wait()
+ return received_event
+
async def add_schedule(
self,
func_or_task_id: str | Callable,
@@ -433,9 +463,9 @@ class AsyncScheduler:
For that, see :meth:`wait_until_stopped`.
"""
- if self._state is RunState.started:
+ if self._state is RunState.started and self._scheduler_cancel_scope:
self._state = RunState.stopping
- self._wakeup_event.set()
+ self._scheduler_cancel_scope.cancel()
async def wait_until_stopped(self) -> None:
"""
@@ -446,22 +476,17 @@ class AsyncScheduler:
``SchedulerStopped`` event.
"""
- if self._state in (RunState.stopped, RunState.stopping):
- return
-
- event = anyio.Event()
- with self.event_broker.subscribe(
- lambda ev: event.set(), {SchedulerStopped}, one_shot=True
- ):
- await event.wait()
+ if self._state not in (RunState.stopped, RunState.stopping):
+ await self.get_next_event(SchedulerStopped)
async def start_in_background(self) -> None:
self._check_initialized()
- await self._task_group.start(self.run_until_stopped)
+ await self._services_task_group.start(self.run_until_stopped)
async def run_until_stopped(
self, *, task_status: TaskStatus = TASK_STATUS_IGNORED
) -> None:
+ """Run the scheduler until explicitly stopped."""
if self._state is not RunState.stopped:
raise RuntimeError(
f'Cannot start the scheduler when it is in the "{self._state}" '
@@ -470,150 +495,38 @@ class AsyncScheduler:
self._state = RunState.starting
async with AsyncExitStack() as exit_stack:
- self._wakeup_event = anyio.Event()
await self._ensure_services_initialized(exit_stack)
- # Wake up the scheduler if the data store emits a significant schedule event
- exit_stack.enter_context(
- self.event_broker.subscribe(
- self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
- )
- )
-
# Set this scheduler as the current scheduler
token = current_async_scheduler.set(self)
exit_stack.callback(current_async_scheduler.reset, token)
- # Start the built-in worker, if configured to do so
- if self.process_jobs:
- worker = Worker(job_executors=self.job_executors)
- await worker.start(
- exit_stack, self.data_store, self.event_broker, self.identity
- )
-
- # Signal that the scheduler has started
- self._state = RunState.started
- task_status.started()
- await self.event_broker.publish_local(SchedulerStarted())
-
exception: BaseException | None = None
try:
- while self._state is RunState.started:
- schedules = await self.data_store.acquire_schedules(
- self.identity, 100
- )
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
- try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- "Error computing next fire time for schedule %r of "
- "task %r – removing schedule",
- schedule.id,
- schedule.task_id,
- )
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- max_jitter = (
- schedule.max_jitter.total_seconds()
- if schedule.max_jitter
- else 0
- )
- for i, fire_time in enumerate(fire_times):
- # Calculate a jitter if max_jitter > 0
- jitter = _zero_timedelta
- if max_jitter:
- if i + 1 < len(fire_times):
- next_fire_time = fire_times[i + 1]
- else:
- next_fire_time = schedule.next_fire_time
-
- if next_fire_time is not None:
- # Jitter must never be so high that it would cause a
- # fire time to equal or exceed the next fire time
- jitter_s = min(
- [
- max_jitter,
- (
- next_fire_time
- - fire_time
- - _microsecond_delta
- ).total_seconds(),
- ]
- )
- jitter = timedelta(
- seconds=random.uniform(0, jitter_s)
- )
- fire_time += jitter
-
- schedule.last_fire_time = fire_time
- job = Job(
- task_id=schedule.task_id,
- args=schedule.args,
- kwargs=schedule.kwargs,
- schedule_id=schedule.id,
- scheduled_fire_time=fire_time,
- jitter=jitter,
- start_deadline=schedule.next_deadline,
- tags=schedule.tags,
- )
- await self.data_store.add_job(job)
-
- # Update the schedules (and release the scheduler's claim on them)
- await self.data_store.release_schedules(self.identity, schedules)
-
- # If we received fewer schedules than the maximum amount, sleep
- # until the next schedule is due or the scheduler is explicitly
- # woken up
- wait_time = None
- if len(schedules) < 100:
- self._wakeup_deadline = (
- await self.data_store.get_next_schedule_run_time()
- )
- if self._wakeup_deadline:
- wait_time = (
- self._wakeup_deadline - datetime.now(timezone.utc)
- ).total_seconds()
- self.logger.debug(
- "Sleeping %.3f seconds until the next fire time (%s)",
- wait_time,
- self._wakeup_deadline,
- )
- else:
- self.logger.debug("Waiting for any due schedules to appear")
-
- with move_on_after(wait_time):
- await self._wakeup_event.wait()
- self._wakeup_event = anyio.Event()
- else:
- self.logger.debug(
- "Processing more schedules on the next iteration"
- )
+ async with create_task_group() as task_group:
+ self._scheduler_cancel_scope = task_group.cancel_scope
+ exit_stack.callback(setattr, self, "_scheduler_cancel_scope", None)
+
+ # Start processing due schedules, if configured to do so
+ if self.role in (SchedulerRole.scheduler, SchedulerRole.both):
+ await task_group.start(self._process_schedules)
+
+ # Start processing due jobs, if configured to do so
+ if self.role in (SchedulerRole.worker, SchedulerRole.both):
+ await task_group.start(self._process_jobs)
+
+ # Signal that the scheduler has started
+ self._state = RunState.started
+ self.logger.info("Scheduler started")
+ task_status.started()
+ await self.event_broker.publish_local(SchedulerStarted())
except BaseException as exc:
exception = exc
raise
finally:
self._state = RunState.stopped
- # CancelledError is a subclass of Exception in Python 3.7
- if not exception or isinstance(exception, CancelledError):
+ if not exception or isinstance(exception, get_cancelled_exc_class()):
self.logger.info("Scheduler stopped")
elif isinstance(exception, Exception):
self.logger.exception("Scheduler crashed")
@@ -626,3 +539,239 @@ class AsyncScheduler:
await self.event_broker.publish_local(
SchedulerStopped(exception=exception)
)
+
+ async def _process_schedules(self, *, task_status: TaskStatus) -> None:
+ wakeup_event = anyio.Event()
+ wakeup_deadline: datetime | None = None
+
+ async def schedule_added_or_modified(event: Event) -> None:
+ event_ = cast("ScheduleAdded | ScheduleUpdated", event)
+ if not wakeup_deadline or (
+ event_.next_fire_time and event_.next_fire_time < wakeup_deadline
+ ):
+ self.logger.debug(
+ "Detected a %s event – waking up the scheduler to process "
+ "schedules",
+ type(event).__name__,
+ )
+ wakeup_event.set()
+
+ subscription = self.event_broker.subscribe(
+ schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
+ )
+ with subscription:
+ # Signal that we are ready, and wait for the scheduler start event
+ task_status.started()
+ await self.get_next_event(SchedulerStarted)
+
+ while self._state is RunState.started:
+ schedules = await self.data_store.acquire_schedules(self.identity, 100)
+ now = datetime.now(timezone.utc)
+ for schedule in schedules:
+ # Calculate a next fire time for the schedule, if possible
+ fire_times = [schedule.next_fire_time]
+ calculate_next = schedule.trigger.next
+ while True:
+ try:
+ fire_time = calculate_next()
+ except Exception:
+ self.logger.exception(
+ "Error computing next fire time for schedule %r of "
+ "task %r – removing schedule",
+ schedule.id,
+ schedule.task_id,
+ )
+ break
+
+ # Stop if the calculated fire time is in the future
+ if fire_time is None or fire_time > now:
+ schedule.next_fire_time = fire_time
+ break
+
+ # Only keep all the fire times if coalesce policy = "all"
+ if schedule.coalesce is CoalescePolicy.all:
+ fire_times.append(fire_time)
+ elif schedule.coalesce is CoalescePolicy.latest:
+ fire_times[0] = fire_time
+
+ # Add one or more jobs to the job queue
+ max_jitter = (
+ schedule.max_jitter.total_seconds()
+ if schedule.max_jitter
+ else 0
+ )
+ for i, fire_time in enumerate(fire_times):
+ # Calculate a jitter if max_jitter > 0
+ jitter = _zero_timedelta
+ if max_jitter:
+ if i + 1 < len(fire_times):
+ next_fire_time = fire_times[i + 1]
+ else:
+ next_fire_time = schedule.next_fire_time
+
+ if next_fire_time is not None:
+ # Jitter must never be so high that it would cause a
+ # fire time to equal or exceed the next fire time
+ jitter_s = min(
+ [
+ max_jitter,
+ (
+ next_fire_time
+ - fire_time
+ - _microsecond_delta
+ ).total_seconds(),
+ ]
+ )
+ jitter = timedelta(seconds=random.uniform(0, jitter_s))
+ fire_time += jitter
+
+ schedule.last_fire_time = fire_time
+ job = Job(
+ task_id=schedule.task_id,
+ args=schedule.args,
+ kwargs=schedule.kwargs,
+ schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ jitter=jitter,
+ start_deadline=schedule.next_deadline,
+ tags=schedule.tags,
+ )
+ await self.data_store.add_job(job)
+
+ # Update the schedules (and release the scheduler's claim on them)
+ await self.data_store.release_schedules(self.identity, schedules)
+
+ # If we received fewer schedules than the maximum amount, sleep
+ # until the next schedule is due or the scheduler is explicitly
+ # woken up
+ wait_time = None
+ if len(schedules) < 100:
+ wakeup_deadline = await self.data_store.get_next_schedule_run_time()
+ if wakeup_deadline:
+ wait_time = (
+ wakeup_deadline - datetime.now(timezone.utc)
+ ).total_seconds()
+ self.logger.debug(
+ "Sleeping %.3f seconds until the next fire time (%s)",
+ wait_time,
+ wakeup_deadline,
+ )
+ else:
+ self.logger.debug("Waiting for any due schedules to appear")
+
+ with move_on_after(wait_time):
+ await wakeup_event.wait()
+ wakeup_event = anyio.Event()
+ else:
+ self.logger.debug("Processing more schedules on the next iteration")
+
+ async def _process_jobs(self, *, task_status: TaskStatus) -> None:
+ wakeup_event = anyio.Event()
+
+ async def job_added(event: Event) -> None:
+ if len(self._running_jobs) < self.max_concurrent_jobs:
+ wakeup_event.set()
+
+ async with AsyncExitStack() as exit_stack:
+ # Start the job executors
+ for job_executor in self.job_executors.values():
+ await job_executor.start(exit_stack)
+
+ task_group = await exit_stack.enter_async_context(create_task_group())
+
+ # Wake up to fetch new jobs whenever a new job is added to the data store
+ exit_stack.enter_context(self.event_broker.subscribe(job_added, {JobAdded}))
+
+ # Signal that we are ready, and wait for the scheduler start event
+ task_status.started()
+ await self.get_next_event(SchedulerStarted)
+
+ while self._state is RunState.started:
+ limit = self.max_concurrent_jobs - len(self._running_jobs)
+ if limit > 0:
+ jobs = await self.data_store.acquire_jobs(self.identity, limit)
+ for job in jobs:
+ task = await self.data_store.get_task(job.task_id)
+ self._running_jobs.add(job.id)
+ task_group.start_soon(
+ self._run_job, job, task.func, task.executor
+ )
+
+ await wakeup_event.wait()
+ wakeup_event = anyio.Event()
+
+ async def _run_job(self, job: Job, func: Callable, executor: str) -> None:
+ try:
+ # Check if the job started before the deadline
+ start_time = datetime.now(timezone.utc)
+ if job.start_deadline is not None and start_time > job.start_deadline:
+ result = JobResult.from_job(
+ job,
+ outcome=JobOutcome.missed_start_deadline,
+ finished_at=start_time,
+ )
+ await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.event_broker.publish(
+ JobReleased.from_result(result, self.identity)
+ )
+ return
+
+ try:
+ job_executor = self.job_executors[executor]
+ except KeyError:
+ return
+
+ token = current_job.set(JobInfo.from_job(job))
+ try:
+ retval = await job_executor.run_job(func, job)
+ except get_cancelled_exc_class():
+ self.logger.info("Job %s was cancelled", job.id)
+ with CancelScope(shield=True):
+ result = JobResult.from_job(
+ job,
+ outcome=JobOutcome.cancelled,
+ )
+ await self.data_store.release_job(
+ self.identity, job.task_id, result
+ )
+ await self.event_broker.publish(
+ JobReleased.from_result(result, self.identity)
+ )
+ except BaseException as exc:
+ if isinstance(exc, Exception):
+ self.logger.exception("Job %s raised an exception", job.id)
+ else:
+ self.logger.error(
+ "Job %s was aborted due to %s", job.id, exc.__class__.__name__
+ )
+
+ result = JobResult.from_job(
+ job,
+ JobOutcome.error,
+ exception=exc,
+ )
+ await self.data_store.release_job(
+ self.identity,
+ job.task_id,
+ result,
+ )
+ await self.event_broker.publish(
+ JobReleased.from_result(result, self.identity)
+ )
+ if not isinstance(exc, Exception):
+ raise
+ else:
+ self.logger.info("Job %s completed successfully", job.id)
+ result = JobResult.from_job(
+ job,
+ JobOutcome.success,
+ return_value=retval,
+ )
+ await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.event_broker.publish(
+ JobReleased.from_result(result, self.identity)
+ )
+ finally:
+ current_job.reset(token)
+ finally:
+ self._running_jobs.remove(job.id)
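The new get_next_event() helper added above replaces the subscribe-and-wait
boilerplate that wait_until_stopped() previously used, and can be handy in
user code too. A sketch (``example_job`` is illustrative, and ``JobReleased``
is assumed to be importable from the package root):

    import asyncio
    from datetime import datetime, timezone

    from apscheduler import JobReleased
    from apscheduler.schedulers.async_ import AsyncScheduler
    from apscheduler.triggers.date import DateTrigger


    def example_job() -> None:
        print("ran")


    async def main() -> None:
        async with AsyncScheduler() as scheduler:
            await scheduler.add_schedule(
                example_job, DateTrigger(datetime.now(timezone.utc))
            )
            await scheduler.start_in_background()
            # Block until the job has been run and released
            await scheduler.get_next_event(JobReleased)


    asyncio.run(main())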
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 3a812a4..c80099f 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -17,7 +17,7 @@ from anyio import start_blocking_portal
from anyio.from_thread import BlockingPortal
from .. import Event, current_scheduler
-from .._enums import CoalescePolicy, ConflictPolicy, RunState
+from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole
from .._structures import JobResult, Schedule
from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger
from .async_ import AsyncScheduler
@@ -37,8 +37,7 @@ class Scheduler:
event_broker: EventBroker | None = None,
*,
identity: str | None = None,
- process_schedules: bool = True,
- start_worker: bool = True,
+ role: SchedulerRole = SchedulerRole.both,
job_executors: Mapping[str, JobExecutor] | None = None,
default_job_executor: str | None = None,
logger: Logger | None = None,
@@ -54,8 +53,7 @@ class Scheduler:
self._async_scheduler = AsyncScheduler(
identity=identity,
- process_schedules=process_schedules,
- process_jobs=start_worker,
+ role=role,
job_executors=job_executors,
default_job_executor=default_job_executor,
logger=logger or logging.getLogger(__name__),
@@ -78,12 +76,8 @@ class Scheduler:
return self._async_scheduler.identity
@property
- def process_schedules(self) -> bool:
- return self._async_scheduler.process_schedules
-
- @property
- def start_worker(self) -> bool:
- return self._async_scheduler.process_jobs
+ def role(self) -> SchedulerRole:
+ return self._async_scheduler.role
@property
def job_executors(self) -> MutableMapping[str, JobExecutor]:
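The synchronous facade mirrors the async API; a sketch of schedule-only use
with the new keyword (with the default in-memory data store this is only
meaningful alongside worker-role schedulers sharing an external data store):

    from apscheduler import SchedulerRole
    from apscheduler.schedulers.sync import Scheduler

    with Scheduler(role=SchedulerRole.scheduler) as scheduler:
        # Processes due schedules only; worker-role schedulers run the jobs
        scheduler.start_in_background()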
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 30c8dc9..58a1eae 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -30,6 +30,7 @@ from apscheduler import (
current_job,
current_scheduler,
)
+from apscheduler._enums import SchedulerRole
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
@@ -62,6 +63,7 @@ def dummy_sync_job(delay: float = 0, fail: bool = False) -> str:
class TestAsyncScheduler:
async def test_schedule_job(self) -> None:
def listener(received_event: Event) -> None:
received_events.append(received_event)
if isinstance(received_event, ScheduleRemoved):
event.set()
@@ -69,7 +71,7 @@ class TestAsyncScheduler:
received_events: list[Event] = []
event = anyio.Event()
trigger = DateTrigger(datetime.now(timezone.utc))
- async with AsyncScheduler(process_jobs=False) as scheduler:
+ async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler:
scheduler.event_broker.subscribe(listener)
await scheduler.add_schedule(dummy_async_job, trigger, id="foo")
await scheduler.start_in_background()
@@ -110,7 +112,7 @@ class TestAsyncScheduler:
assert not received_events
async def test_add_get_schedule(self) -> None:
- async with AsyncScheduler(process_jobs=False) as scheduler:
+ async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler:
with pytest.raises(ScheduleLookupError):
await scheduler.get_schedule("dummyid")
@@ -120,7 +122,7 @@ class TestAsyncScheduler:
assert isinstance(schedule, Schedule)
async def test_add_get_schedules(self) -> None:
- async with AsyncScheduler(process_jobs=False) as scheduler:
+ async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler:
assert await scheduler.get_schedules() == []
schedule1_id = await scheduler.add_schedule(
@@ -160,7 +162,7 @@ class TestAsyncScheduler:
orig_start_time = datetime.now(timezone.utc) - timedelta(seconds=1)
fake_uniform = mocker.patch("random.uniform")
fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
- async with AsyncScheduler(process_jobs=False) as scheduler:
+ async with AsyncScheduler(role=SchedulerRole.scheduler) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = anyio.Event()
scheduler.event_broker.subscribe(job_added_listener, {JobAdded})
@@ -315,7 +317,7 @@ class TestSyncScheduler:
received_events: list[Event] = []
event = threading.Event()
trigger = DateTrigger(datetime.now(timezone.utc))
- with Scheduler(start_worker=False) as scheduler:
+ with Scheduler(role=SchedulerRole.scheduler) as scheduler:
scheduler.event_broker.subscribe(listener)
scheduler.add_schedule(dummy_sync_job, trigger, id="foo")
scheduler.start_in_background()
@@ -351,17 +353,17 @@ class TestSyncScheduler:
assert isinstance(received_event, SchedulerStopped)
def test_add_get_schedule(self) -> None:
- with Scheduler(start_worker=False) as scheduler:
+ with Scheduler(role=SchedulerRole.scheduler) as scheduler:
with pytest.raises(ScheduleLookupError):
scheduler.get_schedule("dummyid")
trigger = DateTrigger(datetime.now(timezone.utc))
- scheduler.add_schedule(dummy_async_job, trigger, id="dummyid")
+ scheduler.add_schedule(dummy_sync_job, trigger, id="dummyid")
schedule = scheduler.get_schedule("dummyid")
assert isinstance(schedule, Schedule)
def test_add_get_schedules(self) -> None:
- with Scheduler(start_worker=False) as scheduler:
+ with Scheduler(role=SchedulerRole.scheduler) as scheduler:
assert scheduler.get_schedules() == []
schedule1_id = scheduler.add_schedule(
@@ -399,12 +401,12 @@ class TestSyncScheduler:
orig_start_time = datetime.now(timezone.utc) - timedelta(seconds=1)
fake_uniform = mocker.patch("random.uniform")
fake_uniform.configure_mock(side_effect=lambda a, b: jitter)
- with Scheduler(start_worker=False) as scheduler:
+ with Scheduler(role=SchedulerRole.scheduler) as scheduler:
trigger = IntervalTrigger(seconds=3, start_time=orig_start_time)
job_added_event = threading.Event()
scheduler.event_broker.subscribe(job_added_listener, {JobAdded})
schedule_id = scheduler.add_schedule(
- dummy_async_job, trigger, max_jitter=max_jitter
+ dummy_sync_job, trigger, max_jitter=max_jitter
)
schedule = scheduler.get_schedule(schedule_id)
assert schedule.max_jitter == timedelta(seconds=max_jitter)