summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-13 01:07:08 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-13 01:07:44 +0300
commit8b68b6c5d1c63faae1ba3769b6475b396328e3a3 (patch)
tree2a172cf3bed738b4ba969cf08c86f7a8e91150f8
parent200c5713193c011e5b230c3c20afe31f261c8291 (diff)
downloadapscheduler-8b68b6c5d1c63faae1ba3769b6475b396328e3a3.tar.gz
Added scheduler methods for creating jobs directly w/o schedules
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py6
-rw-r--r--src/apscheduler/datastores/memory.py12
-rw-r--r--src/apscheduler/datastores/mongodb.py13
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py14
-rw-r--r--src/apscheduler/events.py34
-rw-r--r--src/apscheduler/exceptions.py22
-rw-r--r--src/apscheduler/schedulers/async_.py110
-rw-r--r--src/apscheduler/schedulers/sync.py96
-rw-r--r--src/apscheduler/structures.py9
-rw-r--r--src/apscheduler/workers/async_.py25
-rw-r--r--src/apscheduler/workers/sync.py9
-rw-r--r--tests/test_schedulers.py85
-rw-r--r--tests/test_workers.py18
13 files changed, 351 insertions, 102 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index b15b154..92ba02b 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -19,7 +19,7 @@ from ..abc import AsyncDataStore, AsyncEventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- DataStoreEvent, JobAdded, JobDeserializationFailed, ScheduleAdded,
+ DataStoreEvent, JobAcquired, JobAdded, JobDeserializationFailed, ScheduleAdded,
ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
@@ -368,6 +368,10 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
where(self.t_tasks.c.id == p_id)
await conn.execute(update, params)
+ # Publish the appropriate events
+ for job in acquired_jobs:
+ await self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return acquired_jobs
async def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index 98306a2..96514ca 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -13,7 +13,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
+ JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
+ TaskRemoved, TaskUpdated)
from ..exceptions import ConflictingIdError, TaskLookupError
from ..structures import JobResult, Task
from ..util import reentrant
@@ -261,6 +262,10 @@ class MemoryDataStore(DataStore):
if len(jobs) == limit:
break
+ # Publish the appropriate events
+ for job in jobs:
+ self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
@@ -278,5 +283,10 @@ class MemoryDataStore(DataStore):
# Record the result
self._job_results[result.job_id] = result
+ # Publish the event
+ self._events.publish(
+ JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ )
+
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
return self._job_results.pop(job_id, None)
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index e5b8283..5066c56 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -18,8 +18,8 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import ConflictPolicy
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- DataStoreEvent, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded,
- TaskRemoved, TaskUpdated)
+ DataStoreEvent, JobAcquired, JobAdded, JobReleased, ScheduleAdded, ScheduleRemoved,
+ ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
from ..exceptions import (
ConflictingIdError, DeserializationError, SerializationError, TaskLookupError)
from ..serializers.pickle import PickleSerializer
@@ -336,6 +336,10 @@ class MongoDBDataStore(DataStore):
session=session
)
+ # Publish the appropriate events
+ for job in acquired_jobs:
+ self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return acquired_jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
@@ -355,6 +359,11 @@ class MongoDBDataStore(DataStore):
# Delete the job
self._jobs.delete_one({'_id': result.job_id}, session=session)
+ # Publish the event
+ self._events.publish(
+ JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ )
+
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
document = self._jobs_results.find_one_and_delete({'_id': job_id})
if document:
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 8dea821..31e60cc 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -20,8 +20,9 @@ from ..abc import DataStore, EventBroker, EventSource, Job, Schedule, Serializer
from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed,
- ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved, TaskUpdated)
+ Event, JobAcquired, JobAdded, JobDeserializationFailed, JobReleased, ScheduleAdded,
+ ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskRemoved,
+ TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
from ..serializers.pickle import PickleSerializer
@@ -485,6 +486,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
where(self.t_tasks.c.id == p_id)
conn.execute(update, params)
+ # Publish the appropriate events
+ for job in acquired_jobs:
+ self._events.publish(JobAcquired(job_id=job.id, worker_id=worker_id))
+
return acquired_jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
@@ -504,6 +509,11 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id)
conn.execute(delete)
+ # Publish the event
+ self._events.publish(
+ JobReleased(job_id=result.job_id, worker_id=worker_id, outcome=result.outcome)
+ )
+
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
with self.engine.begin() as conn:
# Retrieve the result
diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py
index 9d04adc..86e98c7 100644
--- a/src/apscheduler/events.py
+++ b/src/apscheduler/events.py
@@ -10,7 +10,6 @@ from attr.converters import optional
from .converters import as_aware_datetime, as_uuid
from .enums import JobOutcome
-from .structures import Job
@attr.define(kw_only=True, frozen=True)
@@ -124,36 +123,17 @@ class WorkerStopped(WorkerEvent):
@attr.define(kw_only=True, frozen=True)
-class JobExecutionEvent(WorkerEvent):
- job_id: UUID = attr.field(converter=as_uuid)
- task_id: str
- schedule_id: Optional[str]
- scheduled_fire_time: Optional[datetime] = attr.field(converter=optional(as_aware_datetime))
- start_deadline: Optional[datetime] = attr.field(converter=optional(as_aware_datetime))
-
+class JobAcquired(WorkerEvent):
+ """Signals that a worker has acquired a job for processing."""
-@attr.define(kw_only=True, frozen=True)
-class JobStarted(JobExecutionEvent):
- """Signals that a worker has started processing a job."""
-
- @classmethod
- def from_job(cls, job: Job, start_time: datetime) -> JobStarted:
- return JobStarted(
- timestamp=start_time, job_id=job.id, task_id=job.task_id,
- schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time,
- start_deadline=job.start_deadline)
+ job_id: UUID = attr.field(converter=as_uuid)
+ worker_id: str
@attr.define(kw_only=True, frozen=True)
-class JobEnded(JobExecutionEvent):
+class JobReleased(WorkerEvent):
"""Signals that a worker has finished processing a job."""
+ job_id: UUID = attr.field(converter=as_uuid)
+ worker_id: str
outcome: JobOutcome
- start_time: datetime = attr.field(converter=as_aware_datetime)
-
- @classmethod
- def from_job(cls, job: Job, outcome: JobOutcome, start_time: datetime) -> JobEnded:
- return JobEnded(
- timestamp=datetime.now(timezone.utc), job_id=job.id, task_id=job.task_id,
- schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time,
- start_deadline=job.start_deadline, outcome=outcome, start_time=start_time)
diff --git a/src/apscheduler/exceptions.py b/src/apscheduler/exceptions.py
index 1f90ba0..b84e58a 100644
--- a/src/apscheduler/exceptions.py
+++ b/src/apscheduler/exceptions.py
@@ -1,3 +1,6 @@
+from uuid import UUID
+
+
class TaskLookupError(LookupError):
"""Raised by a data store when it cannot find the requested task."""
@@ -8,8 +11,23 @@ class TaskLookupError(LookupError):
class JobLookupError(KeyError):
"""Raised when the job store cannot find a job for update or removal."""
- def __init__(self, job_id):
- super().__init__(u'No job by the id of %s was found' % job_id)
+ def __init__(self, job_id: UUID):
+ super().__init__(f'No job by the id of {job_id} was found')
+
+
+class JobResultNotReady(KeyError):
+ """Raised by ``get_job_result()`` if the job result is not ready."""
+
+ def __init__(self, job_id: UUID):
+ super().__init__(f'The result for job {job_id} is not ready')
+
+
+class JobCancelled(Exception):
+ """Raised by ``run_job()`` if the job was cancelled."""
+
+
+class JobDeadlineMissed(Exception):
+ """Raised by ``run_job()`` if the job failed to start within the allotted time."""
class ConflictingIdError(KeyError):
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 213b4de..6900460 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -6,7 +6,7 @@ from contextlib import AsyncExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
from typing import Any, Callable, Iterable, Mapping, Optional
-from uuid import uuid4
+from uuid import UUID, uuid4
import anyio
from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after
@@ -15,11 +15,13 @@ from anyio.abc import TaskGroup
from ..abc import AsyncDataStore, DataStore, EventSource, Job, Schedule, Trigger
from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..datastores.memory import MemoryDataStore
-from ..enums import CoalescePolicy, ConflictPolicy, RunState
+from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
-from ..events import ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated
+from ..events import (
+ JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
-from ..structures import Task
+from ..structures import JobResult, Task
from ..workers.async_ import AsyncWorker
@@ -121,6 +123,92 @@ class AsyncScheduler:
async def remove_schedule(self, schedule_id: str) -> None:
await self.data_store.remove_schedules({schedule_id})
+ async def add_job(
+ self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None,
+ kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = None
+ ) -> UUID:
+ """
+ Add a job to the data store.
+
+ :param func_or_task_id: either a callable to run, or the ID of an existing task definition
+ :param args: positional arguments to call the target callable with
+ :param kwargs: keyword arguments to call the target callable with
+ :param tags: an iterable of tags to attach to the job
+ :return: the ID of the newly created job
+
+ """
+ if callable(func_or_task_id):
+ task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
+ await self.data_store.add_task(task)
+ else:
+ task = await self.data_store.get_task(func_or_task_id)
+
+ job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags)
+ await self.data_store.add_job(job)
+ return job.id
+
+ async def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult:
+ """
+ Retrieve the result of a job.
+
+ :param job_id: the ID of the job
+ :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to
+ raise an exception if the result is not yet available
+ :raises JobLookupError: if the result is not yet available and ``wait`` is ``False``
+
+ """
+ wait_event = anyio.Event()
+
+ def listener(event: JobReleased) -> None:
+ if event.job_id == job_id:
+ wait_event.set()
+
+ with self.data_store.events.subscribe(listener, {JobReleased}):
+ result = await self.data_store.get_job_result(job_id)
+ if result:
+ return result
+ elif not wait:
+ raise JobLookupError(job_id)
+
+ await wait_event.wait()
+
+ result = await self.data_store.get_job_result(job_id)
+ assert isinstance(result, JobResult)
+ return result
+
+ async def run_job(
+ self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None,
+ kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = ()
+ ) -> Any:
+ """
+ Convenience method to add a job and then return its result (or raise its exception).
+
+ :returns: the return value of the target function
+
+ """
+ job_complete_event = anyio.Event()
+
+ def listener(event: JobReleased) -> None:
+ if event.job_id == job_id:
+ job_complete_event.set()
+
+ job_id: Optional[UUID] = None
+ with self.data_store.events.subscribe(listener, {JobReleased}):
+ job_id = await self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ await job_complete_event.wait()
+
+ result = await self.get_job_result(job_id)
+ if result.outcome is JobOutcome.success:
+ return result.return_value
+ elif result.outcome is JobOutcome.error:
+ raise result.exception
+ elif result.outcome is JobOutcome.missed_start_deadline:
+ raise JobDeadlineMissed
+ elif result.outcome is JobOutcome.cancelled:
+ raise JobCancelled
+ else:
+ raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+
async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the scheduler is in the '
@@ -192,17 +280,3 @@ class AsyncScheduler:
self._state = RunState.stopped
await self._events.publish(SchedulerStopped())
-
- # async def stop(self, force: bool = False) -> None:
- # self._running = False
- # if self._worker:
- # await self._worker.stop(force)
- #
- # if self._acquire_cancel_scope:
- # self._acquire_cancel_scope.cancel()
- # if force and self._task_group:
- # self._task_group.cancel_scope.cancel()
- #
- # async def wait_until_stopped(self) -> None:
- # if self._stop_event:
- # await self._stop_event.wait()
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 221b284..1525bea 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -8,15 +8,17 @@ from contextlib import ExitStack
from datetime import datetime, timedelta, timezone
from logging import Logger, getLogger
from typing import Any, Callable, Iterable, Mapping, Optional
-from uuid import uuid4
+from uuid import UUID, uuid4
from ..abc import DataStore, EventSource, Trigger
from ..datastores.memory import MemoryDataStore
-from ..enums import CoalescePolicy, ConflictPolicy, RunState
+from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import Event, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated
+from ..events import (
+ Event, JobReleased, ScheduleAdded, SchedulerStarted, SchedulerStopped, ScheduleUpdated)
+from ..exceptions import JobCancelled, JobDeadlineMissed, JobLookupError
from ..marshalling import callable_to_ref
-from ..structures import Job, Schedule, Task
+from ..structures import Job, JobResult, Schedule, Task
from ..workers.sync import Worker
@@ -121,6 +123,92 @@ class Scheduler:
def remove_schedule(self, schedule_id: str) -> None:
self.data_store.remove_schedules({schedule_id})
+ def add_job(
+ self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None,
+ kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = None
+ ) -> UUID:
+ """
+ Add a job to the data store.
+
+ :param func_or_task_id: either a callable to run, or the ID of an existing task definition
+ :param args: positional arguments to call the target callable with
+ :param kwargs: keyword arguments to call the target callable with
+ :param tags: an iterable of tags to attach to the job
+ :return: the ID of the newly created job
+
+ """
+ if callable(func_or_task_id):
+ task = Task(id=callable_to_ref(func_or_task_id), func=func_or_task_id)
+ self.data_store.add_task(task)
+ else:
+ task = self.data_store.get_task(func_or_task_id)
+
+ job = Job(task_id=task.id, args=args, kwargs=kwargs, tags=tags)
+ self.data_store.add_job(job)
+ return job.id
+
+ def get_job_result(self, job_id: UUID, *, wait: bool = True) -> JobResult:
+ """
+ Retrieve the result of a job.
+
+ :param job_id: the ID of the job
+ :param wait: if ``True``, wait until the job has ended (one way or another), ``False`` to
+ raise an exception if the result is not yet available
+ :raises JobLookupError: if the result is not yet available and ``wait`` is ``False``
+
+ """
+ wait_event = threading.Event()
+
+ def listener(event: JobReleased) -> None:
+ if event.job_id == job_id:
+ wait_event.set()
+
+ with self.data_store.events.subscribe(listener, {JobReleased}):
+ result = self.data_store.get_job_result(job_id)
+ if result:
+ return result
+ elif not wait:
+ raise JobLookupError(job_id)
+
+ wait_event.wait()
+
+ result = self.data_store.get_job_result(job_id)
+ assert isinstance(result, JobResult)
+ return result
+
+ def run_job(
+ self, func_or_task_id: str | Callable, *, args: Optional[Iterable] = None,
+ kwargs: Optional[Mapping[str, Any]] = None, tags: Optional[Iterable[str]] = ()
+ ) -> Any:
+ """
+ Convenience method to add a job and then return its result (or raise its exception).
+
+ :returns: the return value of the target function
+
+ """
+ job_complete_event = threading.Event()
+
+ def listener(event: JobReleased) -> None:
+ if event.job_id == job_id:
+ job_complete_event.set()
+
+ job_id: Optional[UUID] = None
+ with self.data_store.events.subscribe(listener, {JobReleased}):
+ job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_complete_event.wait()
+
+ result = self.get_job_result(job_id)
+ if result.outcome is JobOutcome.success:
+ return result.return_value
+ elif result.outcome is JobOutcome.error:
+ raise result.exception
+ elif result.outcome is JobOutcome.missed_start_deadline:
+ raise JobDeadlineMissed
+ elif result.outcome is JobOutcome.cancelled:
+ raise JobCancelled
+ else:
+ raise RuntimeError(f'Unknown job outcome: {result.outcome}')
+
def run(self) -> None:
if self._state is not RunState.starting:
raise RuntimeError(f'This function cannot be called while the scheduler is in the '
diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py
index 4d1bc18..c805ef0 100644
--- a/src/apscheduler/structures.py
+++ b/src/apscheduler/structures.py
@@ -6,6 +6,7 @@ from typing import Any, Callable, Optional
from uuid import UUID, uuid4
import attr
+from attr.converters import default_if_none
from . import abc
from .enums import CoalescePolicy, JobOutcome
@@ -89,12 +90,14 @@ class Schedule:
class Job:
id: UUID = attr.field(factory=uuid4)
task_id: str = attr.field(eq=False, order=False)
- args: tuple = attr.field(eq=False, order=False, default=())
- kwargs: dict[str, Any] = attr.field(eq=False, order=False, factory=dict)
+ args: tuple = attr.field(eq=False, order=False, converter=default_if_none(()))
+ kwargs: dict[str, Any] = attr.field(
+ eq=False, order=False, converter=default_if_none(factory=dict))
schedule_id: Optional[str] = attr.field(eq=False, order=False, default=None)
scheduled_fire_time: Optional[datetime] = attr.field(eq=False, order=False, default=None)
start_deadline: Optional[datetime] = attr.field(eq=False, order=False, default=None)
- tags: frozenset[str] = attr.field(eq=False, order=False, factory=frozenset)
+ tags: frozenset[str] = attr.field(
+ eq=False, order=False, converter=default_if_none(factory=frozenset))
created_at: datetime = attr.field(eq=False, order=False,
factory=partial(datetime.now, timezone.utc))
started_at: Optional[datetime] = attr.field(eq=False, order=False, default=None)
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index 5e872cf..dfa81dc 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -17,7 +17,7 @@ from ..abc import AsyncDataStore, DataStore, EventSource, Job
from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..enums import JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
-from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
+from ..events import JobAdded, WorkerStarted, WorkerStopped
from ..structures import JobResult
@@ -122,11 +122,10 @@ class AsyncWorker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- await self._events.publish(
- JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time))
+ result = JobResult(job_id=job.id, outcome=JobOutcome.missed_start_deadline)
+ await self.data_store.release_job(self.identity, job.task_id, result)
return
- await self._events.publish(JobStarted.from_job(job, start_time))
try:
retval = func(*job.args, **job.kwargs)
if isawaitable(retval):
@@ -135,31 +134,13 @@ class AsyncWorker:
with CancelScope(shield=True):
result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled)
await self.data_store.release_job(self.identity, job.task_id, result)
-
- with move_on_after(1, shield=True):
- await self._events.publish(
- JobEnded.from_job(job, JobOutcome.cancelled, start_time))
except BaseException as exc:
result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc)
await self.data_store.release_job(self.identity, job.task_id, result)
- await self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time))
if not isinstance(exc, Exception):
raise
else:
result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval)
await self.data_store.release_job(self.identity, job.task_id, result)
- await self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time))
finally:
self._running_jobs.remove(job.id)
-
- # async def stop(self, force: bool = False) -> None:
- # self._running = False
- # if self._acquire_cancel_scope:
- # self._acquire_cancel_scope.cancel()
- #
- # if force and self._task_group:
- # self._task_group.cancel_scope.cancel()
- #
- # async def wait_until_stopped(self) -> None:
- # if self._stop_event:
- # await self._stop_event.wait()
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index be805ee..48922da 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -13,7 +13,7 @@ from uuid import UUID
from ..abc import DataStore, EventSource
from ..enums import JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
-from ..events import JobAdded, JobEnded, JobStarted, WorkerStarted, WorkerStopped
+from ..events import JobAdded, WorkerStarted, WorkerStopped
from ..structures import Job, JobResult
@@ -118,22 +118,19 @@ class Worker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- self._events.publish(
- JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time))
+ result = JobResult(job_id=job.id, outcome=JobOutcome.missed_start_deadline)
+ self.data_store.release_job(self.identity, job.task_id, result)
return
- self._events.publish(JobStarted.from_job(job, start_time))
try:
retval = func(*job.args, **job.kwargs)
except BaseException as exc:
result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc)
self.data_store.release_job(self.identity, job.task_id, result)
- self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time))
if not isinstance(exc, Exception):
raise
else:
result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval)
self.data_store.release_job(self.identity, job.task_id, result)
- self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time))
finally:
self._running_jobs.remove(job.id)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 6a45b8b..5f1a7df 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -1,12 +1,15 @@
import threading
+import time
from datetime import datetime, timezone
import anyio
import pytest
from anyio import fail_after
+from apscheduler.enums import JobOutcome
from apscheduler.events import (
Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped, TaskAdded)
+from apscheduler.exceptions import JobLookupError
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
@@ -14,12 +17,20 @@ from apscheduler.triggers.date import DateTrigger
pytestmark = pytest.mark.anyio
-async def dummy_async_job():
- return 'returnvalue'
+async def dummy_async_job(delay: float = 0, fail: bool = False) -> str:
+ await anyio.sleep(delay)
+ if fail:
+ raise RuntimeError('failing as requested')
+ else:
+ return 'returnvalue'
-def dummy_sync_job():
- return 'returnvalue'
+def dummy_sync_job(delay: float = 0, fail: bool = False) -> str:
+ time.sleep(delay)
+ if fail:
+ raise RuntimeError('failing as requested')
+ else:
+ return 'returnvalue'
class TestAsyncScheduler:
@@ -72,6 +83,39 @@ class TestAsyncScheduler:
# There should be no more events on the list
assert not received_events
+ async def test_get_job_result_success(self) -> None:
+ async with AsyncScheduler() as scheduler:
+ job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2})
+ result = await scheduler.get_job_result(job_id)
+ assert result.job_id == job_id
+ assert result.outcome is JobOutcome.success
+ assert result.return_value == 'returnvalue'
+
+ async def test_get_job_result_error(self) -> None:
+ async with AsyncScheduler() as scheduler:
+ job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2, 'fail': True})
+ result = await scheduler.get_job_result(job_id)
+ assert result.job_id == job_id
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, RuntimeError)
+ assert str(result.exception) == 'failing as requested'
+
+ async def test_get_job_result_nowait_not_yet_ready(self) -> None:
+ async with AsyncScheduler() as scheduler:
+ job_id = await scheduler.add_job(dummy_async_job, kwargs={'delay': 0.2})
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
+ async def test_run_job_success(self) -> None:
+ async with AsyncScheduler() as scheduler:
+ return_value = await scheduler.run_job(dummy_async_job)
+ assert return_value == 'returnvalue'
+
+ async def test_run_job_failure(self) -> None:
+ async with AsyncScheduler() as scheduler:
+ with pytest.raises(RuntimeError, match='failing as requested'):
+ await scheduler.run_job(dummy_async_job, kwargs={'fail': True})
+
class TestSyncScheduler:
def test_schedule_job(self):
@@ -102,7 +146,6 @@ class TestSyncScheduler:
received_event = received_events.pop(0)
assert isinstance(received_event, ScheduleAdded)
assert received_event.schedule_id == 'foo'
- # assert received_event.task_id == 'task_id'
# Then that schedule was processed and a job was added for it
received_event = received_events.pop(0)
@@ -121,3 +164,35 @@ class TestSyncScheduler:
# There should be no more events on the list
assert not received_events
+
+ def test_get_job_result(self) -> None:
+ with Scheduler() as scheduler:
+ job_id = scheduler.add_job(dummy_sync_job)
+ result = scheduler.get_job_result(job_id)
+ assert result.outcome is JobOutcome.success
+ assert result.return_value == 'returnvalue'
+
+ def test_get_job_result_error(self) -> None:
+ with Scheduler() as scheduler:
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={'delay': 0.2, 'fail': True})
+ result = scheduler.get_job_result(job_id)
+ assert result.job_id == job_id
+ assert result.outcome is JobOutcome.error
+ assert isinstance(result.exception, RuntimeError)
+ assert str(result.exception) == 'failing as requested'
+
+ def test_get_job_result_nowait_not_yet_ready(self) -> None:
+ with Scheduler() as scheduler:
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={'delay': 0.2})
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
+ def test_run_job_success(self) -> None:
+ with Scheduler() as scheduler:
+ return_value = scheduler.run_job(dummy_sync_job)
+ assert return_value == 'returnvalue'
+
+ def test_run_job_failure(self) -> None:
+ with Scheduler() as scheduler:
+ with pytest.raises(RuntimeError, match='failing as requested'):
+ scheduler.run_job(dummy_sync_job, kwargs={'fail': True})
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 74a018c..872cf34 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -10,7 +10,7 @@ from apscheduler.abc import Job
from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.enums import JobOutcome
from apscheduler.events import (
- Event, JobAdded, JobEnded, JobStarted, TaskAdded, WorkerStarted, WorkerStopped)
+ Event, JobAcquired, JobAdded, JobReleased, TaskAdded, WorkerStarted, WorkerStopped)
from apscheduler.structures import Task
from apscheduler.workers.async_ import AsyncWorker
from apscheduler.workers.sync import Worker
@@ -75,7 +75,7 @@ class TestAsyncWorker:
# Then the job was started
received_event = received_events.pop(0)
- assert isinstance(received_event, JobStarted)
+ assert isinstance(received_event, JobAcquired)
assert received_event.job_id == job.id
assert received_event.task_id == 'task_id'
assert received_event.schedule_id is None
@@ -83,11 +83,11 @@ class TestAsyncWorker:
received_event = received_events.pop(0)
if fail:
# Then the job failed
- assert isinstance(received_event, JobEnded)
+ assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.error
else:
# Then the job finished successfully
- assert isinstance(received_event, JobEnded)
+ assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.success
# Finally, the worker was stopped
@@ -136,7 +136,7 @@ class TestAsyncWorker:
# Then the deadline was missed
received_event = received_events.pop(0)
- assert isinstance(received_event, JobEnded)
+ assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.missed_start_deadline
assert received_event.job_id == job.id
assert received_event.task_id == 'task_id'
@@ -187,7 +187,7 @@ class TestSyncWorker:
# Then the job was started
received_event = received_events.pop(0)
- assert isinstance(received_event, JobStarted)
+ assert isinstance(received_event, JobAcquired)
assert received_event.job_id == job.id
assert received_event.task_id == 'task_id'
assert received_event.schedule_id is None
@@ -195,11 +195,11 @@ class TestSyncWorker:
received_event = received_events.pop(0)
if fail:
# Then the job failed
- assert isinstance(received_event, JobEnded)
+ assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.error
else:
# Then the job finished successfully
- assert isinstance(received_event, JobEnded)
+ assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.success
# Finally, the worker was stopped
@@ -247,7 +247,7 @@ class TestSyncWorker:
# Then the deadline was missed
received_event = received_events.pop(0)
- assert isinstance(received_event, JobEnded)
+ assert isinstance(received_event, JobReleased)
assert received_event.outcome is JobOutcome.missed_start_deadline
assert received_event.job_id == job.id
assert received_event.task_id == 'task_id'