summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_datastores.py73
-rw-r--r--tests/test_schedulers.py63
2 files changed, 99 insertions, 37 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 22722c7..40fad27 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import threading
from collections.abc import Generator
from contextlib import asynccontextmanager, contextmanager
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from tempfile import TemporaryDirectory
from typing import Any, AsyncGenerator, cast
@@ -433,7 +433,7 @@ class TestDataStores:
def test_job_release_success(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -443,9 +443,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -460,7 +460,7 @@ class TestDataStores:
def test_job_release_failure(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -470,9 +470,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -488,7 +488,7 @@ class TestDataStores:
def test_job_release_missed_deadline(self, datastore: DataStore):
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -498,7 +498,10 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -511,7 +514,7 @@ class TestDataStores:
def test_job_release_cancelled(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker1", 2)
@@ -521,7 +524,10 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.cancelled,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -576,9 +582,9 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
@@ -890,7 +896,7 @@ class TestAsyncDataStores:
async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -900,9 +906,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -917,7 +923,7 @@ class TestAsyncDataStores:
async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -927,9 +933,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -945,7 +951,7 @@ class TestAsyncDataStores:
async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -955,7 +961,10 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -968,7 +977,7 @@ class TestAsyncDataStores:
async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker1", 2)
@@ -978,7 +987,7 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(acquired[0], JobOutcome.cancelled),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -998,7 +1007,7 @@ class TestAsyncDataStores:
"""
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
# First, one worker acquires the first available job
@@ -1035,9 +1044,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 32f28dc..f57cbeb 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -17,6 +17,7 @@ from apscheduler import (
JobAdded,
JobLookupError,
JobOutcome,
+ JobReleased,
Schedule,
ScheduleAdded,
ScheduleLookupError,
@@ -181,16 +182,33 @@ class TestAsyncScheduler:
async def test_get_job_result_success(self) -> None:
async with AsyncScheduler() as scheduler:
- job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
+ job_id = await scheduler.add_job(
+ dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5
+ )
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ async def test_get_job_result_success_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ )
+ job_id = await scheduler.add_job(dummy_async_job)
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_error(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(
- dummy_async_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_async_job,
+ kwargs={"delay": 0.2, "fail": True},
+ result_expiration_time=5,
)
result = await scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -198,6 +216,17 @@ class TestAsyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ async def test_get_job_result_error_empty(self) -> None:
+ event = anyio.Event()
+ async with AsyncScheduler() as scheduler:
+ scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True)
+ job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ with fail_after(3):
+ await event.wait()
+
+ with pytest.raises(JobLookupError):
+ await scheduler.get_job_result(job_id, wait=False)
+
async def test_get_job_result_nowait_not_yet_ready(self) -> None:
async with AsyncScheduler() as scheduler:
job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2})
@@ -239,6 +268,7 @@ class TestAsyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
await scheduler.data_store.add_job(job)
result = await scheduler.get_job_result(job.id)
@@ -371,17 +401,29 @@ class TestSyncScheduler:
)
assert jobs[0].original_scheduled_time == orig_start_time
- def test_get_job_result(self) -> None:
+ def test_get_job_result_success(self) -> None:
with Scheduler() as scheduler:
- job_id = scheduler.add_job(dummy_sync_job)
+ job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5)
result = scheduler.get_job_result(job_id)
assert result.outcome is JobOutcome.success
assert result.return_value == "returnvalue"
+ def test_get_job_result_success_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler:
+ with scheduler.event_broker.subscribe(
+ lambda evt: event.set(), {JobReleased}, one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job)
+ event.wait(3)
+
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_error(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(
- dummy_sync_job, kwargs={"delay": 0.2, "fail": True}
+ dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5
)
result = scheduler.get_job_result(job_id)
assert result.job_id == job_id
@@ -389,6 +431,16 @@ class TestSyncScheduler:
assert isinstance(result.exception, RuntimeError)
assert str(result.exception) == "failing as requested"
+ def test_get_job_result_error_empty(self) -> None:
+ event = threading.Event()
+ with Scheduler() as scheduler, scheduler.event_broker.subscribe(
+ lambda evt: event.set(), one_shot=True
+ ):
+ job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True})
+ event.wait(3)
+ with pytest.raises(JobLookupError):
+ scheduler.get_job_result(job_id, wait=False)
+
def test_get_job_result_nowait_not_yet_ready(self) -> None:
with Scheduler() as scheduler:
job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2})
@@ -428,6 +480,7 @@ class TestSyncScheduler:
jitter=timedelta(seconds=2.16),
start_deadline=start_deadline,
tags={"foo", "bar"},
+ result_expiration_time=timedelta(seconds=10),
)
scheduler.data_store.add_job(job)
result = scheduler.get_job_result(job.id)