diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-08-13 23:06:08 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2022-08-13 23:06:08 +0300 |
commit | a9141bb5663e0a22cc7c4da7d34834c925dbadea (patch) | |
tree | 533ee4126b22d0b30b6c74106d7b4d3b89328757 /tests | |
parent | ffbbbbe0ee147bba08e0c95b3697136590997fb1 (diff) | |
download | apscheduler-a9141bb5663e0a22cc7c4da7d34834c925dbadea.tar.gz |
Added job expiration times
Scheduled jobs no longer retain their results. All job outcomes are now logged by the workers.
Workers, rather than data stores, are now responsible for emitting the JobReleased event.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_datastores.py | 73 | ||||
-rw-r--r-- | tests/test_schedulers.py | 63 |
2 files changed, 99 insertions, 37 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 22722c7..40fad27 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -3,7 +3,7 @@ from __future__ import annotations import threading from collections.abc import Generator from contextlib import asynccontextmanager, contextmanager -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from tempfile import TemporaryDirectory from typing import Any, AsyncGenerator, cast @@ -433,7 +433,7 @@ class TestDataStores: def test_job_release_success(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -443,9 +443,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -460,7 +460,7 @@ class TestDataStores: def test_job_release_failure(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -470,9 +470,9 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -488,7 +488,7 @@ class TestDataStores: def test_job_release_missed_deadline(self, datastore: DataStore): datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker_id", 2) @@ -498,7 +498,10 @@ class TestDataStores: datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -511,7 +514,7 @@ class TestDataStores: def test_job_release_cancelled(self, datastore: DataStore) -> None: datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) datastore.add_job(job) acquired = datastore.acquire_jobs("worker1", 2) @@ -521,7 +524,10 @@ class TestDataStores: datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job( + acquired[0], + JobOutcome.cancelled, + ), ) result = datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -576,9 +582,9 @@ class TestDataStores: datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) @@ -890,7 +896,7 @@ class TestAsyncDataStores: async def test_job_release_success(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -900,9 +906,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired[0], + JobOutcome.success, return_value="foo", ), ) @@ -917,7 +923,7 @@ class TestAsyncDataStores: async def test_job_release_failure(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -927,9 +933,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult( - job_id=acquired[0].id, - outcome=JobOutcome.error, + JobResult.from_job( + acquired[0], + JobOutcome.error, exception=ValueError("foo"), ), ) @@ -945,7 +951,7 @@ class TestAsyncDataStores: async def test_job_release_missed_deadline(self, datastore: AsyncDataStore): await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker_id", 2) @@ -955,7 +961,10 @@ class TestAsyncDataStores: await datastore.release_job( "worker_id", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline), + JobResult.from_job( + acquired[0], + JobOutcome.missed_start_deadline, + ), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.missed_start_deadline @@ -968,7 +977,7 @@ class TestAsyncDataStores: async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None: await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) acquired = await datastore.acquire_jobs("worker1", 2) @@ -978,7 +987,7 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled), + JobResult.from_job(acquired[0], JobOutcome.cancelled), ) result = await datastore.get_job_result(acquired[0].id) assert result.outcome is JobOutcome.cancelled @@ -998,7 +1007,7 @@ class TestAsyncDataStores: """ await datastore.add_task(Task(id="task1", func=asynccontextmanager)) - job = Job(task_id="task1") + job = Job(task_id="task1", result_expiration_time=timedelta(minutes=1)) await datastore.add_job(job) # First, one worker acquires the first available job @@ -1035,9 +1044,9 @@ class TestAsyncDataStores: await datastore.release_job( "worker1", acquired_jobs[0].task_id, - JobResult( - job_id=acquired_jobs[0].id, - outcome=JobOutcome.success, + JobResult.from_job( + acquired_jobs[0], + JobOutcome.success, return_value=None, ), ) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 32f28dc..f57cbeb 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -17,6 +17,7 @@ from apscheduler import ( JobAdded, JobLookupError, JobOutcome, + JobReleased, Schedule, ScheduleAdded, ScheduleLookupError, @@ -181,16 +182,33 @@ class TestAsyncScheduler: async def test_get_job_result_success(self) -> None: async with AsyncScheduler() as scheduler: - job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) + job_id = await scheduler.add_job( + dummy_async_job, kwargs={"delay": 0.2}, result_expiration_time=5 + ) result = await scheduler.get_job_result(job_id) assert result.job_id == job_id assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" + async def test_get_job_result_success_empty(self) -> None: + event = anyio.Event() + async with AsyncScheduler() as scheduler: + scheduler.event_broker.subscribe( + lambda evt: event.set(), {JobReleased}, one_shot=True + ) + job_id = await scheduler.add_job(dummy_async_job) + with fail_after(3): + await event.wait() + + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + async def test_get_job_result_error(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job( - dummy_async_job, kwargs={"delay": 0.2, "fail": True} + dummy_async_job, + kwargs={"delay": 0.2, "fail": True}, + result_expiration_time=5, ) result = await scheduler.get_job_result(job_id) assert result.job_id == job_id @@ -198,6 +216,17 @@ class TestAsyncScheduler: assert isinstance(result.exception, RuntimeError) assert str(result.exception) == "failing as requested" + async def test_get_job_result_error_empty(self) -> None: + event = anyio.Event() + async with AsyncScheduler() as scheduler: + scheduler.event_broker.subscribe(lambda evt: event.set(), one_shot=True) + job_id = await scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + with fail_after(3): + await event.wait() + + with pytest.raises(JobLookupError): + await scheduler.get_job_result(job_id, wait=False) + async def test_get_job_result_nowait_not_yet_ready(self) -> None: async with AsyncScheduler() as scheduler: job_id = await scheduler.add_job(dummy_async_job, kwargs={"delay": 0.2}) @@ -239,6 +268,7 @@ class TestAsyncScheduler: jitter=timedelta(seconds=2.16), start_deadline=start_deadline, tags={"foo", "bar"}, + result_expiration_time=timedelta(seconds=10), ) await scheduler.data_store.add_job(job) result = await scheduler.get_job_result(job.id) @@ -371,17 +401,29 @@ class TestSyncScheduler: ) assert jobs[0].original_scheduled_time == orig_start_time - def test_get_job_result(self) -> None: + def test_get_job_result_success(self) -> None: with Scheduler() as scheduler: - job_id = scheduler.add_job(dummy_sync_job) + job_id = scheduler.add_job(dummy_sync_job, result_expiration_time=5) result = scheduler.get_job_result(job_id) assert result.outcome is JobOutcome.success assert result.return_value == "returnvalue" + def test_get_job_result_success_empty(self) -> None: + event = threading.Event() + with Scheduler() as scheduler: + with scheduler.event_broker.subscribe( + lambda evt: event.set(), {JobReleased}, one_shot=True + ): + job_id = scheduler.add_job(dummy_sync_job) + event.wait(3) + + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + def test_get_job_result_error(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job( - dummy_sync_job, kwargs={"delay": 0.2, "fail": True} + dummy_sync_job, kwargs={"fail": True}, result_expiration_time=5 ) result = scheduler.get_job_result(job_id) assert result.job_id == job_id @@ -389,6 +431,16 @@ class TestSyncScheduler: assert isinstance(result.exception, RuntimeError) assert str(result.exception) == "failing as requested" + def test_get_job_result_error_empty(self) -> None: + event = threading.Event() + with Scheduler() as scheduler, scheduler.event_broker.subscribe( + lambda evt: event.set(), one_shot=True + ): + job_id = scheduler.add_job(dummy_sync_job, kwargs={"fail": True}) + event.wait(3) + with pytest.raises(JobLookupError): + scheduler.get_job_result(job_id, wait=False) + def test_get_job_result_nowait_not_yet_ready(self) -> None: with Scheduler() as scheduler: job_id = scheduler.add_job(dummy_sync_job, kwargs={"delay": 0.2}) @@ -428,6 +480,7 @@ class TestSyncScheduler: jitter=timedelta(seconds=2.16), start_deadline=start_deadline, tags={"foo", "bar"}, + result_expiration_time=timedelta(seconds=10), ) scheduler.data_store.add_job(job) result = scheduler.get_job_result(job.id) |