diff options
Diffstat (limited to 'src/apscheduler/schedulers/sync.py')
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 96 |
1 files changed, 92 insertions, 4 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 221b284..1525bea 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -8,15 +8,17 @@ from contextlib import ExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger from typing import Any, Callable, Iterable, Mapping, Optional -from uuid import uuid4 +from uuid import UUID, uuid4 from ..abc import DataStore, EventSource, Trigger from ..datastores.memory import MemoryDataStore -from ..enums import CoalescePolicy, ConflictPolicy, RunState +from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker -from ..events import Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated +from ..events import ( + Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) +from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError from ..marshalling import callable_to_ref -from ..structures import Job, Schedule, Task +from ..structures import Job, JobResult, Schedule, Task from ..workers.sync import Worker @@ -121,6 +123,92 @@ class Scheduler: def remove_schedule(self, schedule_id: str) -> None: self.data_store.remove_schedules({schedule_id}) + def add_job( + self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None, + kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = None + ) -> UUID: + """ + Add a job to the data store. + + :param func_or_task_id: + :param args: positional arguments to call the target callable with + :param kwargs: keyword arguments to call the target callable with + :param tags: + :return: the ID of the newly created job + + """ + if callable(func_or_task_id): + task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) + self.data_store.add_task(task) + else: + task = self.data_store.get_task(func_or_task_id) + + job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags) + self.data_store.add_job(job) + return job.id + + def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult: + """ + Retrieve the result of a job. + + :param job_id: the ID of the job + :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to + raise an exception if the result is not yet available + :raises JobLookupError: if the job does not exist in the data store + + """ + wait_event = threading.Event() + + def listener(event: JobReleased) -> None: + if event.job_id == job_id: + wait_event.set() + + with self.data_store.events.subscribe(listener, {JobReleased}): + result = self.data_store.get_job_result(job_id) + if result: + return result + elif not wait: + raise JobLookupError(job_id) + + wait_event.wait() + + result = self.data_store.get_job_result(job_id) + assert isinstance(result, JobResult) + return result + + def run_job( + self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None, + kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = () + ) -> Any: + """ + Convenience method to add a job and then return its result (or raise its exception). + + :returns: the return value of the target function + + """ + job_complete_event = threading.Event() + + def listener(event: JobReleased) -> None: + if event.job_id == job_id: + job_complete_event.set() + + job_id: Optional[UUID] = None + with self.data_store.events.subscribe(listener, {JobReleased}): + job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags) + job_complete_event.wait() + + result = self.get_job_result(job_id) + if result.outcome is JobOutcome.success: + return result.return_value + elif result.outcome is JobOutcome.error: + raise result.exception + elif result.outcome is JobOutcome.missed_start_deadline: + raise JobDeadlineMissed + elif result.outcome is JobOutcome.cancelled: + raise JobCancelled + else: + raise RuntimeError(f'Unknown job outcome: {result.outcome}') + def run(self) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the scheduler is in the ' |