summaryrefslogtreecommitdiff
path: root/tests/test_datastores.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_datastores.py')
-rw-r--r--tests/test_datastores.py73
1 files changed, 41 insertions, 32 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 22722c7..40fad27 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -3,7 +3,7 @@ from __future__ import annotations
import threading
from collections.abc import Generator
from contextlib import asynccontextmanager, contextmanager
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from tempfile import TemporaryDirectory
from typing import Any, AsyncGenerator, cast
@@ -433,7 +433,7 @@ class TestDataStores:
def test_job_release_success(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -443,9 +443,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -460,7 +460,7 @@ class TestDataStores:
def test_job_release_failure(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -470,9 +470,9 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -488,7 +488,7 @@ class TestDataStores:
def test_job_release_missed_deadline(self, datastore: DataStore):
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker_id", 2)
@@ -498,7 +498,10 @@ class TestDataStores:
datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -511,7 +514,7 @@ class TestDataStores:
def test_job_release_cancelled(self, datastore: DataStore) -> None:
datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
datastore.add_job(job)
acquired = datastore.acquire_jobs("worker1", 2)
@@ -521,7 +524,10 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.cancelled,
+ ),
)
result = datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -576,9 +582,9 @@ class TestDataStores:
datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)
@@ -890,7 +896,7 @@ class TestAsyncDataStores:
async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -900,9 +906,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.success,
return_value="foo",
),
)
@@ -917,7 +923,7 @@ class TestAsyncDataStores:
async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -927,9 +933,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(
- job_id=acquired[0].id,
- outcome=JobOutcome.error,
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.error,
exception=ValueError("foo"),
),
)
@@ -945,7 +951,7 @@ class TestAsyncDataStores:
async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker_id", 2)
@@ -955,7 +961,10 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker_id",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline),
+ JobResult.from_job(
+ acquired[0],
+ JobOutcome.missed_start_deadline,
+ ),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
@@ -968,7 +977,7 @@ class TestAsyncDataStores:
async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
acquired = await datastore.acquire_jobs("worker1", 2)
@@ -978,7 +987,7 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled),
+ JobResult.from_job(acquired[0], JobOutcome.cancelled),
)
result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
@@ -998,7 +1007,7 @@ class TestAsyncDataStores:
"""
await datastore.add_task(Task(id="task1", func=asynccontextmanager))
- job = Job(task_id="task1")
+ job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1))
await datastore.add_job(job)
# First, one worker acquires the first available job
@@ -1035,9 +1044,9 @@ class TestAsyncDataStores:
await datastore.release_job(
"worker1",
acquired_jobs[0].task_id,
- JobResult(
- job_id=acquired_jobs[0].id,
- outcome=JobOutcome.success,
+ JobResult.from_job(
+ acquired_jobs[0],
+ JobOutcome.success,
return_value=None,
),
)