diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-08-31 09:04:57 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | dbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad (patch) | |
tree | 848514069cb09aef702ef320a2697dcb1dd3897e /tests | |
parent | ff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5 (diff) | |
download | apscheduler-dbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad.tar.gz |
Added preliminary support for job result reporting and job cancellation
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_datastores.py | 100 | ||||
-rw-r--r-- | tests/test_workers.py | 14 |
2 files changed, 93 insertions, 21 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py index e0a9d26..ef86429 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -9,8 +9,10 @@ import pytest from freezegun.api import FrozenDateTimeFactory from apscheduler.abc import AsyncDataStore, Job, Schedule -from apscheduler.events import Event, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated +from apscheduler.enums import JobOutcome +from apscheduler.events import Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated from apscheduler.policies import CoalescePolicy, ConflictPolicy +from apscheduler.structures import JobResult from apscheduler.triggers.date import DateTrigger @@ -196,9 +198,9 @@ class TestAsyncStores: # assert len(acquired3) == 1 # assert acquired3[0].id == 's1' - async def test_acquire_release_jobs(self, datastore_cm: AsyncContextManager[AsyncDataStore], - jobs: List[Job]) -> None: - async with datastore_cm as store, capture_events(store, 0) as events: + async def test_acquire_release_multiple_workers( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]) -> None: + async with datastore_cm as store: for job in jobs: await store.add_job(job) @@ -213,21 +215,89 @@ class TestAsyncStores: assert jobs2[0].id == jobs[1].id # The third worker gets nothing - assert not await store.acquire_jobs('dummy-id3', 1) + jobs3 = await store.acquire_jobs('dummy-id3', 1) + assert not jobs3 + + async def test_job_release_success(self, datastore_cm: AsyncContextManager[AsyncDataStore], + jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id + + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.success, return_value='foo')) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.success + assert result.exception is None + assert result.return_value == 'foo' + + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) + + async def test_job_release_failure(self, datastore_cm: AsyncContextManager[AsyncDataStore], + jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id - # All the jobs should still be returned - visible_jobs = await store.get_jobs() - assert len(visible_jobs) == 2 + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.failure, exception=ValueError('foo'))) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.failure + assert isinstance(result.exception, ValueError) + assert result.exception.args == ('foo',) + assert result.return_value is None - await store.release_jobs('dummy-id1', jobs1) - await store.release_jobs('dummy-id2', jobs2) + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) + + async def test_job_release_missed_deadline( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id + + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.missed_start_deadline)) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.missed_start_deadline + assert result.exception is None + assert result.return_value is None + + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) + + async def test_job_release_cancelled( + self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]): + async with datastore_cm as store: + await store.add_job(jobs[0]) + + acquired = await store.acquire_jobs('worker_id', 2) + assert len(acquired) == 1 + assert acquired[0].id == jobs[0].id - # All the jobs should be gone - visible_jobs = await store.get_jobs() - assert len(visible_jobs) == 0 + await store.release_job('worker_id', acquired[0].id, + JobResult(JobOutcome.cancelled)) + result = await store.get_job_result(acquired[0].id) + assert result.outcome is JobOutcome.cancelled + assert result.exception is None + assert result.return_value is None - # Check for the appropriate events - assert all(isinstance(event, JobAdded) for event in events) + # Check that the job and its result are gone + assert not await store.get_jobs({acquired[0].id}) + assert not await store.get_job_result(acquired[0].id) async def test_acquire_jobs_lock_timeout( self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job], diff --git a/tests/test_workers.py b/tests/test_workers.py index 18f9fb7..0ba1b17 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -77,12 +77,13 @@ class TestAsyncWorker: if fail: # Then the job failed assert isinstance(received_event, JobFailed) - assert isinstance(received_event.exception, str) - assert isinstance(received_event.traceback, str) + assert isinstance(received_event.exc_type, str) + assert isinstance(received_event.exc_val, str) + assert isinstance(received_event.exc_tb, str) else: # Then the job finished successfully assert isinstance(received_event, JobCompleted) - assert received_event.return_value == ((1, 2), {'x': 'foo'}) + assert received_event.return_value == "((1, 2), {'x': 'foo'})" # Finally, the worker was stopped received_event = received_events.pop(0) @@ -177,12 +178,13 @@ class TestSyncWorker: if fail: # Then the job failed assert isinstance(received_event, JobFailed) - assert isinstance(received_event.exception, str) - assert isinstance(received_event.traceback, str) + assert isinstance(received_event.exc_type, str) + assert isinstance(received_event.exc_val, str) + assert isinstance(received_event.exc_tb, str) else: # Then the job finished successfully assert isinstance(received_event, JobCompleted) - assert received_event.return_value == ((1, 2), {'x': 'foo'}) + assert received_event.return_value == "((1, 2), {'x': 'foo'})" # Finally, the worker was stopped received_event = received_events.pop(0) |