summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/apscheduler/schedulers/sync.py')
-rw-r--r--src/apscheduler/schedulers/sync.py96
1 files changed, 92 insertions, 4 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 221b284..1525bea 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -8,15 +8,17 @@ from contextlib import ExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
from typing import Any, Callable, Iterable, Mapping, Optional
-from uuid import uuid4
+from uuid import UUID, uuid4
from ..abc import DataStore, EventSource, Trigger
from ..datastores.memory import MemoryDataStore
-from ..enums import CoalescePolicy, ConflictPolicy, RunState
+from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated
+from ..events import (
+ Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
-from ..structures import Job, Schedule, Task
+from ..structures import Job, JobResult, Schedule, Task
from ..workers.sync import Worker
@@ -121,6 +123,92 @@ class Scheduler:
def remove_schedule(self, schedule_id: str) -> None:
self.data_store.remove_schedules({schedule_id})
+ def add_job(
+ self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None,
+ kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = None
+ ) -> UUID:
+ """
+ Add a job to the data store.
+
+ :param func_or_task_id:
+ :param args: positional arguments to call the target callable with
+ :param kwargs: keyword arguments to call the target callable with
+ :param tags:
+ :return: the ID of the newly created job
+
+ """
+ if callable(func_or_task_id):
+ task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
+ self.data_store.add_task(task)
+ else:
+ task = self.data_store.get_task(func_or_task_id)
+
+ job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags)
+ self.data_store.add_job(job)
+ return job.id
+
+ def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult:
+ """
+ Retrieve the result of a job.
+
+ :param job_id: the ID of the job
+ :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to
+ raise an exception if the result is not yet available
+ :raises JobLookupError: if the job does not exist in the data store
+
+ """
+ wait_event = threading.Event()
+
+ def listener(event: JobReleased) -> None:
+ if event.job_id == job_id:
+ wait_event.set()
+
+ with self.data_store.events.subscribe(listener, {JobReleased}):
+ result = self.data_store.get_job_result(job_id)
+ if result:
+ return result
+ elif not wait:
+ raise JobLookupError(job_id)
+
+ wait_event.wait()
+
+ result = self.data_store.get_job_result(job_id)
+ assert isinstance(result, JobResult)
+ return result
+
+ def run_job(
+ self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None,
+ kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = ()
+ ) -> Any:
+ """
+ Convenience method to add a job and then return its result (or raise its exception).
+
+ :returns: the return value of the target function
+
+ """
+ job_complete_event = threading.Event()
+
+ def listener(event: JobReleased) -> None:
+ if event.job_id == job_id:
+ job_complete_event.set()
+
+ job_id: Optional[UUID] = None
+ with self.data_store.events.subscribe(listener, {JobReleased}):
+ job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_complete_event.wait()
+
+ result = self.get_job_result(job_id)
+ if result.outcome is JobOutcome.success:
+ return result.return_value
+ elif result.outcome is JobOutcome.error:
+ raise result.exception
+ elif result.outcome is JobOutcome.missed_start_deadline:
+ raise JobDeadlineMissed
+ elif result.outcome is JobOutcome.cancelled:
+ raise JobCancelled
+ else:
+ raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+
def run(self) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the scheduler is in the '