summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-08-31 09:04:57 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-06 01:39:07 +0300
commitdbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad (patch)
tree848514069cb09aef702ef320a2697dcb1dd3897e /tests
parentff9fbf1bd54501f000a4d2043bdd673ba6bb5aa5 (diff)
downloadapscheduler-dbe0f8bdd58cef5bd9060ed4c6f9eb5651ce09ad.tar.gz
Added preliminary support for job result reporting and job cancellation
Diffstat (limited to 'tests')
-rw-r--r--tests/test_datastores.py100
-rw-r--r--tests/test_workers.py14
2 files changed, 93 insertions, 21 deletions
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index e0a9d26..ef86429 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -9,8 +9,10 @@ import pytest
from freezegun.api import FrozenDateTimeFactory
from apscheduler.abc import AsyncDataStore, Job, Schedule
-from apscheduler.events import Event, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated
+from apscheduler.enums import JobOutcome
+from apscheduler.events import Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated
from apscheduler.policies import CoalescePolicy, ConflictPolicy
+from apscheduler.structures import JobResult
from apscheduler.triggers.date import DateTrigger
@@ -196,9 +198,9 @@ class TestAsyncStores:
# assert len(acquired3) == 1
# assert acquired3[0].id == 's1'
- async def test_acquire_release_jobs(self, datastore_cm: AsyncContextManager[AsyncDataStore],
- jobs: List[Job]) -> None:
- async with datastore_cm as store, capture_events(store, 0) as events:
+ async def test_acquire_release_multiple_workers(
+ self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]) -> None:
+ async with datastore_cm as store:
for job in jobs:
await store.add_job(job)
@@ -213,21 +215,89 @@ class TestAsyncStores:
assert jobs2[0].id == jobs[1].id
# The third worker gets nothing
- assert not await store.acquire_jobs('dummy-id3', 1)
+ jobs3 = await store.acquire_jobs('dummy-id3', 1)
+ assert not jobs3
+
+ async def test_job_release_success(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ jobs: List[Job]):
+ async with datastore_cm as store:
+ await store.add_job(jobs[0])
+
+ acquired = await store.acquire_jobs('worker_id', 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == jobs[0].id
+
+ await store.release_job('worker_id', acquired[0].id,
+ JobResult(JobOutcome.success, return_value='foo'))
+ result = await store.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.success
+ assert result.exception is None
+ assert result.return_value == 'foo'
+
+ # Check that the job and its result are gone
+ assert not await store.get_jobs({acquired[0].id})
+ assert not await store.get_job_result(acquired[0].id)
+
+ async def test_job_release_failure(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ jobs: List[Job]):
+ async with datastore_cm as store:
+ await store.add_job(jobs[0])
+
+ acquired = await store.acquire_jobs('worker_id', 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == jobs[0].id
- # All the jobs should still be returned
- visible_jobs = await store.get_jobs()
- assert len(visible_jobs) == 2
+ await store.release_job('worker_id', acquired[0].id,
+ JobResult(JobOutcome.failure, exception=ValueError('foo')))
+ result = await store.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.failure
+ assert isinstance(result.exception, ValueError)
+ assert result.exception.args == ('foo',)
+ assert result.return_value is None
- await store.release_jobs('dummy-id1', jobs1)
- await store.release_jobs('dummy-id2', jobs2)
+ # Check that the job and its result are gone
+ assert not await store.get_jobs({acquired[0].id})
+ assert not await store.get_job_result(acquired[0].id)
+
+ async def test_job_release_missed_deadline(
+ self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]):
+ async with datastore_cm as store:
+ await store.add_job(jobs[0])
+
+ acquired = await store.acquire_jobs('worker_id', 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == jobs[0].id
+
+ await store.release_job('worker_id', acquired[0].id,
+ JobResult(JobOutcome.missed_start_deadline))
+ result = await store.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.missed_start_deadline
+ assert result.exception is None
+ assert result.return_value is None
+
+ # Check that the job and its result are gone
+ assert not await store.get_jobs({acquired[0].id})
+ assert not await store.get_job_result(acquired[0].id)
+
+ async def test_job_release_cancelled(
+ self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job]):
+ async with datastore_cm as store:
+ await store.add_job(jobs[0])
+
+ acquired = await store.acquire_jobs('worker_id', 2)
+ assert len(acquired) == 1
+ assert acquired[0].id == jobs[0].id
- # All the jobs should be gone
- visible_jobs = await store.get_jobs()
- assert len(visible_jobs) == 0
+ await store.release_job('worker_id', acquired[0].id,
+ JobResult(JobOutcome.cancelled))
+ result = await store.get_job_result(acquired[0].id)
+ assert result.outcome is JobOutcome.cancelled
+ assert result.exception is None
+ assert result.return_value is None
- # Check for the appropriate events
- assert all(isinstance(event, JobAdded) for event in events)
+ # Check that the job and its result are gone
+ assert not await store.get_jobs({acquired[0].id})
+ assert not await store.get_job_result(acquired[0].id)
async def test_acquire_jobs_lock_timeout(
self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job],
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 18f9fb7..0ba1b17 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -77,12 +77,13 @@ class TestAsyncWorker:
if fail:
# Then the job failed
assert isinstance(received_event, JobFailed)
- assert isinstance(received_event.exception, str)
- assert isinstance(received_event.traceback, str)
+ assert isinstance(received_event.exc_type, str)
+ assert isinstance(received_event.exc_val, str)
+ assert isinstance(received_event.exc_tb, str)
else:
# Then the job finished successfully
assert isinstance(received_event, JobCompleted)
- assert received_event.return_value == ((1, 2), {'x': 'foo'})
+ assert received_event.return_value == "((1, 2), {'x': 'foo'})"
# Finally, the worker was stopped
received_event = received_events.pop(0)
@@ -177,12 +178,13 @@ class TestSyncWorker:
if fail:
# Then the job failed
assert isinstance(received_event, JobFailed)
- assert isinstance(received_event.exception, str)
- assert isinstance(received_event.traceback, str)
+ assert isinstance(received_event.exc_type, str)
+ assert isinstance(received_event.exc_val, str)
+ assert isinstance(received_event.exc_tb, str)
else:
# Then the job finished successfully
assert isinstance(received_event, JobCompleted)
- assert received_event.return_value == ((1, 2), {'x': 'foo'})
+ assert received_event.return_value == "((1, 2), {'x': 'foo'})"
# Finally, the worker was stopped
received_event = received_events.pop(0)