diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_datastores.py | 73 | ||||
-rw-r--r-- | tests/test_schedulers.py | 63 |
2 files changed, 99 insertions, 37 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 22722c7..40fad27 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -3,7 +3,7 @@ from __future__ import annotations import threading from collections.abc import Generator from contextlib import asynccontextmanager, contextmanager -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from tempfile import TemporaryDirectory from typing import Any, AsyncGenerator, cast @@ -433,7 +433,7 @@ class TestDataStores: def test_job_release_success(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -443,9 +443,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -460,7 +460,7 @@ class TestDataStores: def test_job_release_failure(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -470,9 +470,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -488,7 +488,7 @@ class TestDataStores: def test_job_release_missed_deadline(self, datastore: DataStore): datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -498,7 +498,10 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -511,7 +514,7 @@ class TestDataStores: def test_job_release_cancelled(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker1", 2) @@ -521,7 +524,10 @@ class TestDataStores: datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job( + acquired[0], + JobOutcome.cancelled, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -576,9 +582,9 @@ class TestDataStores: datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) @@ -890,7 +896,7 @@ class TestAsyncDataStores: async def test_job_release_success(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -900,9 +906,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -917,7 +923,7 @@ class TestAsyncDataStores: async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -927,9 +933,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -945,7 +951,7 @@ class TestAsyncDataStores: async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -955,7 +961,10 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -968,7 +977,7 @@ class TestAsyncDataStores: async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker1", 2) @@ -978,7 +987,7 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job(acquired[0], JobOutcome.cancelled), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -998,7 +1007,7 @@ class TestAsyncDataStores: """ await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) # First, one worker acquires the first available job @@ -1035,9 +1044,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 32f28dc..f57cbeb 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -17,6 +17,7 @@ from apscheduler import ( JobAdded, JobLookupError, JobOutcome, + JobReleased, Schedule, ScheduleAdded, ScheduleLookupError, @@ -181,16 +182,33 @@ class TestAsyncScheduler: async def test_get_job_result_success(self) -> None: async with AsyncScheduler() as scheduler: - job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) + job_id = await scheduler.add_job( + dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5 + ) result = await scheduler.get_job_result(job_id) assert result.job_id == job_id assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" + async def test_get_job_result_success_empty(self) -> None: + event = anyio.Event() + async with AsyncScheduler() as scheduler: + scheduler.event_broker.subscribe( + lambda evt: event.set(), {JobReleased}, one_shot=True + ) + job_id = await scheduler.add_job(dummy_async_job) + with fail_after(3): + await event.wait() + + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + async def test_get_job_result_error(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job( - dummy_async_job, kwargs={"delay": 0.2, "fail": True} + dummy_async_job, + kwargs={"delay": 0.2, "fail": True}, + result_expiration_time=5, ) result = await scheduler.get_job_result(job_id) assert result.job_id == job_id @@ -198,6 +216,17 @@ class TestAsyncScheduler: assert isinstance(result.exception, RuntimeError) assert str(result.exception) == "failing as requested" + async def test_get_job_result_error_empty(self) -> None: + event = anyio.Event() + async with AsyncScheduler() as scheduler: + scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True) + job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + with fail_after(3): + await event.wait() + + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + async def test_get_job_result_nowait_not_yet_ready(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) @@ -239,6 +268,7 @@ class TestAsyncScheduler: jitter=timedelta(seconds=2.16), start_deadline=start_deadline, tags={"foo", "bar"}, + result_expiration_time=timedelta(seconds=10), ) await scheduler.data_store.add_job(job) result = await scheduler.get_job_result(job.id) @@ -371,17 +401,29 @@ class TestSyncScheduler: ) assert jobs[0].original_scheduled_time == orig_start_time - def test_get_job_result(self) -> None: + def test_get_job_result_success(self) -> None: with Scheduler() as scheduler: - job_id = scheduler.add_job(dummy_sync_job) + job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5) result = scheduler.get_job_result(job_id) assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" + def test_get_job_result_success_empty(self) -> None: + event = threading.Event() + with Scheduler() as scheduler: + with scheduler.event_broker.subscribe( + lambda evt: event.set(), {JobReleased}, one_shot=True + ): + job_id = scheduler.add_job(dummy_sync_job) + event.wait(3) + + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + def test_get_job_result_error(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job( - dummy_sync_job, kwargs={"delay": 0.2, "fail": True} + dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5 ) result = scheduler.get_job_result(job_id) assert result.job_id == job_id @@ -389,6 +431,16 @@ class TestSyncScheduler: assert isinstance(result.exception, RuntimeError) assert str(result.exception) == "failing as requested" + def test_get_job_result_error_empty(self) -> None: + event = threading.Event() + with Scheduler() as scheduler, scheduler.event_broker.subscribe( + lambda evt: event.set(), one_shot=True + ): + job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + event.wait(3) + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + def test_get_job_result_nowait_not_yet_ready(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2}) @@ -428,6 +480,7 @@ class TestSyncScheduler: jitter=timedelta(seconds=2.16), start_deadline=start_deadline, tags={"foo", "bar"}, + result_expiration_time=timedelta(seconds=10), ) scheduler.data_store.add_job(job) result = scheduler.get_job_result(job.id) |