summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/apscheduler/_structures.py27
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py11
-rw-r--r--src/apscheduler/datastores/memory.py23
-rw-r--r--src/apscheduler/datastores/mongodb.py10
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py17
-rw-r--r--src/apscheduler/schedulers/async_.py20
-rw-r--r--src/apscheduler/schedulers/sync.py20
-rw-r--r--src/apscheduler/workers/async_.py62
-rw-r--r--src/apscheduler/workers/sync.py46
-rw-r--r--tests/test_datastores.py73
-rw-r--r--tests/test_schedulers.py63
11 files changed, 273 insertions, 99 deletions
diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py
index 6ceaaf6..ba3359b 100644
--- a/src/apscheduler/_structures.py
+++ b/src/apscheduler/_structures.py
@@ -158,6 +158,8 @@ class Job:
(if the job was derived from a schedule)
:param start_deadline: if the job is started in the worker after this time, it is
considered to be misfired and will be aborted
+ :param result_expiration_time: minimum amount of time to keep the result available
+ for fetching in the data store
:param tags: strings that can be used to categorize and filter the job
:param created_at: the time at which the job was created
:param started_at: the time at which the execution of the job was started
@@ -181,6 +183,9 @@ class Job:
eq=False, order=False, converter=as_timedelta, factory=timedelta
)
start_deadline: datetime | None = attrs.field(eq=False, order=False, default=None)
+ result_expiration_time: timedelta = attrs.field(
+ eq=False, order=False, converter=as_timedelta, default=timedelta()
+ )
tags: frozenset[str] = attrs.field(
eq=False, order=False, converter=frozenset, default=()
)
@@ -277,9 +282,31 @@ class JobResult:
finished_at: datetime = attrs.field(
eq=False, order=False, factory=partial(datetime.now, timezone.utc)
)
+ expires_at: datetime = attrs.field(eq=False, order=False)
exception: BaseException | None = attrs.field(eq=False, order=False, default=None)
return_value: Any = attrs.field(eq=False, order=False, default=None)
+ @classmethod
+ def from_job(
+ cls,
+ job: Job,
+ outcome: JobOutcome,
+ *,
+ finished_at: datetime | None = None,
+ exception: BaseException | None = None,
+ return_value: Any = None,
+ ) -> JobResult:
+ real_finished_at = finished_at or datetime.now(timezone.utc)
+ expires_at = real_finished_at + job.result_expiration_time
+ return cls(
+ job_id=job.id,
+ outcome=outcome,
+ finished_at=real_finished_at,
+ expires_at=expires_at,
+ exception=exception,
+ return_value=return_value,
+ )
+
def marshal(self, serializer: Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
if self.outcome is JobOutcome.error:
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index 056d282..74742f8 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -557,12 +557,13 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseAsyncDataStore):
async for attempt in self._retry():
with attempt:
async with self.engine.begin() as conn:
- # Insert the job result
- marshalled = result.marshal(self.serializer)
- insert = self.t_job_results.insert().values(**marshalled)
- await conn.execute(insert)
+ # Record the job result
+ if result.expires_at > result.finished_at:
+ marshalled = result.marshal(self.serializer)
+ insert = self.t_job_results.insert().values(**marshalled)
+ await conn.execute(insert)
- # Decrement the running jobs counter
+ # Decrement the number of running jobs for this task
update = (
self.t_tasks.update()
.values(running_jobs=self.t_tasks.c.running_jobs - 1)
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index 2c96095..fb29ad9 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -13,7 +13,6 @@ from .._enums import ConflictPolicy
from .._events import (
JobAcquired,
JobAdded,
- JobReleased,
ScheduleAdded,
ScheduleRemoved,
ScheduleUpdated,
@@ -296,26 +295,20 @@ class MemoryDataStore(BaseDataStore):
return jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
- # Delete the job
- job_state = self._jobs_by_id.pop(result.job_id)
- self._jobs_by_task_id[task_id].remove(job_state)
- index = self._find_job_index(job_state)
- del self._jobs[index]
+ # Record the job result
+ if result.expires_at > result.finished_at:
+ self._job_results[result.job_id] = result
# Decrement the number of running jobs for this task
task_state = self._tasks.get(task_id)
if task_state is not None:
task_state.running_jobs -= 1
- # Record the result
- self._job_results[result.job_id] = result
-
- # Publish the event
- self._events.publish(
- JobReleased(
- job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
- )
- )
+ # Delete the job
+ job_state = self._jobs_by_id.pop(result.job_id)
+ self._jobs_by_task_id[task_id].remove(job_state)
+ index = self._find_job_index(job_state)
+ del self._jobs[index]
def get_job_result(self, job_id: UUID) -> JobResult | None:
return self._job_results.pop(job_id, None)
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 1fb7d96..de209ba 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -158,6 +158,7 @@ class MongoDBDataStore(BaseDataStore):
self._jobs.create_index("created_at", session=session)
self._jobs.create_index("tags", session=session)
self._jobs_results.create_index("finished_at", session=session)
+ self._jobs_results.create_index("expires_at", session=session)
def add_task(self, task: Task) -> None:
for attempt in self._retry():
@@ -495,10 +496,11 @@ class MongoDBDataStore(BaseDataStore):
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
for attempt in self._retry():
with attempt, self.client.start_session() as session:
- # Insert the job result
- document = result.marshal(self.serializer)
- document["_id"] = document.pop("job_id")
- self._jobs_results.insert_one(document, session=session)
+ # Record the job result
+ if result.expires_at > result.finished_at:
+ document = result.marshal(self.serializer)
+ document["_id"] = document.pop("job_id")
+ self._jobs_results.insert_one(document, session=session)
# Decrement the running jobs counter
self._tasks.find_one_and_update(
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 01e31bb..9c7c905 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -37,7 +37,6 @@ from .._events import (
JobAcquired,
JobAdded,
JobDeserializationFailed,
- JobReleased,
ScheduleAdded,
ScheduleDeserializationFailed,
ScheduleRemoved,
@@ -178,6 +177,7 @@ class _BaseSQLAlchemyDataStore:
Column("scheduled_fire_time", timestamp_type),
Column("jitter", interval_type),
Column("start_deadline", timestamp_type),
+ Column("result_expiration_time", interval_type),
Column("tags", tags_type, nullable=False),
Column("created_at", timestamp_type, nullable=False),
Column("started_at", timestamp_type),
@@ -190,6 +190,7 @@ class _BaseSQLAlchemyDataStore:
Column("job_id", job_id_type, primary_key=True),
Column("outcome", Enum(JobOutcome), nullable=False),
Column("finished_at", timestamp_type, index=True),
+ Column("expires_at", timestamp_type, nullable=False, index=True),
Column("exception", LargeBinary),
Column("return_value", LargeBinary),
)
@@ -672,9 +673,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
for attempt in self._retry():
with attempt, self.engine.begin() as conn:
# Insert the job result
- marshalled = result.marshal(self.serializer)
- insert = self.t_job_results.insert().values(**marshalled)
- conn.execute(insert)
+ if result.expires_at > result.finished_at:
+ marshalled = result.marshal(self.serializer)
+ insert = self.t_job_results.insert().values(**marshalled)
+ conn.execute(insert)
# Decrement the running jobs counter
update = (
@@ -688,13 +690,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id)
conn.execute(delete)
- # Publish the event
- self._events.publish(
- JobReleased(
- job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
- )
- )
-
def get_job_result(self, job_id: UUID) -> JobResult | None:
for attempt in self._retry():
with attempt, self.engine.begin() as conn:
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 6c60bdd..48afe88 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -202,6 +202,7 @@ class AsyncScheduler:
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
tags: Iterable[str] | None = None,
+ result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Add a job to the data store.
@@ -209,7 +210,10 @@ class AsyncScheduler:
:param func_or_task_id:
:param args: positional arguments to call the target callable with
:param kwargs: keyword arguments to call the target callable with
- :param tags:
+ :param tags: strings that can be used to categorize and filter the job
+ :param result_expiration_time: the minimum time (as seconds, or timedelta) to
+ keep the result of the job available for fetching (the result won't be
+ saved at all if that time is 0)
:return: the ID of the newly created job
"""
@@ -224,6 +228,7 @@ class AsyncScheduler:
args=args or (),
kwargs=kwargs or {},
tags=tags or frozenset(),
+ result_expiration_time=result_expiration_time,
)
await self.data_store.add_job(job)
return job.id
@@ -235,7 +240,8 @@ class AsyncScheduler:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if the job does not exist in the data store
+ :raises JobLookupError: if ``wait=False`` and the job result does not exist in
+ the data store
"""
wait_event = anyio.Event()
@@ -253,9 +259,7 @@ class AsyncScheduler:
await wait_event.wait()
- result = await self.data_store.get_job_result(job_id)
- assert isinstance(result, JobResult)
- return result
+ return await self.data_store.get_job_result(job_id)
async def run_job(
self,
@@ -287,7 +291,11 @@ class AsyncScheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
job_id = await self.add_job(
- func_or_task_id, args=args, kwargs=kwargs, tags=tags
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ tags=tags,
+ result_expiration_time=timedelta(minutes=15),
)
await job_complete_event.wait()
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index aea7047..7ed6d3f 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -234,6 +234,7 @@ class Scheduler:
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
tags: Iterable[str] | None = None,
+ result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Add a job to the data store.
@@ -243,6 +244,9 @@ class Scheduler:
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
:param tags: strings that can be used to categorize and filter the job
+ :param result_expiration_time: the minimum time (as seconds, or timedelta) to
+ keep the result of the job available for fetching (the result won't be
+ saved at all if that time is 0)
:return: the ID of the newly created job
"""
@@ -258,6 +262,7 @@ class Scheduler:
args=args or (),
kwargs=kwargs or {},
tags=tags or frozenset(),
+ result_expiration_time=result_expiration_time,
)
self.data_store.add_job(job)
return job.id
@@ -269,7 +274,8 @@ class Scheduler:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if the job does not exist in the data store
+ :raises JobLookupError: if ``wait=False`` and the job result does not exist in
+ the data store
"""
self._ensure_services_ready()
@@ -288,9 +294,7 @@ class Scheduler:
wait_event.wait()
- result = self.data_store.get_job_result(job_id)
- assert isinstance(result, JobResult)
- return result
+ return self.data_store.get_job_result(job_id)
def run_job(
self,
@@ -322,7 +326,13 @@ class Scheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
- job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_id = self.add_job(
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ tags=tags,
+ result_expiration_time=timedelta(minutes=15),
+ )
job_complete_event.wait()
result = self.get_job_result(job_id)
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index a44c89c..b04eea1 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -23,7 +23,7 @@ from anyio.abc import CancelScope, TaskGroup
from .._context import current_job, current_worker
from .._converters import as_async_datastore, as_async_eventbroker
from .._enums import JobOutcome, RunState
-from .._events import JobAdded, WorkerStarted, WorkerStopped
+from .._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped
from .._structures import Job, JobInfo, JobResult
from .._validators import positive_integer
from ..abc import AsyncDataStore, AsyncEventBroker
@@ -172,10 +172,17 @@ class AsyncWorker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.missed_start_deadline
+ result = JobResult.from_job(
+ job,
+ outcome=JobOutcome.missed_start_deadline,
+ finished_at=start_time,
)
await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
+ )
return
token = current_job.set(JobInfo.from_job(job))
@@ -184,23 +191,60 @@ class AsyncWorker:
if isawaitable(retval):
retval = await retval
except get_cancelled_exc_class():
+ self.logger.info("Job %s was cancelled", job.id)
with CancelScope(shield=True):
- result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled)
+ result = JobResult.from_job(
+ job,
+ outcome=JobOutcome.cancelled,
+ )
await self.data_store.release_job(
self.identity, job.task_id, result
)
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id,
+ worker_id=self.identity,
+ outcome=result.outcome,
+ )
+ )
except BaseException as exc:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.error, exception=exc
+ if isinstance(exc, Exception):
+ self.logger.exception("Job %s raised an exception", job.id)
+ else:
+ self.logger.error(
+ "Job %s was aborted due to %s", job.id, exc.__class__.__name__
+ )
+
+ result = JobResult.from_job(
+ job,
+ JobOutcome.error,
+ exception=exc,
+ )
+ await self.data_store.release_job(
+ self.identity,
+ job.task_id,
+ result,
+ )
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
)
- await self.data_store.release_job(self.identity, job.task_id, result)
if not isinstance(exc, Exception):
raise
else:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.success, return_value=retval
+ self.logger.info("Job %s completed successfully", job.id)
+ result = JobResult.from_job(
+ job,
+ JobOutcome.success,
+ return_value=retval,
)
await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
+ )
finally:
current_job.reset(token)
finally:
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 1d8f85a..8c627c6 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -15,6 +15,7 @@ from uuid import UUID
import attrs
+from .. import JobReleased
from .._context import current_job, current_worker
from .._enums import JobOutcome, RunState
from .._events import JobAdded, WorkerStarted, WorkerStopped
@@ -200,8 +201,13 @@ class Worker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.missed_start_deadline
+ result = JobResult.from_job(
+ job, JobOutcome.missed_start_deadline, finished_at=start_time
+ )
+ self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
)
self.data_store.release_job(self.identity, job.task_id, result)
return
@@ -210,17 +216,43 @@ class Worker:
try:
retval = func(*job.args, **job.kwargs)
except BaseException as exc:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.error, exception=exc
+ if isinstance(exc, Exception):
+ self.logger.exception("Job %s raised an exception", job.id)
+ else:
+ self.logger.error(
+ "Job %s was aborted due to %s", job.id, exc.__class__.__name__
+ )
+
+ result = JobResult.from_job(
+ job,
+ JobOutcome.error,
+ exception=exc,
+ )
+ self.data_store.release_job(
+ self.identity,
+ job.task_id,
+ result,
+ )
+ self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
)
- self.data_store.release_job(self.identity, job.task_id, result)
if not isinstance(exc, Exception):
raise
else:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.success, return_value=retval
+ self.logger.info("Job %s completed successfully", job.id)
+ result = JobResult.from_job(
+ job,
+ JobOutcome.success,
+ return_value=retval,
)
self.data_store.release_job(self.identity, job.task_id, result)
+ self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
+ )
finally:
current_job.reset(token)
finally:
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 22722c7..40fad27 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import threading
from collections.abc import Generator
from contextlib import asynccontextmanager, contextmanager
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from tempfile import TemporaryDirectory
from typing import Any, AsyncGenerator, cast
@@ -433,7 +433,7 @@ class TestDataStores:
def test_job_release_success(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -443,9 +443,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -460,7 +460,7 @@ class TestDataStores:
def test_job_release_failure(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -470,9 +470,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -488,7 +488,7 @@ class TestDataStores:
def test_job_release_missed_deadline(self, datastore: DataStore):
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -498,7 +498,10 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -511,7 +514,7 @@ class TestDataStores:
def test_job_release_cancelled(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker1", 2)
@@ -521,7 +524,10 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.cancelled,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -576,9 +582,9 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
@@ -890,7 +896,7 @@ class TestAsyncDataStores:
async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -900,9 +906,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -917,7 +923,7 @@ class TestAsyncDataStores:
async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -927,9 +933,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -945,7 +951,7 @@ class TestAsyncDataStores:
async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -955,7 +961,10 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -968,7 +977,7 @@ class TestAsyncDataStores:
async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker1", 2)
@@ -978,7 +987,7 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(acquired[0], JobOutcome.cancelled),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -998,7 +1007,7 @@ class TestAsyncDataStores:
"""
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
# First, one worker acquires the first available job
@@ -1035,9 +1044,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 32f28dc..f57cbeb 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -17,6 +17,7 @@ from apscheduler import (
JobAdded,
JobLookupError,
JobOutcome,
+ JobReleased,
Schedule,
ScheduleAdded,
ScheduleLookupError,
@@ -181,16 +182,33 @@ class TestAsyncScheduler:
async def test_get_job_result_success(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
+ job_id = await scheduler.add_job(
+ dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5
+ )
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ async def test_get_job_result_success_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ )
+ job_id = await scheduler.add_job(dummy_async_job)
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_error(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(
- dummy_async_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_async_job,
+ kwargs={"delay": 0.2, "fail": True},
+ result_expiration_time=5,
)
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -198,6 +216,17 @@ class TestAsyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ async def test_get_job_result_error_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True)
+ job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_nowait_not_yet_ready(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
@@ -239,6 +268,7 @@ class TestAsyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
await scheduler.data_store.add_job(job)
result = await scheduler.get_job_result(job.id)
@@ -371,17 +401,29 @@ class TestSyncScheduler:
)
assert jobs[0].original_scheduled_time == orig_start_time
- def test_get_job_result(self) -> None:
+ def test_get_job_result_success(self) -> None:
with Scheduler() as scheduler:
- job_id = scheduler.add_job(dummy_sync_job)
+ job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5)
result = scheduler.get_job_result(job_id)
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ def test_get_job_result_success_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler:
+ with scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job)
+ event.wait(3)
+
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_error(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(
- dummy_sync_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5
)
result = scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -389,6 +431,16 @@ class TestSyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ def test_get_job_result_error_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler, scheduler.event_broker.subscribe(
+ lambda evt: event.set(), one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ event.wait(3)
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_nowait_not_yet_ready(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2})
@@ -428,6 +480,7 @@ class TestSyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
scheduler.data_store.add_job(job)
result = scheduler.get_job_result(job.id)