summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-11 23:03:46 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-11 23:07:44 +0300
commit2a4eb36499f976e4da6b4ff18880b8292d067975 (patch)
treed881e549efc77a0e4aeeb5f4f10fea1edcbfc789
parent56afe91d5dc338db3440b2e9ecdea3e522dba30f (diff)
downloadapscheduler-2a4eb36499f976e4da6b4ff18880b8292d067975.tar.gz
Applied pytest-lazy-fixture to data stores too
-rw-r--r--.github/workflows/codeqa-test.yml2
-rw-r--r--pyproject.toml3
-rw-r--r--tests/conftest.py113
-rw-r--r--tests/test_datastores.py327
4 files changed, 222 insertions, 223 deletions
diff --git a/.github/workflows/codeqa-test.yml b/.github/workflows/codeqa-test.yml
index d7f8590..e0c62c4 100644
--- a/.github/workflows/codeqa-test.yml
+++ b/.github/workflows/codeqa-test.yml
@@ -67,4 +67,4 @@ jobs:
- name: Install the project and its dependencies
run: pip install -e .[test]
- name: Test with pytest
- run: pytest -m "not externaldb"
+ run: pytest -m "not external_service"
diff --git a/pyproject.toml b/pyproject.toml
index a887726..a958502 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,9 @@ local_scheme = "dirty-tag"
addopts = "-rsx --tb=short"
testpaths = "tests"
filterwarnings = "always"
+markers = [
+ "external_service: marks tests as requiring some external service",
+]
[tool.coverage.run]
source = ["apscheduler"]
diff --git a/tests/conftest.py b/tests/conftest.py
index 135c18d..89c4510 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,13 +1,9 @@
import sys
-from contextlib import asynccontextmanager, contextmanager
-from tempfile import TemporaryDirectory
-from typing import AsyncContextManager, AsyncGenerator, ContextManager, Generator, Optional
+from typing import Optional
import pytest
-from apscheduler.abc import AsyncDataStore, DataStore, Serializer
-from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
-from apscheduler.datastores.memory import MemoryDataStore
+from apscheduler.abc import Serializer
from apscheduler.serializers.cbor import CBORSerializer
from apscheduler.serializers.json import JSONSerializer
from apscheduler.serializers.pickle import PickleSerializer
@@ -35,108 +31,3 @@ def serializer(request) -> Optional[Serializer]:
@pytest.fixture
def anyio_backend() -> 'str':
return 'asyncio'
-
-
-@contextmanager
-def setup_memory_store() -> Generator[DataStore, None, None]:
- yield MemoryDataStore()
-
-
-@contextmanager
-def setup_mongodb_store() -> Generator[DataStore, None, None]:
- from pymongo import MongoClient
-
- from apscheduler.datastores.mongodb import MongoDBDataStore
-
- with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client:
- yield MongoDBDataStore(client, start_from_scratch=True)
-
-
-@contextmanager
-def setup_sqlite_store() -> Generator[DataStore, None, None]:
- from sqlalchemy.future import create_engine
-
- from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
-
- with TemporaryDirectory('sqlite_') as tempdir:
- engine = create_engine(f'sqlite:///{tempdir}/test.db')
- try:
- yield SQLAlchemyDataStore(engine)
- finally:
- engine.dispose()
-
-
-@contextmanager
-def setup_psycopg2_store() -> Generator[DataStore, None, None]:
- from sqlalchemy.future import create_engine
-
- from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
-
- engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb')
- try:
- yield SQLAlchemyDataStore(engine, start_from_scratch=True)
- finally:
- engine.dispose()
-
-
-@contextmanager
-def setup_mysql_store() -> Generator[DataStore, None, None]:
- from sqlalchemy.future import create_engine
-
- from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
-
- engine = create_engine('mysql+pymysql://root:secret@localhost/testdb')
- try:
- yield SQLAlchemyDataStore(engine, start_from_scratch=True)
- finally:
- engine.dispose()
-
-
-@asynccontextmanager
-async def setup_asyncpg_store() -> AsyncGenerator[AsyncDataStore, None]:
- from sqlalchemy.ext.asyncio import create_async_engine
-
- from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
-
- engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb',
- future=True)
- try:
- yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
- finally:
- await engine.dispose()
-
-
-@pytest.fixture(params=[
- pytest.param(setup_memory_store, id='memory'),
- pytest.param(setup_sqlite_store, id='sqlite'),
- pytest.param(setup_mongodb_store, id='mongodb', marks=[pytest.mark.externaldb]),
- pytest.param(setup_psycopg2_store, id='psycopg2', marks=[pytest.mark.externaldb]),
- pytest.param(setup_mysql_store, id='mysql', marks=[pytest.mark.externaldb])
-])
-def setup_sync_store(request) -> ContextManager[DataStore]:
- return request.param
-
-
-@pytest.fixture(params=[
- pytest.param(setup_asyncpg_store, id='asyncpg', marks=[pytest.mark.externaldb])
-])
-def setup_async_store(request) -> AsyncContextManager[AsyncDataStore]:
- return request.param
-
-
-@pytest.fixture(params=[
- pytest.param(setup_memory_store, id='memory'),
- pytest.param(setup_sqlite_store, id='sqlite'),
- pytest.param(setup_mongodb_store, id='mongodb', marks=[pytest.mark.externaldb]),
- pytest.param(setup_psycopg2_store, id='psycopg2', marks=[pytest.mark.externaldb]),
- pytest.param(setup_mysql_store, id='mysql', marks=[pytest.mark.externaldb]),
- pytest.param(setup_asyncpg_store, id='asyncpg', marks=[pytest.mark.externaldb])
-])
-async def datastore_cm(request):
- cm = request.param()
- if isinstance(cm, ContextManager):
- with cm as store:
- yield AsyncDataStoreAdapter(store)
- else:
- async with cm as store:
- yield store
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 74db6e7..4b01662 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -2,13 +2,16 @@ from __future__ import annotations
from contextlib import asynccontextmanager
from datetime import datetime, timezone
-from typing import AsyncContextManager, AsyncGenerator, List, Optional, Set, Type
+from tempfile import TemporaryDirectory
+from typing import AsyncGenerator, List, Optional, Set, Type
import anyio
import pytest
from freezegun.api import FrozenDateTimeFactory
-from apscheduler.abc import AsyncDataStore, Job, Schedule
+from apscheduler.abc import AsyncDataStore, DataStore, Job, Schedule
+from apscheduler.datastores.async_adapter import AsyncDataStoreAdapter
+from apscheduler.datastores.memory import MemoryDataStore
from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome
from apscheduler.events import (
Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskUpdated)
@@ -17,6 +20,114 @@ from apscheduler.triggers.date import DateTrigger
@pytest.fixture
+def memory_store() -> DataStore:
+ yield MemoryDataStore()
+
+
+@pytest.fixture
+def mongodb_store() -> DataStore:
+ from pymongo import MongoClient
+
+ from apscheduler.datastores.mongodb import MongoDBDataStore
+
+ with MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000) as client:
+ yield MongoDBDataStore(client, start_from_scratch=True)
+
+
+@pytest.fixture
+def sqlite_store() -> DataStore:
+ from sqlalchemy.future import create_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ with TemporaryDirectory('sqlite_') as tempdir:
+ engine = create_engine(f'sqlite:///{tempdir}/test.db')
+ try:
+ yield SQLAlchemyDataStore(engine)
+ finally:
+ engine.dispose()
+
+
+@pytest.fixture
+def psycopg2_store() -> DataStore:
+ from sqlalchemy.future import create_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb')
+ try:
+ yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+ finally:
+ engine.dispose()
+
+
+@pytest.fixture
+def mysql_store() -> DataStore:
+ from sqlalchemy.future import create_engine
+
+ from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
+
+ engine = create_engine('mysql+pymysql://root:secret@localhost/testdb')
+ try:
+ yield SQLAlchemyDataStore(engine, start_from_scratch=True)
+ finally:
+ engine.dispose()
+
+
+@pytest.fixture
+async def asyncpg_store() -> AsyncDataStore:
+ from sqlalchemy.ext.asyncio import create_async_engine
+
+ from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
+
+ engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb',
+ future=True)
+ try:
+ yield AsyncSQLAlchemyDataStore(engine, start_from_scratch=True)
+ finally:
+ await engine.dispose()
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('memory_store'), id='memory'),
+ pytest.param(pytest.lazy_fixture('sqlite'), id='sqlite'),
+ pytest.param(pytest.lazy_fixture('mongodb_store'), id='mongodb',
+ marks=[pytest.mark.external_service]),
+ pytest.param(pytest.lazy_fixture('psycopg2_store'), id='psycopg2',
+ marks=[pytest.mark.external_service]),
+ pytest.param(pytest.lazy_fixture('mysql_store'), id='mysql',
+ marks=[pytest.mark.external_service])
+])
+def sync_store(request) -> DataStore:
+ return request.param
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('asyncpg_store'), id='asyncpg',
+ marks=[pytest.mark.external_service])
+])
+def async_store(request) -> AsyncDataStore:
+ return request.param
+
+
+@pytest.fixture(params=[
+ pytest.param(pytest.lazy_fixture('memory_store'), id='memory'),
+ pytest.param(pytest.lazy_fixture('sqlite_store'), id='sqlite'),
+ pytest.param(pytest.lazy_fixture('mongodb_store'), id='mongodb',
+ marks=[pytest.mark.external_service]),
+ pytest.param(pytest.lazy_fixture('psycopg2_store'), id='psycopg2',
+ marks=[pytest.mark.external_service]),
+ pytest.param(pytest.lazy_fixture('mysql_store'), id='mysql',
+ marks=[pytest.mark.external_service])
+])
+async def datastore(request):
+ if isinstance(request.param, DataStore):
+ return AsyncDataStoreAdapter(request.param)
+ else:
+ return request.param
+
+
+@pytest.fixture
def schedules() -> List[Schedule]:
trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
schedule1 = Schedule(id='s1', task_id='task1', trigger=trigger)
@@ -33,18 +144,18 @@ def schedules() -> List[Schedule]:
@asynccontextmanager
async def capture_events(
- store: AsyncDataStore, limit: int,
+ datastore: AsyncDataStore, limit: int,
event_types: Optional[Set[Type[Event]]] = None
) -> AsyncGenerator[List[Event], None]:
def listener(event: Event) -> None:
events.append(event)
if len(events) == limit:
limit_event.set()
- store.events.unsubscribe(token)
+ datastore.events.unsubscribe(token)
events: List[Event] = []
limit_event = anyio.Event()
- token = store.events.subscribe(listener, event_types)
+ token = datastore.events.subscribe(listener, event_types)
yield events
if limit:
with anyio.fail_after(3):
@@ -53,17 +164,16 @@ async def capture_events(
@pytest.mark.anyio
class TestAsyncStores:
- async def test_add_replace_task(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
+ async def test_add_replace_task(self, datastore: AsyncDataStore) -> None:
import math
event_types = {TaskAdded, TaskUpdated}
- async with datastore_cm as store, capture_events(store, 3, event_types) as events:
- await store.add_task(Task(id='test_task', func=print))
- await store.add_task(Task(id='test_task2', func=math.ceil))
- await store.add_task(Task(id='test_task', func=repr))
+ async with datastore, capture_events(datastore, 3, event_types) as events:
+ await datastore.add_task(Task(id='test_task', func=print))
+ await datastore.add_task(Task(id='test_task2', func=math.ceil))
+ await datastore.add_task(Task(id='test_task', func=repr))
- tasks = await store.get_tasks()
+ tasks = await datastore.get_tasks()
assert len(tasks) == 2
assert tasks[0].id == 'test_task'
assert tasks[0].func is repr
@@ -84,36 +194,36 @@ class TestAsyncStores:
assert not events
- async def test_add_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ async def test_add_schedules(self, datastore: AsyncDataStore,
schedules: List[Schedule]) -> None:
- async with datastore_cm as store, capture_events(store, 3, {ScheduleAdded}) as events:
+ async with datastore, capture_events(datastore, 3, {ScheduleAdded}) as events:
for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- assert await store.get_schedules() == schedules
- assert await store.get_schedules({'s1', 's2', 's3'}) == schedules
- assert await store.get_schedules({'s1'}) == [schedules[0]]
- assert await store.get_schedules({'s2'}) == [schedules[1]]
- assert await store.get_schedules({'s3'}) == [schedules[2]]
+ assert await datastore.get_schedules() == schedules
+ assert await datastore.get_schedules({'s1', 's2', 's3'}) == schedules
+ assert await datastore.get_schedules({'s1'}) == [schedules[0]]
+ assert await datastore.get_schedules({'s2'}) == [schedules[1]]
+ assert await datastore.get_schedules({'s3'}) == [schedules[2]]
for event, schedule in zip(events, schedules):
assert event.schedule_id == schedule.id
assert event.next_fire_time == schedule.next_fire_time
- async def test_replace_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ async def test_replace_schedules(self, datastore: AsyncDataStore,
schedules: List[Schedule]) -> None:
- async with datastore_cm as store, capture_events(store, 1, {ScheduleUpdated}) as events:
+ async with datastore, capture_events(datastore, 1, {ScheduleUpdated}) as events:
for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
next_fire_time = schedules[2].trigger.next()
schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(),
kwargs={}, coalesce=CoalescePolicy.earliest,
misfire_grace_time=None, tags=frozenset())
schedule.next_fire_time = next_fire_time
- await store.add_schedule(schedule, ConflictPolicy.replace)
+ await datastore.add_schedule(schedule, ConflictPolicy.replace)
- schedules = await store.get_schedules({schedule.id})
+ schedules = await datastore.get_schedules({schedule.id})
assert schedules[0].task_id == 'foo'
assert schedules[0].next_fire_time == next_fire_time
assert schedules[0].args == ()
@@ -127,14 +237,14 @@ class TestAsyncStores:
assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
assert not events
- async def test_remove_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ async def test_remove_schedules(self, datastore: AsyncDataStore,
schedules: List[Schedule]) -> None:
- async with datastore_cm as store, capture_events(store, 2, {ScheduleRemoved}) as events:
+ async with datastore, capture_events(datastore, 2, {ScheduleRemoved}) as events:
for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
- await store.remove_schedules(['s1', 's2'])
- assert await store.get_schedules() == [schedules[2]]
+ await datastore.remove_schedules(['s1', 's2'])
+ assert await datastore.get_schedules() == [schedules[2]]
received_event = events.pop(0)
assert received_event.schedule_id == 's1'
@@ -146,24 +256,24 @@ class TestAsyncStores:
@pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
async def test_acquire_release_schedules(
- self, datastore_cm, schedules: List[Schedule]) -> None:
+ self, datastore: AsyncDataStore, schedules: List[Schedule]) -> None:
event_types = {ScheduleRemoved, ScheduleUpdated}
- async with datastore_cm as store, capture_events(store, 2, event_types) as events:
+ async with datastore, capture_events(datastore, 2, event_types) as events:
for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
+ await datastore.add_schedule(schedule, ConflictPolicy.exception)
# The first scheduler gets the first due schedule
- schedules1 = await store.acquire_schedules('dummy-id1', 1)
+ schedules1 = await datastore.acquire_schedules('dummy-id1', 1)
assert len(schedules1) == 1
assert schedules1[0].id == 's1'
# The second scheduler gets the second due schedule
- schedules2 = await store.acquire_schedules('dummy-id2', 1)
+ schedules2 = await datastore.acquire_schedules('dummy-id2', 1)
assert len(schedules2) == 1
assert schedules2[0].id == 's2'
# The third scheduler gets nothing
- schedules3 = await store.acquire_schedules('dummy-id3', 1)
+ schedules3 = await datastore.acquire_schedules('dummy-id3', 1)
assert not schedules3
# Update the schedules and check that the job store actually deletes the first
@@ -172,11 +282,11 @@ class TestAsyncStores:
schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
# Release all the schedules
- await store.release_schedules('dummy-id1', schedules1)
- await store.release_schedules('dummy-id2', schedules2)
+ await datastore.release_schedules('dummy-id1', schedules1)
+ await datastore.release_schedules('dummy-id2', schedules2)
# Check that the first schedule is gone
- schedules = await store.get_schedules()
+ schedules = await datastore.get_schedules()
assert len(schedules) == 2
assert schedules[0].id == 's2'
assert schedules[1].id == 's3'
@@ -194,192 +304,187 @@ class TestAsyncStores:
assert not events
async def test_acquire_schedules_lock_timeout(
- self, datastore_cm, schedules: List[Schedule], freezer) -> None:
+ self, datastore: AsyncDataStore, schedules: List[Schedule], freezer) -> None:
"""
Test that a scheduler can acquire schedules that were acquired by another scheduler but
not released within the lock timeout period.
"""
- async with datastore_cm as store:
- await store.add_schedule(schedules[0], ConflictPolicy.exception)
+ async with datastore:
+ await datastore.add_schedule(schedules[0], ConflictPolicy.exception)
# First, one scheduler acquires the first available schedule
- acquired1 = await store.acquire_schedules('dummy-id1', 1)
+ acquired1 = await datastore.acquire_schedules('dummy-id1', 1)
assert len(acquired1) == 1
assert acquired1[0].id == 's1'
# Try to acquire the schedule just at the threshold (now == acquired_until).
# This should not yield any schedules.
freezer.tick(30)
- acquired2 = await store.acquire_schedules('dummy-id2', 1)
+ acquired2 = await datastore.acquire_schedules('dummy-id2', 1)
assert not acquired2
# Right after that, the schedule should be available
freezer.tick(1)
- acquired3 = await store.acquire_schedules('dummy-id2', 1)
+ acquired3 = await datastore.acquire_schedules('dummy-id2', 1)
assert len(acquired3) == 1
assert acquired3[0].id == 's1'
- async def test_acquire_multiple_workers(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager))
+ async def test_acquire_multiple_workers(self, datastore: AsyncDataStore) -> None:
+ async with datastore:
+ await datastore.add_task(Task(id='task1', func=asynccontextmanager))
jobs = [Job(task_id='task1') for _ in range(2)]
for job in jobs:
- await store.add_job(job)
+ await datastore.add_job(job)
# The first worker gets the first job in the queue
- jobs1 = await store.acquire_jobs('worker1', 1)
+ jobs1 = await datastore.acquire_jobs('worker1', 1)
assert len(jobs1) == 1
assert jobs1[0].id == jobs[0].id
# The second worker gets the second job
- jobs2 = await store.acquire_jobs('worker2', 1)
+ jobs2 = await datastore.acquire_jobs('worker2', 1)
assert len(jobs2) == 1
assert jobs2[0].id == jobs[1].id
# The third worker gets nothing
- jobs3 = await store.acquire_jobs('worker3', 1)
+ jobs3 = await datastore.acquire_jobs('worker3', 1)
assert not jobs3
- async def test_job_release_success(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager))
+ async def test_job_release_success(self, datastore: AsyncDataStore) -> None:
+ async with datastore:
+ await datastore.add_task(Task(id='task1', func=asynccontextmanager))
job = Job(task_id='task1')
- await store.add_job(job)
+ await datastore.add_job(job)
- acquired = await store.acquire_jobs('worker_id', 2)
+ acquired = await datastore.acquire_jobs('worker_id', 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
- await store.release_job(
+ await datastore.release_job(
'worker_id', acquired[0].task_id,
JobResult(job_id=acquired[0].id, outcome=JobOutcome.success, return_value='foo'))
- result = await store.get_job_result(acquired[0].id)
+ result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.success
assert result.exception is None
assert result.return_value == 'foo'
# Check that the job and its result are gone
- assert not await store.get_jobs({acquired[0].id})
- assert not await store.get_job_result(acquired[0].id)
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- async def test_job_release_failure(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager))
+ async def test_job_release_failure(self, datastore: AsyncDataStore) -> None:
+ async with datastore:
+ await datastore.add_task(Task(id='task1', func=asynccontextmanager))
job = Job(task_id='task1')
- await store.add_job(job)
+ await datastore.add_job(job)
- acquired = await store.acquire_jobs('worker_id', 2)
+ acquired = await datastore.acquire_jobs('worker_id', 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
- await store.release_job(
+ await datastore.release_job(
'worker_id', acquired[0].task_id,
JobResult(job_id=acquired[0].id, outcome=JobOutcome.failure,
exception=ValueError('foo')))
- result = await store.get_job_result(acquired[0].id)
+ result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.failure
assert isinstance(result.exception, ValueError)
assert result.exception.args == ('foo',)
assert result.return_value is None
# Check that the job and its result are gone
- assert not await store.get_jobs({acquired[0].id})
- assert not await store.get_job_result(acquired[0].id)
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- async def test_job_release_missed_deadline(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]):
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager))
+ async def test_job_release_missed_deadline(self, datastore: AsyncDataStore):
+ async with datastore:
+ await datastore.add_task(Task(id='task1', func=asynccontextmanager))
job = Job(task_id='task1')
- await store.add_job(job)
+ await datastore.add_job(job)
- acquired = await store.acquire_jobs('worker_id', 2)
+ acquired = await datastore.acquire_jobs('worker_id', 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
- await store.release_job(
+ await datastore.release_job(
'worker_id', acquired[0].task_id,
JobResult(job_id=acquired[0].id, outcome=JobOutcome.missed_start_deadline))
- result = await store.get_job_result(acquired[0].id)
+ result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.missed_start_deadline
assert result.exception is None
assert result.return_value is None
# Check that the job and its result are gone
- assert not await store.get_jobs({acquired[0].id})
- assert not await store.get_job_result(acquired[0].id)
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- async def test_job_release_cancelled(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager))
+ async def test_job_release_cancelled(self, datastore: AsyncDataStore) -> None:
+ async with datastore:
+ await datastore.add_task(Task(id='task1', func=asynccontextmanager))
job = Job(task_id='task1')
- await store.add_job(job)
+ await datastore.add_job(job)
- acquired = await store.acquire_jobs('worker1', 2)
+ acquired = await datastore.acquire_jobs('worker1', 2)
assert len(acquired) == 1
assert acquired[0].id == job.id
- await store.release_job('worker1', acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled))
- result = await store.get_job_result(acquired[0].id)
+ await datastore.release_job(
+ 'worker1', acquired[0].task_id,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.cancelled))
+ result = await datastore.get_job_result(acquired[0].id)
assert result.outcome is JobOutcome.cancelled
assert result.exception is None
assert result.return_value is None
# Check that the job and its result are gone
- assert not await store.get_jobs({acquired[0].id})
- assert not await store.get_job_result(acquired[0].id)
+ assert not await datastore.get_jobs({acquired[0].id})
+ assert not await datastore.get_job_result(acquired[0].id)
- async def test_acquire_jobs_lock_timeout(
- self, datastore_cm: AsyncContextManager[AsyncDataStore],
- freezer: FrozenDateTimeFactory) -> None:
+ async def test_acquire_jobs_lock_timeout(self, datastore: AsyncDataStore,
+ freezer: FrozenDateTimeFactory) -> None:
"""
Test that a worker can acquire jobs that were acquired by another scheduler but not
released within the lock timeout period.
"""
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager))
+ async with datastore:
+ await datastore.add_task(Task(id='task1', func=asynccontextmanager))
job = Job(task_id='task1')
- await store.add_job(job)
+ await datastore.add_job(job)
# First, one worker acquires the first available job
- acquired = await store.acquire_jobs('worker1', 1)
+ acquired = await datastore.acquire_jobs('worker1', 1)
assert len(acquired) == 1
assert acquired[0].id == job.id
# Try to acquire the job just at the threshold (now == acquired_until).
# This should not yield any jobs.
freezer.tick(30)
- assert not await store.acquire_jobs('worker2', 1)
+ assert not await datastore.acquire_jobs('worker2', 1)
# Right after that, the job should be available
freezer.tick(1)
- acquired = await store.acquire_jobs('worker2', 1)
+ acquired = await datastore.acquire_jobs('worker2', 1)
assert len(acquired) == 1
assert acquired[0].id == job.id
- async def test_acquire_jobs_max_number_exceeded(
- self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
- async with datastore_cm as store:
- await store.add_task(Task(id='task1', func=asynccontextmanager, max_running_jobs=2))
+ async def test_acquire_jobs_max_number_exceeded(self, datastore: AsyncDataStore) -> None:
+ async with datastore:
+ await datastore.add_task(
+ Task(id='task1', func=asynccontextmanager, max_running_jobs=2))
jobs = [Job(task_id='task1'), Job(task_id='task1'), Job(task_id='task1')]
for job in jobs:
- await store.add_job(job)
+ await datastore.add_job(job)
# Check that only 2 jobs are returned from acquire_jobs() even though the limit wqas 3
- acquired_jobs = await store.acquire_jobs('worker1', 3)
+ acquired_jobs = await datastore.acquire_jobs('worker1', 3)
assert [job.id for job in acquired_jobs] == [job.id for job in jobs[:2]]
# Release one job, and the worker should be able to acquire the third job
- await store.release_job(
+ await datastore.release_job(
'worker1', acquired_jobs[0].task_id,
JobResult(job_id=acquired_jobs[0].id, outcome=JobOutcome.success,
return_value=None))
- acquired_jobs = await store.acquire_jobs('worker1', 3)
+ acquired_jobs = await datastore.acquire_jobs('worker1', 3)
assert [job.id for job in acquired_jobs] == [jobs[2].id]