diff options
Diffstat (limited to 'tests/test_datastores.py')
-rw-r--r-- | tests/test_datastores.py | 73 |
1 files changed, 41 insertions, 32 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 22722c7..40fad27 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -3,7 +3,7 @@ from __future__ import annotations import threading from collections.abc import Generator from contextlib import asynccontextmanager, contextmanager -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from tempfile import TemporaryDirectory from typing import Any, AsyncGenerator, cast @@ -433,7 +433,7 @@ class TestDataStores: def test_job_release_success(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -443,9 +443,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -460,7 +460,7 @@ class TestDataStores: def test_job_release_failure(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -470,9 +470,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -488,7 +488,7 @@ class TestDataStores: def test_job_release_missed_deadline(self, datastore: DataStore): datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -498,7 +498,10 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -511,7 +514,7 @@ class TestDataStores: def test_job_release_cancelled(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker1", 2) @@ -521,7 +524,10 @@ class TestDataStores: datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job( + acquired[0], + JobOutcome.cancelled, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -576,9 +582,9 @@ class TestDataStores: datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) @@ -890,7 +896,7 @@ class TestAsyncDataStores: async def test_job_release_success(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -900,9 +906,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -917,7 +923,7 @@ class TestAsyncDataStores: async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -927,9 +933,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -945,7 +951,7 @@ class TestAsyncDataStores: async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -955,7 +961,10 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -968,7 +977,7 @@ class TestAsyncDataStores: async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker1", 2) @@ -978,7 +987,7 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job(acquired[0], JobOutcome.cancelled), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -998,7 +1007,7 @@ class TestAsyncDataStores: """ await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) # First, one worker acquires the first available job @@ -1035,9 +1044,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) |