diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-13 01:07:08 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-13 01:07:44 +0300 |
commit | 8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (patch) | |
tree | 2a172cf3bed738b4ba969cf08c86f7a8e91150f8 | |
parent | 200c5713193c011e5b230c3c20afe31f261c8291 (diff) | |
download | apscheduler-8b68b6c5d1c63faae1ba3769b6475b396328e3a3.tar.gz |
Added scheduler methods for creating jobs directly w/o schedules
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 6 | ||||
-rw-r--r-- | src/apscheduler/datastores/memory.py | 12 | ||||
-rw-r--r-- | src/apscheduler/datastores/mongodb.py | 13 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 14 | ||||
-rw-r--r-- | src/apscheduler/events.py | 34 | ||||
-rw-r--r-- | src/apscheduler/exceptions.py | 22 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 110 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 96 | ||||
-rw-r--r-- | src/apscheduler/structures.py | 9 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 25 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 9 | ||||
-rw-r--r-- | tests/test_schedulers.py | 85 | ||||
-rw-r--r-- | tests/test_workers.py | 18 |
13 files changed, 351 insertions, 102 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index b15b154..92ba02b 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -19,7 +19,7 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule from ..enums import ConflictPolicy from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded, + DataStoreEvent, JobAcquired, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError @@ -368,6 +368,10 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore): where(self.t_tasks.c.id == p_id) await conn.execute(update, params) + # Publish the appropriate events + for job in acquired_jobs: + await self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return acquired_jobs async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 98306a2..96514ca 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -13,7 +13,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule from ..enums import ConflictPolicy from ..eventbrokers.local import LocalEventBroker from ..events import ( - JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) + JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, + TaskRemoved, TaskUpdated) from ..exceptions import ConflictingIdError, TaskLookupError from ..structures import JobResult, Task from ..util import reentrant @@ -261,6 +262,10 @@ class MemoryDataStore(DataStore): if len(jobs) == limit: break + # Publish the appropriate events + for job in jobs: + self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: @@ -278,5 +283,10 @@ class MemoryDataStore(DataStore): # Record the result self._job_results[result.job_id] = result + # Publish the event + self._events.publish( + JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome) + ) + def get_job_result(self, job_id: UUID) -> Optional[JobResult]: return self._job_results.pop(job_id, None) diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index e5b8283..5066c56 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer from ..enums import ConflictPolicy from ..eventbrokers.local import LocalEventBroker from ..events import ( - DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, - TaskRemoved, TaskUpdated) + DataStoreEvent, JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, + ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) from ..exceptions import ( ConflictingIdError, DeserializationError, SerializationError, TaskLookupError) from ..serializers.pickle import PickleSerializer @@ -336,6 +336,10 @@ class MongoDBDataStore(DataStore): session=session ) + # Publish the appropriate events + for job in acquired_jobs: + self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return acquired_jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: @@ -355,6 +359,11 @@ class MongoDBDataStore(DataStore): # Delete the job self._jobs.delete_one({'_id': result.job_id}, session=session) + # Publish the event + self._events.publish( + JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome) + ) + def get_job_result(self, job_id: UUID) -> Optional[JobResult]: document = self._jobs_results.find_one_and_delete({'_id': job_id}) if document: diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 8dea821..31e60cc 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -20,8 +20,9 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome from ..eventbrokers.local import LocalEventBroker from ..events import ( - Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, - ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated) + Event, JobAcquired, JobAdded, JobDeserializationFailed, JobReleased, ScheduleAdded, + ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, + TaskUpdated) from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError from ..marshalling import callable_to_ref from ..serializers.pickle import PickleSerializer @@ -485,6 +486,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): where(self.t_tasks.c.id == p_id) conn.execute(update, params) + # Publish the appropriate events + for job in acquired_jobs: + self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id)) + return acquired_jobs def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None: @@ -504,6 +509,11 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id) conn.execute(delete) + # Publish the event + self._events.publish( + JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome) + ) + def get_job_result(self, job_id: UUID) -> Optional[JobResult]: with self.engine.begin() as conn: # Retrieve the result diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py index 9d04adc..86e98c7 100644 --- a/src/apscheduler/events.py +++ b/src/apscheduler/events.py @@ -10,7 +10,6 @@ from attr.converters import optional from .converters import as_aware_datetime, as_uuid from .enums import JobOutcome -from .structures import Job @attr.define(kw_only=True, frozen=True) @@ -124,36 +123,17 @@ class WorkerStopped(WorkerEvent): @attr.define(kw_only=True, frozen=True) -class JobExecutionEvent(WorkerEvent): - job_id: UUID = attr.field(converter=as_uuid) - task_id: str - schedule_id: Optional[str] - scheduled_fire_time: Optional[datetime] = attr.field(converter=optional(as_aware_datetime)) - start_deadline: Optional[datetime] = attr.field(converter=optional(as_aware_datetime)) - +class JobAcquired(WorkerEvent): + """Signals that a worker has acquired a job for processing.""" -@attr.define(kw_only=True, frozen=True) -class JobStarted(JobExecutionEvent): - """Signals that a worker has started processing a job.""" - - @classmethod - def from_job(cls, job: Job, start_time: datetime) -> JobStarted: - return JobStarted( - timestamp=start_time, job_id=job.id, task_id=job.task_id, - schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time, - start_deadline=job.start_deadline) + job_id: UUID = attr.field(converter=as_uuid) + worker_id: str @attr.define(kw_only=True, frozen=True) -class JobEnded(JobExecutionEvent): +class JobReleased(WorkerEvent): """Signals that a worker has finished processing of a job.""" + job_id: UUID = attr.field(converter=as_uuid) + worker_id: str outcome: JobOutcome - start_time: datetime = attr.field(converter=as_aware_datetime) - - @classmethod - def from_job(cls, job: Job, outcome: JobOutcome, start_time: datetime) -> JobEnded: - return JobEnded( - timestamp=datetime.now(timezone.utc), job_id=job.id, task_id=job.task_id, - schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time, - start_deadline=job.start_deadline, outcome=outcome, start_time=start_time) diff --git a/src/apscheduler/exceptions.py b/src/apscheduler/exceptions.py index 1f90ba0..b84e58a 100644 --- a/src/apscheduler/exceptions.py +++ b/src/apscheduler/exceptions.py @@ -1,3 +1,6 @@ +from uuid import UUID + + class TaskLookupError(LookupError): """Raised by a data store when it cannot find the requested task.""" @@ -8,8 +11,23 @@ class TaskLookupError(LookupError): class JobLookupError(KeyError): """Raised when the job store cannot find a job for update or removal.""" - def __init__(self, job_id): - super().__init__(u'No job by the id of %s was found' % job_id) + def __init__(self, job_id: UUID): + super().__init__(f'No job by the id of {job_id} was found') + + +class JobResultNotReady(KeyError): + """Raised by ``get_job_result()`` if the job result is not ready.""" + + def __init__(self, job_id: UUID): + super().__init__(f'No job by the id of {job_id} was found') + + +class JobCancelled(Exception): + """Raised by ``run_job()`` if the job was cancelled.""" + + +class JobDeadlineMissed(Exception): + """Raised by ``run_job()`` if the job failed to start within the allotted time.""" class ConflictingIdError(KeyError): diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 213b4de..6900460 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -6,7 +6,7 @@ from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger from typing import Any, Callable, Iterable, Mapping, Optional -from uuid import uuid4 +from uuid import UUID, uuid4 import anyio from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after @@ -15,11 +15,13 @@ from anyio.abc import TaskGroup from ..abc import AsyncDataStore, DataStore, EventSource, Job, Schedule, Trigger from ..datastores.async_adapter import AsyncDataStoreAdapter from ..datastores.memory import MemoryDataStore -from ..enums import CoalescePolicy, ConflictPolicy, RunState +from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker -from ..events import ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated +from ..events import ( + JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) +from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError from ..marshalling import callable_to_ref -from ..structures import Task +from ..structures import JobResult, Task from ..workers.async_ import AsyncWorker @@ -121,6 +123,92 @@ class AsyncScheduler: async def remove_schedule(self, schedule_id: str) -> None: await self.data_store.remove_schedules({schedule_id}) + async def add_job( + self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None, + kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = None + ) -> UUID: + """ + Add a job to the data store. + + :param func_or_task_id: + :param args: positional arguments to call the target callable with + :param kwargs: keyword arguments to call the target callable with + :param tags: + :return: the ID of the newly created job + + """ + if callable(func_or_task_id): + task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) + await self.data_store.add_task(task) + else: + task = await self.data_store.get_task(func_or_task_id) + + job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags) + await self.data_store.add_job(job) + return job.id + + async def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult: + """ + Retrieve the result of a job. + + :param job_id: the ID of the job + :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to + raise an exception if the result is not yet available + :raises JobLookupError: if the job does not exist in the data store + + """ + wait_event = anyio.Event() + + def listener(event: JobReleased) -> None: + if event.job_id == job_id: + wait_event.set() + + with self.data_store.events.subscribe(listener, {JobReleased}): + result = await self.data_store.get_job_result(job_id) + if result: + return result + elif not wait: + raise JobLookupError(job_id) + + await wait_event.wait() + + result = await self.data_store.get_job_result(job_id) + assert isinstance(result, JobResult) + return result + + async def run_job( + self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None, + kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = () + ) -> Any: + """ + Convenience method to add a job and then return its result (or raise its exception). + + :returns: the return value of the target function + + """ + job_complete_event = anyio.Event() + + def listener(event: JobReleased) -> None: + if event.job_id == job_id: + job_complete_event.set() + + job_id: Optional[UUID] = None + with self.data_store.events.subscribe(listener, {JobReleased}): + job_id = await self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags) + await job_complete_event.wait() + + result = await self.get_job_result(job_id) + if result.outcome is JobOutcome.success: + return result.return_value + elif result.outcome is JobOutcome.error: + raise result.exception + elif result.outcome is JobOutcome.missed_start_deadline: + raise JobDeadlineMissed + elif result.outcome is JobOutcome.cancelled: + raise JobCancelled + else: + raise RuntimeError(f'Unknown job outcome: {result.outcome}') + async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the scheduler is in the ' @@ -192,17 +280,3 @@ class AsyncScheduler: self._state = RunState.stopped await self._events.publish(SchedulerStopped()) - - # async def stop(self, force: bool = False) -> None: - # self._running = False - # if self._worker: - # await self._worker.stop(force) - # - # if self._acquire_cancel_scope: - # self._acquire_cancel_scope.cancel() - # if force and self._task_group: - # self._task_group.cancel_scope.cancel() - # - # async def wait_until_stopped(self) -> None: - # if self._stop_event: - # await self._stop_event.wait() diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 221b284..1525bea 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -8,15 +8,17 @@ from contextlib import ExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger from typing import Any, Callable, Iterable, Mapping, Optional -from uuid import uuid4 +from uuid import UUID, uuid4 from ..abc import DataStore, EventSource, Trigger from ..datastores.memory import MemoryDataStore -from ..enums import CoalescePolicy, ConflictPolicy, RunState +from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker -from ..events import Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated +from ..events import ( + Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated) +from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError from ..marshalling import callable_to_ref -from ..structures import Job, Schedule, Task +from ..structures import Job, JobResult, Schedule, Task from ..workers.sync import Worker @@ -121,6 +123,92 @@ class Scheduler: def remove_schedule(self, schedule_id: str) -> None: self.data_store.remove_schedules({schedule_id}) + def add_job( + self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None, + kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = None + ) -> UUID: + """ + Add a job to the data store. + + :param func_or_task_id: + :param args: positional arguments to call the target callable with + :param kwargs: keyword arguments to call the target callable with + :param tags: + :return: the ID of the newly created job + + """ + if callable(func_or_task_id): + task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id) + self.data_store.add_task(task) + else: + task = self.data_store.get_task(func_or_task_id) + + job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags) + self.data_store.add_job(job) + return job.id + + def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult: + """ + Retrieve the result of a job. + + :param job_id: the ID of the job + :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to + raise an exception if the result is not yet available + :raises JobLookupError: if the job does not exist in the data store + + """ + wait_event = threading.Event() + + def listener(event: JobReleased) -> None: + if event.job_id == job_id: + wait_event.set() + + with self.data_store.events.subscribe(listener, {JobReleased}): + result = self.data_store.get_job_result(job_id) + if result: + return result + elif not wait: + raise JobLookupError(job_id) + + wait_event.wait() + + result = self.data_store.get_job_result(job_id) + assert isinstance(result, JobResult) + return result + + def run_job( + self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None, + kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = () + ) -> Any: + """ + Convenience method to add a job and then return its result (or raise its exception). + + :returns: the return value of the target function + + """ + job_complete_event = threading.Event() + + def listener(event: JobReleased) -> None: + if event.job_id == job_id: + job_complete_event.set() + + job_id: Optional[UUID] = None + with self.data_store.events.subscribe(listener, {JobReleased}): + job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags) + job_complete_event.wait() + + result = self.get_job_result(job_id) + if result.outcome is JobOutcome.success: + return result.return_value + elif result.outcome is JobOutcome.error: + raise result.exception + elif result.outcome is JobOutcome.missed_start_deadline: + raise JobDeadlineMissed + elif result.outcome is JobOutcome.cancelled: + raise JobCancelled + else: + raise RuntimeError(f'Unknown job outcome: {result.outcome}') + def run(self) -> None: if self._state is not RunState.starting: raise RuntimeError(f'This function cannot be called while the scheduler is in the ' diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py index 4d1bc18..c805ef0 100644 --- a/src/apscheduler/structures.py +++ b/src/apscheduler/structures.py @@ -6,6 +6,7 @@ from typing import Any, Callable, Optional from uuid import UUID, uuid4 import attr +from attr.converters import default_if_none from . import abc from .enums import CoalescePolicy, JobOutcome @@ -89,12 +90,14 @@ class Schedule: class Job: id: UUID = attr.field(factory=uuid4) task_id: str = attr.field(eq=False, order=False) - args: tuple = attr.field(eq=False, order=False, default=()) - kwargs: dict[str, Any] = attr.field(eq=False, order=False, factory=dict) + args: tuple = attr.field(eq=False, order=False, converter=default_if_none(())) + kwargs: dict[str, Any] = attr.field( + eq=False, order=False, converter=default_if_none(factory=dict)) schedule_id: Optional[str] = attr.field(eq=False, order=False, default=None) scheduled_fire_time: Optional[datetime] = attr.field(eq=False, order=False, default=None) start_deadline: Optional[datetime] = attr.field(eq=False, order=False, default=None) - tags: frozenset[str] = attr.field(eq=False, order=False, factory=frozenset) + tags: frozenset[str] = attr.field( + eq=False, order=False, converter=default_if_none(factory=frozenset)) created_at: datetime = attr.field(eq=False, order=False, factory=partial(datetime.now, timezone.utc)) started_at: Optional[datetime] = attr.field(eq=False, order=False, default=None) diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index 5e872cf..dfa81dc 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -17,7 +17,7 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job from ..datastores.async_adapter import AsyncDataStoreAdapter from ..enums import JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker -from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped +from ..events import JobAdded, WorkerStarted, WorkerStopped from ..structures import JobResult @@ -122,11 +122,10 @@ class AsyncWorker: # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - await self._events.publish( - JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time)) + result = JobResult(job_id=job.id, outcome=JobOutcome.missed_start_deadline) + await self.data_store.release_job(self.identity, job.task_id, result) return - await self._events.publish(JobStarted.from_job(job, start_time)) try: retval = func(*job.args, **job.kwargs) if isawaitable(retval): @@ -135,31 +134,13 @@ class AsyncWorker: with CancelScope(shield=True): result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled) await self.data_store.release_job(self.identity, job.task_id, result) - - with move_on_after(1, shield=True): - await self._events.publish( - JobEnded.from_job(job, JobOutcome.cancelled, start_time)) except BaseException as exc: result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc) await self.data_store.release_job(self.identity, job.task_id, result) - await self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time)) if not isinstance(exc, Exception): raise else: result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval) await self.data_store.release_job(self.identity, job.task_id, result) - await self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time)) finally: self._running_jobs.remove(job.id) - - # async def stop(self, force: bool = False) -> None: - # self._running = False - # if self._acquire_cancel_scope: - # self._acquire_cancel_scope.cancel() - # - # if force and self._task_group: - # self._task_group.cancel_scope.cancel() - # - # async def wait_until_stopped(self) -> None: - # if self._stop_event: - # await self._stop_event.wait() diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index be805ee..48922da 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -13,7 +13,7 @@ from uuid import UUID from ..abc import DataStore, EventSource from ..enums import JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker -from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped +from ..events import JobAdded, WorkerStarted, WorkerStopped from ..structures import Job, JobResult @@ -118,22 +118,19 @@ class Worker: # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - self._events.publish( - JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time)) + result = JobResult(job_id=job.id, outcome=JobOutcome.missed_start_deadline) + self.data_store.release_job(self.identity, job.task_id, result) return - self._events.publish(JobStarted.from_job(job, start_time)) try: retval = func(*job.args, **job.kwargs) except BaseException as exc: result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc) self.data_store.release_job(self.identity, job.task_id, result) - self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time)) if not isinstance(exc, Exception): raise else: result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval) self.data_store.release_job(self.identity, job.task_id, result) - self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time)) finally: self._running_jobs.remove(job.id) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 6a45b8b..5f1a7df 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -1,12 +1,15 @@ import threading +import time from datetime import datetime, timezone import anyio import pytest from anyio import fail_after +from apscheduler.enums import JobOutcome from apscheduler.events import ( Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped, TaskAdded) +from apscheduler.exceptions import JobLookupError from apscheduler.schedulers.async_ import AsyncScheduler from apscheduler.schedulers.sync import Scheduler from apscheduler.triggers.date import DateTrigger @@ -14,12 +17,20 @@ from apscheduler.triggers.date import DateTrigger pytestmark = pytest.mark.anyio -async def dummy_async_job(): - return 'returnvalue' +async def dummy_async_job(delay: float = 0, fail: bool = False) -> str: + await anyio.sleep(delay) + if fail: + raise RuntimeError('failing as requested') + else: + return 'returnvalue' -def dummy_sync_job(): - return 'returnvalue' +def dummy_sync_job(delay: float = 0, fail: bool = False) -> str: + time.sleep(delay) + if fail: + raise RuntimeError('failing as requested') + else: + return 'returnvalue' class TestAsyncScheduler: @@ -72,6 +83,39 @@ class TestAsyncScheduler: # There should be no more events on the list assert not received_events + async def test_get_job_result_success(self) -> None: + async with AsyncScheduler() as scheduler: + job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2}) + result = await scheduler.get_job_result(job_id) + assert result.job_id == job_id + assert result.outcome is JobOutcome.success + assert result.return_value == 'returnvalue' + + async def test_get_job_result_error(self) -> None: + async with AsyncScheduler() as scheduler: + job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2, 'fail': True}) + result = await scheduler.get_job_result(job_id) + assert result.job_id == job_id + assert result.outcome is JobOutcome.error + assert isinstance(result.exception, RuntimeError) + assert str(result.exception) == 'failing as requested' + + async def test_get_job_result_nowait_not_yet_ready(self) -> None: + async with AsyncScheduler() as scheduler: + job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2}) + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + + async def test_run_job_success(self) -> None: + async with AsyncScheduler() as scheduler: + return_value = await scheduler.run_job(dummy_async_job) + assert return_value == 'returnvalue' + + async def test_run_job_failure(self) -> None: + async with AsyncScheduler() as scheduler: + with pytest.raises(RuntimeError, match='failing as requested'): + await scheduler.run_job(dummy_async_job, kwargs={'fail': True}) + class TestSyncScheduler: def test_schedule_job(self): @@ -102,7 +146,6 @@ class TestSyncScheduler: received_event = received_events.pop(0) assert isinstance(received_event, ScheduleAdded) assert received_event.schedule_id == 'foo' - # assert received_event.task_id == 'task_id' # Then that schedule was processed and a job was added for it received_event = received_events.pop(0) @@ -121,3 +164,35 @@ class TestSyncScheduler: # There should be no more events on the list assert not received_events + + def test_get_job_result(self) -> None: + with Scheduler() as scheduler: + job_id = scheduler.add_job(dummy_sync_job) + result = scheduler.get_job_result(job_id) + assert result.outcome is JobOutcome.success + assert result.return_value == 'returnvalue' + + def test_get_job_result_error(self) -> None: + with Scheduler() as scheduler: + job_id = scheduler.add_job(dummy_sync_job, kwargs={'delay': 0.2, 'fail': True}) + result = scheduler.get_job_result(job_id) + assert result.job_id == job_id + assert result.outcome is JobOutcome.error + assert isinstance(result.exception, RuntimeError) + assert str(result.exception) == 'failing as requested' + + def test_get_job_result_nowait_not_yet_ready(self) -> None: + with Scheduler() as scheduler: + job_id = scheduler.add_job(dummy_sync_job, kwargs={'delay': 0.2}) + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + + def test_run_job_success(self) -> None: + with Scheduler() as scheduler: + return_value = scheduler.run_job(dummy_sync_job) + assert return_value == 'returnvalue' + + def test_run_job_failure(self) -> None: + with Scheduler() as scheduler: + with pytest.raises(RuntimeError, match='failing as requested'): + scheduler.run_job(dummy_sync_job, kwargs={'fail': True}) diff --git a/tests/test_workers.py b/tests/test_workers.py index 74a018c..872cf34 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -10,7 +10,7 @@ from apscheduler.abc import Job from apscheduler.datastores.memory import MemoryDataStore from apscheduler.enums import JobOutcome from apscheduler.events import ( - Event, JobAdded, JobEnded, JobStarted, TaskAdded, WorkerStarted, WorkerStopped) + Event, JobAcquired, JobAdded, JobReleased, TaskAdded, WorkerStarted, WorkerStopped) from apscheduler.structures import Task from apscheduler.workers.async_ import AsyncWorker from apscheduler.workers.sync import Worker @@ -75,7 +75,7 @@ class TestAsyncWorker: # Then the job was started received_event = received_events.pop(0) - assert isinstance(received_event, JobStarted) + assert isinstance(received_event, JobAcquired) assert received_event.job_id == job.id assert received_event.task_id == 'task_id' assert received_event.schedule_id is None @@ -83,11 +83,11 @@ class TestAsyncWorker: received_event = received_events.pop(0) if fail: # Then the job failed - assert isinstance(received_event, JobEnded) + assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.error else: # Then the job finished successfully - assert isinstance(received_event, JobEnded) + assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.success # Finally, the worker was stopped @@ -136,7 +136,7 @@ class TestAsyncWorker: # Then the deadline was missed received_event = received_events.pop(0) - assert isinstance(received_event, JobEnded) + assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.missed_start_deadline assert received_event.job_id == job.id assert received_event.task_id == 'task_id' @@ -187,7 +187,7 @@ class TestSyncWorker: # Then the job was started received_event = received_events.pop(0) - assert isinstance(received_event, JobStarted) + assert isinstance(received_event, JobAcquired) assert received_event.job_id == job.id assert received_event.task_id == 'task_id' assert received_event.schedule_id is None @@ -195,11 +195,11 @@ class TestSyncWorker: received_event = received_events.pop(0) if fail: # Then the job failed - assert isinstance(received_event, JobEnded) + assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.error else: # Then the job finished successfully - assert isinstance(received_event, JobEnded) + assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.success # Finally, the worker was stopped @@ -247,7 +247,7 @@ class TestSyncWorker: # Then the deadline was missed received_event = received_events.pop(0) - assert isinstance(received_event, JobEnded) + assert isinstance(received_event, JobReleased) assert received_event.outcome is JobOutcome.missed_start_deadline assert received_event.job_id == job.id assert received_event.task_id == 'task_id' |