summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/async_.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/schedulers/async_.py')
-rw-r--r--src/apscheduler/schedulers/async_.py338
1 files changed, 170 insertions, 168 deletions
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 72c5994..889596d 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -17,10 +17,11 @@ from anyio import (
get_cancelled_exc_class,
move_on_after,
)
+from anyio.abc import TaskGroup
-from ..abc import AsyncDataStore, EventSource, Job, Schedule, Trigger
+from ..abc import AsyncDataStore, AsyncEventBroker, Job, Schedule, Subscription, Trigger
from ..context import current_scheduler
-from ..converters import as_async_datastore
+from ..converters import as_async_datastore, as_async_eventbroker
from ..datastores.memory import MemoryDataStore
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
@@ -48,6 +49,9 @@ class AsyncScheduler:
data_store: AsyncDataStore = attrs.field(
converter=as_async_datastore, factory=MemoryDataStore
)
+ event_broker: AsyncEventBroker = attrs.field(
+ converter=as_async_eventbroker, factory=LocalAsyncEventBroker
+ )
identity: str = attrs.field(kw_only=True, default=None)
start_worker: bool = attrs.field(kw_only=True, default=True)
logger: Logger | None = attrs.field(kw_only=True, default=getLogger(__name__))
@@ -55,65 +59,24 @@ class AsyncScheduler:
_state: RunState = attrs.field(init=False, default=RunState.stopped)
_wakeup_event: anyio.Event = attrs.field(init=False)
_wakeup_deadline: datetime | None = attrs.field(init=False, default=None)
- _worker: AsyncWorker | None = attrs.field(init=False, default=None)
- _events: LocalAsyncEventBroker = attrs.field(
- init=False, factory=LocalAsyncEventBroker
- )
- _exit_stack: AsyncExitStack = attrs.field(init=False)
+ _task_group: TaskGroup | None = attrs.field(init=False, default=None)
+ _schedule_added_subscription: Subscription = attrs.field(init=False)
def __attrs_post_init__(self) -> None:
if not self.identity:
self.identity = f"{platform.node()}-{os.getpid()}-{id(self)}"
- @property
- def events(self) -> EventSource:
- return self._events
-
- @property
- def worker(self) -> AsyncWorker | None:
- return self._worker
-
async def __aenter__(self):
- self._state = RunState.starting
- self._wakeup_event = anyio.Event()
- self._exit_stack = AsyncExitStack()
- await self._exit_stack.__aenter__()
- await self._exit_stack.enter_async_context(self._events)
-
- # Initialize the data store and start relaying events to the scheduler's event broker
- await self._exit_stack.enter_async_context(self.data_store)
- self._exit_stack.enter_context(
- self.data_store.events.subscribe(self._events.publish)
- )
-
- # Wake up the scheduler if the data store emits a significant schedule event
- self._exit_stack.enter_context(
- self.data_store.events.subscribe(
- self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
- )
- )
-
- # Start the built-in worker, if configured to do so
- if self.start_worker:
- token = current_scheduler.set(self)
- try:
- self._worker = AsyncWorker(self.data_store)
- await self._exit_stack.enter_async_context(self._worker)
- finally:
- current_scheduler.reset(token)
-
- # Start the worker and return when it has signalled readiness or raised an exception
- task_group = create_task_group()
- await self._exit_stack.enter_async_context(task_group)
- await task_group.start(self.run)
+ self._task_group = create_task_group()
+ await self._task_group.__aenter__()
+ await self._task_group.start(self.run_until_stopped)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._state = RunState.stopping
self._wakeup_event.set()
- await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
- self._state = RunState.stopped
- del self._wakeup_event
+ await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
+ self._task_group = None
def _schedule_added_or_modified(self, event: Event) -> None:
event_ = cast("ScheduleAdded | ScheduleUpdated", event)
@@ -281,130 +244,169 @@ class AsyncScheduler:
else:
raise RuntimeError(f"Unknown job outcome: {result.outcome}")
- async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
- if self._state is not RunState.starting:
+ async def run_until_stopped(self, *, task_status=TASK_STATUS_IGNORED) -> None:
+ if self._state is not RunState.stopped:
raise RuntimeError(
- f"This function cannot be called while the scheduler is in the "
- f"{self._state} state"
+ f'Cannot start the scheduler when it is in the "{self._state}" '
+ f"state"
)
- # Signal that the scheduler has started
- self._state = RunState.started
- task_status.started()
- await self._events.publish(SchedulerStarted())
-
- exception: BaseException | None = None
- try:
- while self._state is RunState.started:
- schedules = await self.data_store.acquire_schedules(self.identity, 100)
- now = datetime.now(timezone.utc)
- for schedule in schedules:
- # Calculate a next fire time for the schedule, if possible
- fire_times = [schedule.next_fire_time]
- calculate_next = schedule.trigger.next
- while True:
- try:
- fire_time = calculate_next()
- except Exception:
- self.logger.exception(
- "Error computing next fire time for schedule %r of task %r – "
- "removing schedule",
- schedule.id,
- schedule.task_id,
- )
- break
-
- # Stop if the calculated fire time is in the future
- if fire_time is None or fire_time > now:
- schedule.next_fire_time = fire_time
- break
-
- # Only keep all the fire times if coalesce policy = "all"
- if schedule.coalesce is CoalescePolicy.all:
- fire_times.append(fire_time)
- elif schedule.coalesce is CoalescePolicy.latest:
- fire_times[0] = fire_time
-
- # Add one or more jobs to the job queue
- max_jitter = (
- schedule.max_jitter.total_seconds()
- if schedule.max_jitter
- else 0
+ self._state = RunState.starting
+ async with AsyncExitStack() as exit_stack:
+ self._wakeup_event = anyio.Event()
+
+ # Initialize the event broker
+ await self.event_broker.start()
+ exit_stack.push_async_exit(
+ lambda *exc_info: self.event_broker.stop(force=exc_info[0] is not None)
+ )
+
+ # Initialize the data store
+ await self.data_store.start(self.event_broker)
+ exit_stack.push_async_exit(
+ lambda *exc_info: self.data_store.stop(force=exc_info[0] is not None)
+ )
+
+ # Wake up the scheduler if the data store emits a significant schedule event
+ self._schedule_added_subscription = self.event_broker.subscribe(
+ self._schedule_added_or_modified, {ScheduleAdded, ScheduleUpdated}
+ )
+
+ # Start the built-in worker, if configured to do so
+ if self.start_worker:
+ token = current_scheduler.set(self)
+ exit_stack.callback(current_scheduler.reset, token)
+ worker = AsyncWorker(
+ self.data_store, self.event_broker, is_internal=True
+ )
+ await exit_stack.enter_async_context(worker)
+
+ # Signal that the scheduler has started
+ self._state = RunState.started
+ task_status.started()
+ await self.event_broker.publish_local(SchedulerStarted())
+
+ exception: BaseException | None = None
+ try:
+ while self._state is RunState.started:
+ schedules = await self.data_store.acquire_schedules(
+ self.identity, 100
)
- for i, fire_time in enumerate(fire_times):
- # Calculate a jitter if max_jitter > 0
- jitter = _zero_timedelta
- if max_jitter:
- if i + 1 < len(fire_times):
- next_fire_time = fire_times[i + 1]
- else:
- next_fire_time = schedule.next_fire_time
-
- if next_fire_time is not None:
- # Jitter must never be so high that it would cause a fire time to
- # equal or exceed the next fire time
- jitter_s = min(
- [
- max_jitter,
- (
- next_fire_time
- - fire_time
- - _microsecond_delta
- ).total_seconds(),
- ]
+ now = datetime.now(timezone.utc)
+ for schedule in schedules:
+ # Calculate a next fire time for the schedule, if possible
+ fire_times = [schedule.next_fire_time]
+ calculate_next = schedule.trigger.next
+ while True:
+ try:
+ fire_time = calculate_next()
+ except Exception:
+ self.logger.exception(
+ "Error computing next fire time for schedule %r of "
+ "task %r – removing schedule",
+ schedule.id,
+ schedule.task_id,
)
- jitter = timedelta(seconds=random.uniform(0, jitter_s))
- fire_time += jitter
-
- schedule.last_fire_time = fire_time
- job = Job(
- task_id=schedule.task_id,
- args=schedule.args,
- kwargs=schedule.kwargs,
- schedule_id=schedule.id,
- scheduled_fire_time=fire_time,
- jitter=jitter,
- start_deadline=schedule.next_deadline,
- tags=schedule.tags,
+ break
+
+ # Stop if the calculated fire time is in the future
+ if fire_time is None or fire_time > now:
+ schedule.next_fire_time = fire_time
+ break
+
+ # Only keep all the fire times if coalesce policy = "all"
+ if schedule.coalesce is CoalescePolicy.all:
+ fire_times.append(fire_time)
+ elif schedule.coalesce is CoalescePolicy.latest:
+ fire_times[0] = fire_time
+
+ # Add one or more jobs to the job queue
+ max_jitter = (
+ schedule.max_jitter.total_seconds()
+ if schedule.max_jitter
+ else 0
)
- await self.data_store.add_job(job)
-
- # Update the schedules (and release the scheduler's claim on them)
- await self.data_store.release_schedules(self.identity, schedules)
+ for i, fire_time in enumerate(fire_times):
+ # Calculate a jitter if max_jitter > 0
+ jitter = _zero_timedelta
+ if max_jitter:
+ if i + 1 < len(fire_times):
+ next_fire_time = fire_times[i + 1]
+ else:
+ next_fire_time = schedule.next_fire_time
+
+ if next_fire_time is not None:
+ # Jitter must never be so high that it would cause a
+ # fire time to equal or exceed the next fire time
+ jitter_s = min(
+ [
+ max_jitter,
+ (
+ next_fire_time
+ - fire_time
+ - _microsecond_delta
+ ).total_seconds(),
+ ]
+ )
+ jitter = timedelta(
+ seconds=random.uniform(0, jitter_s)
+ )
+ fire_time += jitter
+
+ schedule.last_fire_time = fire_time
+ job = Job(
+ task_id=schedule.task_id,
+ args=schedule.args,
+ kwargs=schedule.kwargs,
+ schedule_id=schedule.id,
+ scheduled_fire_time=fire_time,
+ jitter=jitter,
+ start_deadline=schedule.next_deadline,
+ tags=schedule.tags,
+ )
+ await self.data_store.add_job(job)
+
+ # Update the schedules (and release the scheduler's claim on them)
+ await self.data_store.release_schedules(self.identity, schedules)
+
+ # If we received fewer schedules than the maximum amount, sleep
+ # until the next schedule is due or the scheduler is explicitly
+ # woken up
+ wait_time = None
+ if len(schedules) < 100:
+ self._wakeup_deadline = (
+ await self.data_store.get_next_schedule_run_time()
+ )
+ if self._wakeup_deadline:
+ wait_time = (
+ self._wakeup_deadline - datetime.now(timezone.utc)
+ ).total_seconds()
+ self.logger.debug(
+ "Sleeping %.3f seconds until the next fire time (%s)",
+ wait_time,
+ self._wakeup_deadline,
+ )
+ else:
+ self.logger.debug("Waiting for any due schedules to appear")
- # If we received fewer schedules than the maximum amount, sleep until the next
- # schedule is due or the scheduler is explicitly woken up
- wait_time = None
- if len(schedules) < 100:
- self._wakeup_deadline = (
- await self.data_store.get_next_schedule_run_time()
- )
- if self._wakeup_deadline:
- wait_time = (
- self._wakeup_deadline - datetime.now(timezone.utc)
- ).total_seconds()
+ with move_on_after(wait_time):
+ await self._wakeup_event.wait()
+ self._wakeup_event = anyio.Event()
+ else:
self.logger.debug(
- "Sleeping %.3f seconds until the next fire time (%s)",
- wait_time,
- self._wakeup_deadline,
+ "Processing more schedules on the next iteration"
)
- else:
- self.logger.debug("Waiting for any due schedules to appear")
-
- with move_on_after(wait_time):
- await self._wakeup_event.wait()
- self._wakeup_event = anyio.Event()
- else:
- self.logger.debug("Processing more schedules on the next iteration")
- except get_cancelled_exc_class():
- pass
- except BaseException as exc:
- self.logger.exception("Scheduler crashed")
- exception = exc
- raise
- else:
- self.logger.info("Scheduler stopped")
- finally:
- self._state = RunState.stopped
- with move_on_after(3, shield=True):
- await self._events.publish(SchedulerStopped(exception=exception))
+ except get_cancelled_exc_class():
+ pass
+ except BaseException as exc:
+ self.logger.exception("Scheduler crashed")
+ exception = exc
+ raise
+ else:
+ self.logger.info("Scheduler stopped")
+ finally:
+ self._state = RunState.stopped
+ with move_on_after(3, shield=True):
+ await self.event_broker.publish_local(
+ SchedulerStopped(exception=exception)
+ )