summary refs log tree commit diff
diff options
context:
space:
mode:
author: Alex Grönholm <alex.gronholm@nextday.fi> 2022-08-13 23:06:08 +0300
committer: Alex Grönholm <alex.gronholm@nextday.fi> 2022-08-13 23:06:08 +0300
commit a9141bb5663e0a22cc7c4da7d34834c925dbadea (patch)
tree 533ee4126b22d0b30b6c74106d7b4d3b89328757
parent ffbbbbe0ee147bba08e0c95b3697136590997fb1 (diff)
download apscheduler-a9141bb5663e0a22cc7c4da7d34834c925dbadea.tar.gz
Added job expiration times
Scheduled jobs no longer retain their results. All job outcomes are now logged by the workers. Workers, rather than data stores, are now responsible for emitting the JobReleased event.
-rw-r--r--src/apscheduler/_structures.py27
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py11
-rw-r--r--src/apscheduler/datastores/memory.py23
-rw-r--r--src/apscheduler/datastores/mongodb.py10
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py17
-rw-r--r--src/apscheduler/schedulers/async_.py20
-rw-r--r--src/apscheduler/schedulers/sync.py20
-rw-r--r--src/apscheduler/workers/async_.py62
-rw-r--r--src/apscheduler/workers/sync.py46
-rw-r--r--tests/test_datastores.py73
-rw-r--r--tests/test_schedulers.py63
11 files changed, 273 insertions, 99 deletions
diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py
index 6ceaaf6..ba3359b 100644
--- a/src/apscheduler/_structures.py
+++ b/src/apscheduler/_structures.py
@@ -158,6 +158,8 @@ class Job:
(if the job was derived from a schedule)
:param start_deadline: if the job is started in the worker after this time, it is
considered to be misfired and will be aborted
+ :param result_expiration_time: minimum amount of time to keep the result available
+ for fetching in the data store
:param tags: strings that can be used to categorize and filter the job
:param created_at: the time at which the job was created
:param started_at: the time at which the execution of the job was started
@@ -181,6 +183,9 @@ class Job:
eq=False, order=False, converter=as_timedelta, factory=timedelta
)
start_deadline: datetime | None = attrs.field(eq=False, order=False, default=None)
+ result_expiration_time: timedelta = attrs.field(
+ eq=False, order=False, converter=as_timedelta, default=timedelta()
+ )
tags: frozenset[str] = attrs.field(
eq=False, order=False, converter=frozenset, default=()
)
@@ -277,9 +282,31 @@ class JobResult:
finished_at: datetime = attrs.field(
eq=False, order=False, factory=partial(datetime.now, timezone.utc)
)
+ expires_at: datetime = attrs.field(eq=False, order=False)
exception: BaseException | None = attrs.field(eq=False, order=False, default=None)
return_value: Any = attrs.field(eq=False, order=False, default=None)
+ @classmethod
+ def from_job(
+ cls,
+ job: Job,
+ outcome: JobOutcome,
+ *,
+ finished_at: datetime | None = None,
+ exception: BaseException | None = None,
+ return_value: Any = None,
+ ) -> JobResult:
+ real_finished_at = finished_at or datetime.now(timezone.utc)
+ expires_at = real_finished_at + job.result_expiration_time
+ return cls(
+ job_id=job.id,
+ outcome=outcome,
+ finished_at=real_finished_at,
+ expires_at=expires_at,
+ exception=exception,
+ return_value=return_value,
+ )
+
def marshal(self, serializer: Serializer) -> dict[str, Any]:
marshalled = attrs.asdict(self, value_serializer=serialize)
if self.outcome is JobOutcome.error:
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index 056d282..74742f8 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -557,12 +557,13 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseAsyncDataStore):
async for attempt in self._retry():
with attempt:
async with self.engine.begin() as conn:
- # Insert the job result
- marshalled = result.marshal(self.serializer)
- insert = self.t_job_results.insert().values(**marshalled)
- await conn.execute(insert)
+ # Record the job result
+ if result.expires_at > result.finished_at:
+ marshalled = result.marshal(self.serializer)
+ insert = self.t_job_results.insert().values(**marshalled)
+ await conn.execute(insert)
- # Decrement the running jobs counter
+ # Decrement the number of running jobs for this task
update = (
self.t_tasks.update()
.values(running_jobs=self.t_tasks.c.running_jobs - 1)
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index 2c96095..fb29ad9 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -13,7 +13,6 @@ from .._enums import ConflictPolicy
from .._events import (
JobAcquired,
JobAdded,
- JobReleased,
ScheduleAdded,
ScheduleRemoved,
ScheduleUpdated,
@@ -296,26 +295,20 @@ class MemoryDataStore(BaseDataStore):
return jobs
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
- # Delete the job
- job_state = self._jobs_by_id.pop(result.job_id)
- self._jobs_by_task_id[task_id].remove(job_state)
- index = self._find_job_index(job_state)
- del self._jobs[index]
+ # Record the job result
+ if result.expires_at > result.finished_at:
+ self._job_results[result.job_id] = result
# Decrement the number of running jobs for this task
task_state = self._tasks.get(task_id)
if task_state is not None:
task_state.running_jobs -= 1
- # Record the result
- self._job_results[result.job_id] = result
-
- # Publish the event
- self._events.publish(
- JobReleased(
- job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
- )
- )
+ # Delete the job
+ job_state = self._jobs_by_id.pop(result.job_id)
+ self._jobs_by_task_id[task_id].remove(job_state)
+ index = self._find_job_index(job_state)
+ del self._jobs[index]
def get_job_result(self, job_id: UUID) -> JobResult | None:
return self._job_results.pop(job_id, None)
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 1fb7d96..de209ba 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -158,6 +158,7 @@ class MongoDBDataStore(BaseDataStore):
self._jobs.create_index("created_at", session=session)
self._jobs.create_index("tags", session=session)
self._jobs_results.create_index("finished_at", session=session)
+ self._jobs_results.create_index("expires_at", session=session)
def add_task(self, task: Task) -> None:
for attempt in self._retry():
@@ -495,10 +496,11 @@ class MongoDBDataStore(BaseDataStore):
def release_job(self, worker_id: str, task_id: str, result: JobResult) -> None:
for attempt in self._retry():
with attempt, self.client.start_session() as session:
- # Insert the job result
- document = result.marshal(self.serializer)
- document["_id"] = document.pop("job_id")
- self._jobs_results.insert_one(document, session=session)
+ # Record the job result
+ if result.expires_at > result.finished_at:
+ document = result.marshal(self.serializer)
+ document["_id"] = document.pop("job_id")
+ self._jobs_results.insert_one(document, session=session)
# Decrement the running jobs counter
self._tasks.find_one_and_update(
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 01e31bb..9c7c905 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -37,7 +37,6 @@ from .._events import (
JobAcquired,
JobAdded,
JobDeserializationFailed,
- JobReleased,
ScheduleAdded,
ScheduleDeserializationFailed,
ScheduleRemoved,
@@ -178,6 +177,7 @@ class _BaseSQLAlchemyDataStore:
Column("scheduled_fire_time", timestamp_type),
Column("jitter", interval_type),
Column("start_deadline", timestamp_type),
+ Column("result_expiration_time", interval_type),
Column("tags", tags_type, nullable=False),
Column("created_at", timestamp_type, nullable=False),
Column("started_at", timestamp_type),
@@ -190,6 +190,7 @@ class _BaseSQLAlchemyDataStore:
Column("job_id", job_id_type, primary_key=True),
Column("outcome", Enum(JobOutcome), nullable=False),
Column("finished_at", timestamp_type, index=True),
+ Column("expires_at", timestamp_type, nullable=False, index=True),
Column("exception", LargeBinary),
Column("return_value", LargeBinary),
)
@@ -672,9 +673,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
for attempt in self._retry():
with attempt, self.engine.begin() as conn:
# Insert the job result
- marshalled = result.marshal(self.serializer)
- insert = self.t_job_results.insert().values(**marshalled)
- conn.execute(insert)
+ if result.expires_at > result.finished_at:
+ marshalled = result.marshal(self.serializer)
+ insert = self.t_job_results.insert().values(**marshalled)
+ conn.execute(insert)
# Decrement the running jobs counter
update = (
@@ -688,13 +690,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id)
conn.execute(delete)
- # Publish the event
- self._events.publish(
- JobReleased(
- job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
- )
- )
-
def get_job_result(self, job_id: UUID) -> JobResult | None:
for attempt in self._retry():
with attempt, self.engine.begin() as conn:
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 6c60bdd..48afe88 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -202,6 +202,7 @@ class AsyncScheduler:
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
tags: Iterable[str] | None = None,
+ result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Add a job to the data store.
@@ -209,7 +210,10 @@ class AsyncScheduler:
:param func_or_task_id:
:param args: positional arguments to call the target callable with
:param kwargs: keyword arguments to call the target callable with
- :param tags:
+ :param tags: strings that can be used to categorize and filter the job
+ :param result_expiration_time: the minimum time (as seconds, or timedelta) to
+ keep the result of the job available for fetching (the result won't be
+ saved at all if that time is 0)
:return: the ID of the newly created job
"""
@@ -224,6 +228,7 @@ class AsyncScheduler:
args=args or (),
kwargs=kwargs or {},
tags=tags or frozenset(),
+ result_expiration_time=result_expiration_time,
)
await self.data_store.add_job(job)
return job.id
@@ -235,7 +240,8 @@ class AsyncScheduler:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if the job does not exist in the data store
+ :raises JobLookupError: if ``wait=False`` and the job result does not exist in
+ the data store
"""
wait_event = anyio.Event()
@@ -253,9 +259,7 @@ class AsyncScheduler:
await wait_event.wait()
- result = await self.data_store.get_job_result(job_id)
- assert isinstance(result, JobResult)
- return result
+ return await self.data_store.get_job_result(job_id)
async def run_job(
self,
@@ -287,7 +291,11 @@ class AsyncScheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
job_id = await self.add_job(
- func_or_task_id, args=args, kwargs=kwargs, tags=tags
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ tags=tags,
+ result_expiration_time=timedelta(minutes=15),
)
await job_complete_event.wait()
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index aea7047..7ed6d3f 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -234,6 +234,7 @@ class Scheduler:
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
tags: Iterable[str] | None = None,
+ result_expiration_time: timedelta | float = 0,
) -> UUID:
"""
Add a job to the data store.
@@ -243,6 +244,9 @@ class Scheduler:
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
:param tags: strings that can be used to categorize and filter the job
+ :param result_expiration_time: the minimum time (as seconds, or timedelta) to
+ keep the result of the job available for fetching (the result won't be
+ saved at all if that time is 0)
:return: the ID of the newly created job
"""
@@ -258,6 +262,7 @@ class Scheduler:
args=args or (),
kwargs=kwargs or {},
tags=tags or frozenset(),
+ result_expiration_time=result_expiration_time,
)
self.data_store.add_job(job)
return job.id
@@ -269,7 +274,8 @@ class Scheduler:
:param job_id: the ID of the job
:param wait: if ``True``, wait until the job has ended (one way or another),
``False`` to raise an exception if the result is not yet available
- :raises JobLookupError: if the job does not exist in the data store
+ :raises JobLookupError: if ``wait=False`` and the job result does not exist in
+ the data store
"""
self._ensure_services_ready()
@@ -288,9 +294,7 @@ class Scheduler:
wait_event.wait()
- result = self.data_store.get_job_result(job_id)
- assert isinstance(result, JobResult)
- return result
+ return self.data_store.get_job_result(job_id)
def run_job(
self,
@@ -322,7 +326,13 @@ class Scheduler:
job_id: UUID | None = None
with self.data_store.events.subscribe(listener, {JobReleased}):
- job_id = self.add_job(func_or_task_id, args=args, kwargs=kwargs, tags=tags)
+ job_id = self.add_job(
+ func_or_task_id,
+ args=args,
+ kwargs=kwargs,
+ tags=tags,
+ result_expiration_time=timedelta(minutes=15),
+ )
job_complete_event.wait()
result = self.get_job_result(job_id)
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index a44c89c..b04eea1 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -23,7 +23,7 @@ from anyio.abc import CancelScope, TaskGroup
from .._context import current_job, current_worker
from .._converters import as_async_datastore, as_async_eventbroker
from .._enums import JobOutcome, RunState
-from .._events import JobAdded, WorkerStarted, WorkerStopped
+from .._events import JobAdded, JobReleased, WorkerStarted, WorkerStopped
from .._structures import Job, JobInfo, JobResult
from .._validators import positive_integer
from ..abc import AsyncDataStore, AsyncEventBroker
@@ -172,10 +172,17 @@ class AsyncWorker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.missed_start_deadline
+ result = JobResult.from_job(
+ job,
+ outcome=JobOutcome.missed_start_deadline,
+ finished_at=start_time,
)
await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
+ )
return
token = current_job.set(JobInfo.from_job(job))
@@ -184,23 +191,60 @@ class AsyncWorker:
if isawaitable(retval):
retval = await retval
except get_cancelled_exc_class():
+ self.logger.info("Job %s was cancelled", job.id)
with CancelScope(shield=True):
- result = JobResult(job_id=job.id, outcome=JobOutcome.cancelled)
+ result = JobResult.from_job(
+ job,
+ outcome=JobOutcome.cancelled,
+ )
await self.data_store.release_job(
self.identity, job.task_id, result
)
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id,
+ worker_id=self.identity,
+ outcome=result.outcome,
+ )
+ )
except BaseException as exc:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.error, exception=exc
+ if isinstance(exc, Exception):
+ self.logger.exception("Job %s raised an exception", job.id)
+ else:
+ self.logger.error(
+ "Job %s was aborted due to %s", job.id, exc.__class__.__name__
+ )
+
+ result = JobResult.from_job(
+ job,
+ JobOutcome.error,
+ exception=exc,
+ )
+ await self.data_store.release_job(
+ self.identity,
+ job.task_id,
+ result,
+ )
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
)
- await self.data_store.release_job(self.identity, job.task_id, result)
if not isinstance(exc, Exception):
raise
else:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.success, return_value=retval
+ self.logger.info("Job %s completed successfully", job.id)
+ result = JobResult.from_job(
+ job,
+ JobOutcome.success,
+ return_value=retval,
)
await self.data_store.release_job(self.identity, job.task_id, result)
+ await self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
+ )
finally:
current_job.reset(token)
finally:
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 1d8f85a..8c627c6 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -15,6 +15,7 @@ from uuid import UUID
import attrs
+from .. import JobReleased
from .._context import current_job, current_worker
from .._enums import JobOutcome, RunState
from .._events import JobAdded, WorkerStarted, WorkerStopped
@@ -200,8 +201,13 @@ class Worker:
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.missed_start_deadline
+ result = JobResult.from_job(
+ job, JobOutcome.missed_start_deadline, finished_at=start_time
+ )
+ self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
)
self.data_store.release_job(self.identity, job.task_id, result)
return
@@ -210,17 +216,43 @@ class Worker:
try:
retval = func(*job.args, **job.kwargs)
except BaseException as exc:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.error, exception=exc
+ if isinstance(exc, Exception):
+ self.logger.exception("Job %s raised an exception", job.id)
+ else:
+ self.logger.error(
+ "Job %s was aborted due to %s", job.id, exc.__class__.__name__
+ )
+
+ result = JobResult.from_job(
+ job,
+ JobOutcome.error,
+ exception=exc,
+ )
+ self.data_store.release_job(
+ self.identity,
+ job.task_id,
+ result,
+ )
+ self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
)
- self.data_store.release_job(self.identity, job.task_id, result)
if not isinstance(exc, Exception):
raise
else:
- result = JobResult(
- job_id=job.id, outcome=JobOutcome.success, return_value=retval
+ self.logger.info("Job %s completed successfully", job.id)
+ result = JobResult.from_job(
+ job,
+ JobOutcome.success,
+ return_value=retval,
)
self.data_store.release_job(self.identity, job.task_id, result)
+ self.event_broker.publish(
+ JobReleased(
+ job_id=job.id, worker_id=self.identity, outcome=result.outcome
+ )
+ )
finally:
current_job.reset(token)
finally:
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 22722c7..40fad27 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import threading
from collections.abc import Generator
from contextlib import asynccontextmanager, contextmanager
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from tempfile import TemporaryDirectory
from typing import Any, AsyncGenerator, cast
@@ -433,7 +433,7 @@ class TestDataStores:
def test_job_release_success(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -443,9 +443,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -460,7 +460,7 @@ class TestDataStores:
def test_job_release_failure(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -470,9 +470,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -488,7 +488,7 @@ class TestDataStores:
def test_job_release_missed_deadline(self, datastore: DataStore):
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -498,7 +498,10 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -511,7 +514,7 @@ class TestDataStores:
def test_job_release_cancelled(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker1", 2)
@@ -521,7 +524,10 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.cancelled,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -576,9 +582,9 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
@@ -890,7 +896,7 @@ class TestAsyncDataStores:
async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -900,9 +906,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -917,7 +923,7 @@ class TestAsyncDataStores:
async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -927,9 +933,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -945,7 +951,7 @@ class TestAsyncDataStores:
async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -955,7 +961,10 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -968,7 +977,7 @@ class TestAsyncDataStores:
async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker1", 2)
@@ -978,7 +987,7 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(acquired[0], JobOutcome.cancelled),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -998,7 +1007,7 @@ class TestAsyncDataStores:
"""
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
# First, one worker acquires the first available job
@@ -1035,9 +1044,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 32f28dc..f57cbeb 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -17,6 +17,7 @@ from apscheduler import (
JobAdded,
JobLookupError,
JobOutcome,
+ JobReleased,
Schedule,
ScheduleAdded,
ScheduleLookupError,
@@ -181,16 +182,33 @@ class TestAsyncScheduler:
async def test_get_job_result_success(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
+ job_id = await scheduler.add_job(
+ dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5
+ )
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ async def test_get_job_result_success_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ )
+ job_id = await scheduler.add_job(dummy_async_job)
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_error(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(
- dummy_async_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_async_job,
+ kwargs={"delay": 0.2, "fail": True},
+ result_expiration_time=5,
)
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -198,6 +216,17 @@ class TestAsyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ async def test_get_job_result_error_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True)
+ job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_nowait_not_yet_ready(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
@@ -239,6 +268,7 @@ class TestAsyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
await scheduler.data_store.add_job(job)
result = await scheduler.get_job_result(job.id)
@@ -371,17 +401,29 @@ class TestSyncScheduler:
)
assert jobs[0].original_scheduled_time == orig_start_time
- def test_get_job_result(self) -> None:
+ def test_get_job_result_success(self) -> None:
with Scheduler() as scheduler:
- job_id = scheduler.add_job(dummy_sync_job)
+ job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5)
result = scheduler.get_job_result(job_id)
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ def test_get_job_result_success_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler:
+ with scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job)
+ event.wait(3)
+
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_error(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(
- dummy_sync_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5
)
result = scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -389,6 +431,16 @@ class TestSyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ def test_get_job_result_error_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler, scheduler.event_broker.subscribe(
+ lambda evt: event.set(), one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ event.wait(3)
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_nowait_not_yet_ready(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2})
@@ -428,6 +480,7 @@ class TestSyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
scheduler.data_store.add_job(job)
result = scheduler.get_job_result(job.id)