summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-08-29 01:02:10 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-08-29 01:38:09 +0300
commitb4d4724e95583b9f075a814319c3d5e8e5514a3e (patch)
treedd77fb25ded2ceb5a4f29221de69f19f469cfac0 /tests
parentcf77aec5326e42af7b89e4ab2712daf9694ebad9 (diff)
downloadapscheduler-b4d4724e95583b9f075a814319c3d5e8e5514a3e.tar.gz
Overhauled the data store and event dispatch systems
Diffstat (limited to 'tests')
-rw-r--r--tests/conftest.py200
-rw-r--r--tests/test_datastores.py412
-rw-r--r--tests/test_events.py134
-rw-r--r--tests/test_schedulers.py123
-rw-r--r--tests/test_workers.py282
5 files changed, 729 insertions, 422 deletions
diff --git a/tests/conftest.py b/tests/conftest.py
index bf6b975..c242b8d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,10 +1,11 @@
import sys
-from contextlib import AsyncExitStack, ExitStack
-from functools import partial
+from contextlib import asynccontextmanager, contextmanager
+from typing import AsyncContextManager, AsyncGenerator, ContextManager, Generator, Optional
import pytest
-from anyio import start_blocking_portal
-from apscheduler.datastores.memory import MemoryDataStore
+from apscheduler.abc import AsyncDataStore, DataStore, Serializer
+from apscheduler.adapters import AsyncDataStoreAdapter
+from apscheduler.datastores.sync.memory import MemoryDataStore
from apscheduler.serializers.cbor import CBORSerializer
from apscheduler.serializers.json import JSONSerializer
from apscheduler.serializers.pickle import PickleSerializer
@@ -14,94 +15,135 @@ if sys.version_info >= (3, 9):
else:
from backports.zoneinfo import ZoneInfo
-try:
- from apscheduler.datastores.mongodb import MongoDBDataStore
- from motor.motor_asyncio import AsyncIOMotorClient
-except ImportError:
- MongoDBDataStore = None
-
-try:
- from apscheduler.datastores.postgresql import PostgresqlDataStore
- from asyncpg import create_pool
-except ImportError:
- PostgresqlDataStore = None
-
-store_params = [
- pytest.param(MemoryDataStore, id='memory'),
- pytest.param(PostgresqlDataStore, id='postgresql'),
- pytest.param(MongoDBDataStore, id='mongodb')
-]
-
@pytest.fixture(scope='session')
-def timezone():
+def timezone() -> ZoneInfo:
return ZoneInfo('Europe/Berlin')
-@pytest.fixture(params=[None, PickleSerializer, CBORSerializer, JSONSerializer],
- ids=['none', 'pickle', 'cbor', 'json'])
-def serializer(request):
+@pytest.fixture(params=[
+ pytest.param(None, id='none'),
+ pytest.param(PickleSerializer, id='pickle'),
+ pytest.param(CBORSerializer, id='cbor'),
+ pytest.param(JSONSerializer, id='json')
+])
+def serializer(request) -> Optional[Serializer]:
return request.param() if request.param else None
@pytest.fixture
-def anyio_backend():
+def anyio_backend() -> 'str':
return 'asyncio'
-@pytest.fixture(params=store_params)
-async def store(request):
- async with AsyncExitStack() as stack:
- if request.param is PostgresqlDataStore:
- if PostgresqlDataStore is None:
- pytest.skip('asyncpg not installed')
-
- pool = await create_pool('postgresql://postgres:secret@localhost/testdb',
- min_size=1, max_size=2)
- await stack.enter_async_context(pool)
- store = PostgresqlDataStore(pool, start_from_scratch=True)
- elif request.param is MongoDBDataStore:
- if MongoDBDataStore is None:
- pytest.skip('motor not installed')
-
- client = AsyncIOMotorClient(tz_aware=True)
- stack.push(lambda *args: client.close())
- store = MongoDBDataStore(client, start_from_scratch=True)
- else:
- store = MemoryDataStore()
-
- await stack.enter_async_context(store)
+@contextmanager
+def setup_mongodb_store() -> Generator[DataStore, None, None]:
+ from apscheduler.datastores.sync.mongodb import MongoDBDataStore
+ from pymongo import MongoClient
+ from pymongo.errors import ConnectionFailure
+
+ client = MongoClient(tz_aware=True, serverSelectionTimeoutMS=1000)
+ try:
+ client.admin.command('ismaster')
+ except ConnectionFailure:
+ pytest.skip('MongoDB server not available')
+ raise
+
+ store = MongoDBDataStore(client, start_from_scratch=True)
+ with client, store:
yield store
-@pytest.fixture
-def portal():
- with start_blocking_portal() as portal:
- yield portal
-
-
-@pytest.fixture(params=store_params)
-def sync_store(request, portal):
- with ExitStack() as stack:
- if request.param is PostgresqlDataStore:
- if PostgresqlDataStore is None:
- pytest.skip('asyncpg not installed')
-
- pool = portal.call(
- partial(create_pool, 'postgresql://postgres:secret@localhost/testdb',
- min_size=1, max_size=2)
- )
- stack.enter_context(portal.wrap_async_context_manager(pool))
- store = PostgresqlDataStore(pool, start_from_scratch=True)
- elif request.param is MongoDBDataStore:
- if MongoDBDataStore is None:
- pytest.skip('motor not installed')
-
- client = portal.call(partial(AsyncIOMotorClient, tz_aware=True))
- stack.push(lambda *args: portal.call(client.close))
- store = MongoDBDataStore(client, start_from_scratch=True)
- else:
- store = MemoryDataStore()
-
- stack.enter_context(portal.wrap_async_context_manager(store))
+@contextmanager
+def setup_memory_store() -> Generator[DataStore, None, None]:
+ with MemoryDataStore() as store:
yield store
+
+
+@asynccontextmanager
+async def setup_postgresql_store() -> AsyncGenerator[AsyncDataStore, None]:
+ try:
+ from apscheduler.datastores.async_.postgresql import PostgresqlDataStore
+ from asyncpg import create_pool
+ except ModuleNotFoundError:
+ pytest.skip('asyncpg not installed')
+ raise
+
+ pool = await create_pool('postgresql://postgres:secret@localhost/testdb',
+ min_size=1, max_size=2)
+ store = PostgresqlDataStore(pool, start_from_scratch=True)
+ async with pool, store:
+ yield store
+
+
+@contextmanager
+def setup_sqlalchemy_store() -> Generator[DataStore, None, None]:
+ try:
+ from apscheduler.datastores.sync.sqlalchemy import SQLAlchemyDataStore
+ from sqlalchemy import create_engine
+ except ModuleNotFoundError:
+ pytest.skip('sqlalchemy not installed')
+ raise
+
+ engine = create_engine('postgresql+psycopg2://postgres:secret@localhost/testdb', future=True)
+ store = SQLAlchemyDataStore(engine, start_from_scratch=True)
+ try:
+ with store:
+ yield store
+ finally:
+ engine.dispose()
+
+
+@asynccontextmanager
+async def setup_async_sqlalchemy_store() -> AsyncGenerator[AsyncDataStore, None]:
+ try:
+ from apscheduler.datastores.async_.sqlalchemy import SQLAlchemyDataStore
+ from sqlalchemy.ext.asyncio import create_async_engine
+ except ModuleNotFoundError:
+ pytest.skip('sqlalchemy not installed')
+ raise
+
+ engine = create_async_engine('postgresql+asyncpg://postgres:secret@localhost/testdb',
+ future=True)
+ store = SQLAlchemyDataStore(engine, start_from_scratch=True)
+ try:
+ async with store:
+ yield store
+ finally:
+ await engine.dispose()
+
+
+@pytest.fixture(params=[
+ pytest.param(setup_memory_store, id='memory'),
+ pytest.param(setup_mongodb_store, id='mongodb')
+])
+def setup_sync_store(request) -> ContextManager[DataStore]:
+ return request.param
+
+
+@pytest.fixture(params=[
+ pytest.param(setup_postgresql_store, id='postgresql'),
+ pytest.param(setup_async_sqlalchemy_store, id='async_sqlalchemy')
+])
+def setup_async_store(request) -> AsyncContextManager[AsyncDataStore]:
+ return request.param
+
+
+@pytest.fixture(params=[
+ pytest.param(setup_memory_store, id='memory'),
+ pytest.param(setup_mongodb_store, id='mongodb'),
+ pytest.param(setup_postgresql_store, id='postgresql'),
+ pytest.param(setup_async_sqlalchemy_store, id='async_sqlalchemy')
+])
+def datastore_cm(request):
+ cm = request.param()
+ if isinstance(cm, AsyncContextManager):
+ return cm
+
+ @asynccontextmanager
+ async def wrapper():
+ with cm as store:
+ async with AsyncDataStoreAdapter(store) as adapter:
+ yield adapter
+
+ return wrapper()
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 0e4fd68..0f10c61 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -1,27 +1,18 @@
+from __future__ import annotations
+
from datetime import datetime, timezone
+from typing import AsyncContextManager, List
import pytest
-from anyio import move_on_after
-from apscheduler.abc import Job, Schedule
+from apscheduler.abc import AsyncDataStore, Job, Schedule
from apscheduler.events import ScheduleAdded, ScheduleRemoved, ScheduleUpdated
from apscheduler.policies import CoalescePolicy, ConflictPolicy
from apscheduler.triggers.date import DateTrigger
-
-pytestmark = [
- pytest.mark.anyio,
- pytest.mark.parametrize('anyio_backend', ['asyncio'])
-]
-
-
-@pytest.fixture
-async def events(store):
- events = []
- store.subscribe(events.append)
- return events
+from freezegun.api import FrozenDateTimeFactory
@pytest.fixture
-def schedules():
+def schedules() -> List[Schedule]:
trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
schedule1 = Schedule(id='s1', task_id='bogus', trigger=trigger, args=(), kwargs={},
coalesce=CoalescePolicy.latest, misfire_grace_time=None, tags=frozenset())
@@ -39,7 +30,7 @@ def schedules():
@pytest.fixture
-def jobs():
+def jobs() -> List[Job]:
job1 = Job('task1', print, ('hello',), {'arg2': 'world'}, 'schedule1',
datetime(2020, 10, 10, tzinfo=timezone.utc),
datetime(2020, 10, 10, 1, tzinfo=timezone.utc), frozenset())
@@ -48,197 +39,198 @@ def jobs():
return [job1, job2]
-async def test_add_schedules(store, schedules, events):
- for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
-
- assert await store.get_schedules() == schedules
- assert await store.get_schedules({'s1', 's2', 's3'}) == schedules
- assert await store.get_schedules({'s1'}) == [schedules[0]]
- assert await store.get_schedules({'s2'}) == [schedules[1]]
- assert await store.get_schedules({'s3'}) == [schedules[2]]
-
- assert len(events) == 3
- for event, schedule in zip(events, schedules):
- assert isinstance(event, ScheduleAdded)
- assert event.schedule_id == schedule.id
- assert event.next_fire_time == schedule.next_fire_time
-
-
-async def test_replace_schedules(store, schedules, events):
- for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
-
- events.clear()
- next_fire_time = schedules[2].trigger.next()
- schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(),
- kwargs={}, coalesce=CoalescePolicy.earliest, misfire_grace_time=None,
- tags=frozenset())
- schedule.next_fire_time = next_fire_time
- await store.add_schedule(schedule, ConflictPolicy.replace)
-
- schedules = await store.get_schedules({schedule.id})
- assert schedules[0].task_id == 'foo'
- assert schedules[0].next_fire_time == next_fire_time
- assert schedules[0].args == ()
- assert schedules[0].kwargs == {}
- assert schedules[0].coalesce is CoalescePolicy.earliest
- assert schedules[0].misfire_grace_time is None
- assert schedules[0].tags == frozenset()
-
- assert len(events) == 1
- assert isinstance(events[0], ScheduleUpdated)
- assert events[0].schedule_id == 's3'
- assert events[0].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
-
-
-async def test_remove_schedules(store, schedules, events):
- for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
-
- events.clear()
- await store.remove_schedules(['s1', 's2'])
-
- assert len(events) == 2
- assert isinstance(events[0], ScheduleRemoved)
- assert events[0].schedule_id == 's1'
- assert isinstance(events[1], ScheduleRemoved)
- assert events[1].schedule_id == 's2'
-
- assert await store.get_schedules() == [schedules[2]]
-
-
-@pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
-async def test_acquire_release_schedules(store, schedules, events):
- for schedule in schedules:
- await store.add_schedule(schedule, ConflictPolicy.exception)
-
- events.clear()
-
- # The first scheduler gets the first due schedule
- schedules1 = await store.acquire_schedules('dummy-id1', 1)
- assert len(schedules1) == 1
- assert schedules1[0].id == 's1'
-
- # The second scheduler gets the second due schedule
- schedules2 = await store.acquire_schedules('dummy-id2', 1)
- assert len(schedules2) == 1
- assert schedules2[0].id == 's2'
-
- # The third scheduler gets nothing
- async with move_on_after(0.2):
- await store.acquire_schedules('dummy-id3', 1)
- pytest.fail('The call should have timed out')
-
- # The schedules here have run their course, and releasing them should delete them
- schedules1[0].next_fire_time = None
- schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
- await store.release_schedules('dummy-id1', schedules1)
- await store.release_schedules('dummy-id2', schedules2)
-
- # Check that the first schedule is gone
- schedules = await store.get_schedules()
- assert len(schedules) == 2
- assert schedules[0].id == 's2'
- assert schedules[1].id == 's3'
-
- # Check for the appropriate update and delete events
- assert len(events) == 2
- assert isinstance(events[0], ScheduleRemoved)
- assert isinstance(events[1], ScheduleUpdated)
- assert events[0].schedule_id == 's1'
- assert events[1].schedule_id == 's2'
- assert events[1].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
-
-
-async def test_acquire_schedules_lock_timeout(store, schedules, events, freezer):
- """
- Test that a scheduler can acquire schedules that were acquired by another scheduler but not
- released within the lock timeout period.
-
- """
- # First, one scheduler acquires the first available schedule
- await store.add_schedule(schedules[0], ConflictPolicy.exception)
- acquired = await store.acquire_schedules('dummy-id1', 1)
- assert len(acquired) == 1
- assert acquired[0].id == 's1'
-
- # Try to acquire the schedule just at the threshold (now == acquired_until).
- # This should not yield any schedules.
- freezer.tick(30)
- async with move_on_after(0.2):
- await store.acquire_schedules('dummy-id2', 1)
- pytest.fail('The call should have timed out')
-
- # Right after that, the schedule should be available
- freezer.tick(1)
- acquired = await store.acquire_schedules('dummy-id2', 1)
- assert len(acquired) == 1
- assert acquired[0].id == 's1'
-
-
-async def test_acquire_release_jobs(store, jobs, events):
- for job in jobs:
- await store.add_job(job)
-
- events.clear()
-
- # The first worker gets the first job in the queue
- jobs1 = await store.acquire_jobs('dummy-id1', 1)
- assert len(jobs1) == 1
- assert jobs1[0].id == jobs[0].id
-
- # The second worker gets the second job
- jobs2 = await store.acquire_jobs('dummy-id2', 1)
- assert len(jobs2) == 1
- assert jobs2[0].id == jobs[1].id
-
- # The third worker gets nothing
- async with move_on_after(0.2):
- await store.acquire_jobs('dummy-id3', 1)
- pytest.fail('The call should have timed out')
-
- # All the jobs should still be returned
- visible_jobs = await store.get_jobs()
- assert len(visible_jobs) == 2
-
- await store.release_jobs('dummy-id1', jobs1)
- await store.release_jobs('dummy-id2', jobs2)
-
- # All the jobs should be gone
- visible_jobs = await store.get_jobs()
- assert len(visible_jobs) == 0
-
- # Check for the appropriate update and delete events
- # assert len(events) == 2
- # assert isinstance(events[0], Job)
- # assert isinstance(events[1], SchedulesUpdated)
- # assert events[0].schedule_ids == {'s1'}
- # assert events[1].schedule_ids == {'s2'}
- # assert events[1].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
-
-
-async def test_acquire_jobs_lock_timeout(store, jobs, events, freezer):
- """
- Test that a worker can acquire jobs that were acquired by another scheduler but not
- released within the lock timeout period.
-
- """
- # First, one worker acquires the first available job
- await store.add_job(jobs[0])
- acquired = await store.acquire_jobs('dummy-id1', 1)
- assert len(acquired) == 1
- assert acquired[0].id == jobs[0].id
-
- # Try to acquire the job just at the threshold (now == acquired_until).
- # This should not yield any jobs.
- freezer.tick(30)
- async with move_on_after(0.2):
- await store.acquire_jobs('dummy-id2', 1)
- pytest.fail('The call should have timed out')
-
- # Right after that, the job should be available
- freezer.tick(1)
- acquired = await store.acquire_jobs('dummy-id2', 1)
- assert len(acquired) == 1
- assert acquired[0].id == jobs[0].id
+@pytest.mark.anyio
+class TestAsyncStores:
+ async def test_add_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ schedules: List[Schedule]) -> None:
+ events = []
+ async with datastore_cm as store:
+ store.subscribe(events.append, [ScheduleAdded])
+ for schedule in schedules:
+ await store.add_schedule(schedule, ConflictPolicy.exception)
+
+ assert await store.get_schedules() == schedules
+ assert await store.get_schedules({'s1', 's2', 's3'}) == schedules
+ assert await store.get_schedules({'s1'}) == [schedules[0]]
+ assert await store.get_schedules({'s2'}) == [schedules[1]]
+ assert await store.get_schedules({'s3'}) == [schedules[2]]
+
+ assert len(events) == 3
+ add_events = [event for event in events if isinstance(event, ScheduleAdded)]
+ for event, schedule in zip(add_events, schedules):
+ assert event.schedule_id == schedule.id
+ assert event.next_fire_time == schedule.next_fire_time
+
+ async def test_replace_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ schedules: List[Schedule]) -> None:
+ async with datastore_cm as store:
+ for schedule in schedules:
+ await store.add_schedule(schedule, ConflictPolicy.exception)
+
+ events = []
+ store.subscribe(events.append)
+ next_fire_time = schedules[2].trigger.next()
+ schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(),
+ kwargs={}, coalesce=CoalescePolicy.earliest,
+ misfire_grace_time=None, tags=frozenset())
+ schedule.next_fire_time = next_fire_time
+ await store.add_schedule(schedule, ConflictPolicy.replace)
+
+ schedules = await store.get_schedules({schedule.id})
+ assert schedules[0].task_id == 'foo'
+ assert schedules[0].next_fire_time == next_fire_time
+ assert schedules[0].args == ()
+ assert schedules[0].kwargs == {}
+ assert schedules[0].coalesce is CoalescePolicy.earliest
+ assert schedules[0].misfire_grace_time is None
+ assert schedules[0].tags == frozenset()
+
+ assert len(events) == 1
+ assert isinstance(events[0], ScheduleUpdated)
+ assert events[0].schedule_id == 's3'
+ assert events[0].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+ async def test_remove_schedules(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ schedules: List[Schedule]) -> None:
+ events = []
+ async with datastore_cm as store:
+ for schedule in schedules:
+ await store.add_schedule(schedule, ConflictPolicy.exception)
+
+ store.subscribe(events.append)
+ await store.remove_schedules(['s1', 's2'])
+ assert await store.get_schedules() == [schedules[2]]
+
+ assert len(events) == 2
+ assert isinstance(events[0], ScheduleRemoved)
+ assert events[0].schedule_id == 's1'
+ assert isinstance(events[1], ScheduleRemoved)
+ assert events[1].schedule_id == 's2'
+
+ @pytest.mark.freeze_time(datetime(2020, 9, 14, tzinfo=timezone.utc))
+ async def test_acquire_release_schedules(
+ self, datastore_cm, schedules: List[Schedule]) -> None:
+ events = []
+ async with datastore_cm as store:
+ for schedule in schedules:
+ await store.add_schedule(schedule, ConflictPolicy.exception)
+
+ # The first scheduler gets the first due schedule
+ store.subscribe(events.append)
+ schedules1 = await store.acquire_schedules('dummy-id1', 1)
+ assert len(schedules1) == 1
+ assert schedules1[0].id == 's1'
+
+ # The second scheduler gets the second due schedule
+ schedules2 = await store.acquire_schedules('dummy-id2', 1)
+ assert len(schedules2) == 1
+ assert schedules2[0].id == 's2'
+
+ # The third scheduler gets nothing
+ assert not await store.acquire_schedules('dummy-id3', 1)
+
+ # The schedules here have run their course, and releasing them should delete them
+ schedules1[0].next_fire_time = None
+ schedules2[0].next_fire_time = datetime(2020, 9, 15, tzinfo=timezone.utc)
+ await store.release_schedules('dummy-id1', schedules1)
+ await store.release_schedules('dummy-id2', schedules2)
+
+ # Check that the first schedule is gone
+ schedules = await store.get_schedules()
+ assert len(schedules) == 2
+ assert schedules[0].id == 's2'
+ assert schedules[1].id == 's3'
+
+ # Check for the appropriate update and delete events
+ assert len(events) == 2
+ assert isinstance(events[0], ScheduleRemoved)
+ assert isinstance(events[1], ScheduleUpdated)
+ assert events[0].schedule_id == 's1'
+ assert events[1].schedule_id == 's2'
+ assert events[1].next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+ async def test_acquire_schedules_lock_timeout(
+ self, datastore_cm, schedules: List[Schedule], freezer) -> None:
+ """
+ Test that a scheduler can acquire schedules that were acquired by another scheduler but not
+ released within the lock timeout period.
+
+ """
+ async with datastore_cm as store:
+ # First, one scheduler acquires the first available schedule
+ await store.add_schedule(schedules[0], ConflictPolicy.exception)
+ acquired = await store.acquire_schedules('dummy-id1', 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == 's1'
+
+ # Try to acquire the schedule just at the threshold (now == acquired_until).
+ # This should not yield any schedules.
+ freezer.tick(30)
+ assert not await store.acquire_schedules('dummy-id2', 1)
+
+ # Right after that, the schedule should be available
+ freezer.tick(1)
+ acquired = await store.acquire_schedules('dummy-id2', 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == 's1'
+
+ async def test_acquire_release_jobs(self, datastore_cm: AsyncContextManager[AsyncDataStore],
+ jobs: List[Job]) -> None:
+ events = []
+ async with datastore_cm as store:
+ for job in jobs:
+ await store.add_job(job)
+
+ # The first worker gets the first job in the queue
+ store.subscribe(events.append)
+ jobs1 = await store.acquire_jobs('dummy-id1', 1)
+ assert len(jobs1) == 1
+ assert jobs1[0].id == jobs[0].id
+
+ # The second worker gets the second job
+ jobs2 = await store.acquire_jobs('dummy-id2', 1)
+ assert len(jobs2) == 1
+ assert jobs2[0].id == jobs[1].id
+
+ # The third worker gets nothing
+ assert not await store.acquire_jobs('dummy-id3', 1)
+
+ # All the jobs should still be returned
+ visible_jobs = await store.get_jobs()
+ assert len(visible_jobs) == 2
+
+ await store.release_jobs('dummy-id1', jobs1)
+ await store.release_jobs('dummy-id2', jobs2)
+
+ # All the jobs should be gone
+ visible_jobs = await store.get_jobs()
+ assert len(visible_jobs) == 0
+
+ # Check for the appropriate events
+ assert not events
+
+ async def test_acquire_jobs_lock_timeout(
+ self, datastore_cm: AsyncContextManager[AsyncDataStore], jobs: List[Job],
+ freezer: FrozenDateTimeFactory) -> None:
+ """
+ Test that a worker can acquire jobs that were acquired by another scheduler but not
+ released within the lock timeout period.
+
+ """
+ async with datastore_cm as store:
+ # First, one worker acquires the first available job
+ await store.add_job(jobs[0])
+ acquired = await store.acquire_jobs('dummy-id1', 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == jobs[0].id
+
+ # Try to acquire the job just at the threshold (now == acquired_until).
+ # This should not yield any jobs.
+ freezer.tick(30)
+ assert not await store.acquire_jobs('dummy-id2', 1)
+
+ # Right after that, the job should be available
+ freezer.tick(1)
+ acquired = await store.acquire_jobs('dummy-id2', 1)
+ assert len(acquired) == 1
+ assert acquired[0].id == jobs[0].id
diff --git a/tests/test_events.py b/tests/test_events.py
new file mode 100644
index 0000000..3c9ebd6
--- /dev/null
+++ b/tests/test_events.py
@@ -0,0 +1,134 @@
+from datetime import datetime, timezone
+from functools import partial
+from operator import setitem
+from typing import List, Optional
+
+import pytest
+from _pytest.logging import LogCaptureFixture
+from apscheduler.events import AsyncEventHub, Event, EventHub
+
+
+class TestEventHub:
+ def test_publish(self) -> None:
+ timestamp = datetime.now(timezone.utc)
+ events: List[Optional[Event]] = [None, None]
+ with EventHub() as eventhub:
+ eventhub.subscribe(partial(setitem, events, 0))
+ eventhub.subscribe(partial(setitem, events, 1))
+ eventhub.publish(Event(timestamp=timestamp))
+
+ assert events[0] is events[1]
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+
+ def test_unsubscribe(self) -> None:
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ with EventHub() as eventhub:
+ token = eventhub.subscribe(events.append)
+ eventhub.publish(Event(timestamp=timestamp))
+ eventhub.unsubscribe(token)
+ eventhub.publish(Event(timestamp=timestamp))
+
+ assert len(events) == 1
+
+ def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
+ with EventHub() as eventhub:
+ eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
+
+ assert not caplog.text
+
+ def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception('foo')
+
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ with EventHub() as eventhub:
+ eventhub.subscribe(bad_subscriber)
+ eventhub.subscribe(events.append)
+ eventhub.publish(Event(timestamp=timestamp))
+
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text
+
+ def test_subscribe_coroutine_callback(self) -> None:
+ async def callback(event: Event) -> None:
+ pass
+
+ with EventHub() as eventhub:
+ with pytest.raises(ValueError, match='Coroutine functions are not supported'):
+ eventhub.subscribe(callback)
+
+ def test_relay_events(self) -> None:
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ with EventHub() as eventhub1, EventHub() as eventhub2:
+ eventhub2.relay_events_from(eventhub1)
+ eventhub2.subscribe(events.append)
+ eventhub1.publish(Event(timestamp=timestamp))
+
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+
+
+@pytest.mark.anyio
+class TestAsyncEventHub:
+ async def test_publish(self) -> None:
+ async def async_setitem(event: Event) -> None:
+ events[1] = event
+
+ timestamp = datetime.now(timezone.utc)
+ events: List[Optional[Event]] = [None, None]
+ async with AsyncEventHub() as eventhub:
+ eventhub.subscribe(partial(setitem, events, 0))
+ eventhub.subscribe(async_setitem)
+ eventhub.publish(Event(timestamp=timestamp))
+
+ assert events[0] is events[1]
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+
+ async def test_unsubscribe(self) -> None:
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ async with AsyncEventHub() as eventhub:
+ token = eventhub.subscribe(events.append)
+ eventhub.publish(Event(timestamp=timestamp))
+ eventhub.unsubscribe(token)
+ eventhub.publish(Event(timestamp=timestamp))
+
+ assert len(events) == 1
+
+ async def test_publish_no_subscribers(self, caplog: LogCaptureFixture) -> None:
+ async with AsyncEventHub() as eventhub:
+ eventhub.publish(Event(timestamp=datetime.now(timezone.utc)))
+
+ assert not caplog.text
+
+ async def test_publish_exception(self, caplog: LogCaptureFixture) -> None:
+ def bad_subscriber(event: Event) -> None:
+ raise Exception('foo')
+
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ async with AsyncEventHub() as eventhub:
+ eventhub.subscribe(bad_subscriber)
+ eventhub.subscribe(events.append)
+ eventhub.publish(Event(timestamp=timestamp))
+
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
+ assert 'Error delivering Event' in caplog.text
+
+ async def test_relay_events(self) -> None:
+ timestamp = datetime.now(timezone.utc)
+ events = []
+ async with AsyncEventHub() as eventhub1, AsyncEventHub() as eventhub2:
+ eventhub1.relay_events_from(eventhub2)
+ eventhub1.subscribe(events.append)
+ eventhub2.publish(Event(timestamp=timestamp))
+
+ assert isinstance(events[0], Event)
+ assert events[0].timestamp == timestamp
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index 3e125c0..5829f30 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -1,10 +1,14 @@
-import logging
+import threading
from datetime import datetime, timezone
+from typing import List
+import anyio
import pytest
-from apscheduler.events import JobSuccessful
+from anyio import fail_after
+from apscheduler.events import (
+ Event, JobAdded, ScheduleAdded, ScheduleRemoved, SchedulerStarted, SchedulerStopped)
from apscheduler.schedulers.async_ import AsyncScheduler
-from apscheduler.schedulers.sync import SyncScheduler
+from apscheduler.schedulers.sync import Scheduler
from apscheduler.triggers.date import DateTrigger
pytestmark = pytest.mark.anyio
@@ -19,40 +23,91 @@ def dummy_sync_job():
class TestAsyncScheduler:
- async def test_schedule_job(self, caplog, store):
- async def listener(event):
- events.append(event)
- if isinstance(event, JobSuccessful):
- await scheduler.stop()
+ async def test_schedule_job(self) -> None:
+ def listener(received_event: Event) -> None:
+ received_events.append(received_event)
+ if len(received_events) == 4:
+ event.set()
- caplog.set_level(logging.DEBUG)
+ received_events: List[Event] = []
+ event = anyio.Event()
+ scheduler = AsyncScheduler(start_worker=False)
+ scheduler.subscribe(listener)
trigger = DateTrigger(datetime.now(timezone.utc))
- events = []
- async with AsyncScheduler(store) as scheduler:
- scheduler.worker.subscribe(listener)
- await scheduler.add_schedule(dummy_async_job, trigger)
- await scheduler.wait_until_stopped()
+ async with scheduler:
+ await scheduler.add_schedule(dummy_async_job, trigger, id='foo')
+ with fail_after(3):
+ await event.wait()
- assert len(events) == 2
- assert isinstance(events[1], JobSuccessful)
- assert events[1].return_value == 'returnvalue'
+ # The scheduler was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStarted)
+
+ # Then a schedule was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == 'foo'
+ # assert received_event.task_id == 'task_id'
+
+ # Then that schedule was processed and a job was added for it
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.schedule_id == 'foo'
+ assert received_event.task_id == 'test_schedulers:dummy_async_job'
+
+ # Then the schedule was removed since the trigger had been exhausted
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == 'foo'
+
+ # Finally, the scheduler was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStopped)
+
+ # There should be no more events on the list
+ assert not received_events
class TestSyncScheduler:
- @pytest.mark.parametrize('anyio_backend', ['asyncio'])
- def test_schedule_job(self, caplog, anyio_backend, sync_store, portal):
- def listener(event):
- events.append(event)
- if isinstance(event, JobSuccessful):
- scheduler.stop()
-
- caplog.set_level(logging.DEBUG)
- events = []
- with SyncScheduler(sync_store, portal=portal) as scheduler:
- scheduler.worker.subscribe(listener)
- scheduler.add_schedule(dummy_sync_job, DateTrigger(datetime.now(timezone.utc)))
- scheduler.wait_until_stopped()
-
- assert len(events) == 2
- assert isinstance(events[1], JobSuccessful)
- assert events[1].return_value == 'returnvalue'
+ def test_schedule_job(self):
+ def listener(received_event: Event) -> None:
+ received_events.append(received_event)
+ if len(received_events) == 4:
+ event.set()
+
+ received_events: List[Event] = []
+ event = threading.Event()
+ scheduler = Scheduler(start_worker=False)
+ scheduler.subscribe(listener)
+ trigger = DateTrigger(datetime.now(timezone.utc))
+ with scheduler:
+ scheduler.add_schedule(dummy_sync_job, trigger, id='foo')
+ event.wait(3)
+
+ # The scheduler was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStarted)
+
+ # Then a schedule was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleAdded)
+ assert received_event.schedule_id == 'foo'
+ # assert received_event.task_id == 'task_id'
+
+ # Then that schedule was processed and a job was added for it
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.schedule_id == 'foo'
+ assert received_event.task_id == 'test_schedulers:dummy_sync_job'
+
+ # Then the schedule was removed since the trigger had been exhausted
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, ScheduleRemoved)
+ assert received_event.schedule_id == 'foo'
+
+ # Finally, the scheduler was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, SchedulerStopped)
+
+ # There should be no more events on the list
+ assert not received_events
diff --git a/tests/test_workers.py b/tests/test_workers.py
index 18d24f6..9f098bc 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -1,12 +1,17 @@
import threading
-from datetime import datetime
+from datetime import datetime, timezone
+from typing import Callable, List
+import anyio
import pytest
-from anyio import Event, fail_after
+from anyio import fail_after
from apscheduler.abc import Job
-from apscheduler.events import JobDeadlineMissed, JobFailed, JobSuccessful, JobUpdated
+from apscheduler.datastores.sync.memory import MemoryDataStore
+from apscheduler.events import (
+ Event, JobAdded, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, WorkerStarted,
+ WorkerStopped)
from apscheduler.workers.async_ import AsyncWorker
-from apscheduler.workers.sync import SyncWorker
+from apscheduler.workers.sync import Worker
pytestmark = pytest.mark.anyio
@@ -32,120 +37,199 @@ def fail_func():
class TestAsyncWorker:
@pytest.mark.parametrize('target_func', [sync_func, async_func], ids=['sync', 'async'])
@pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail'])
- @pytest.mark.parametrize('anyio_backend', ['asyncio'])
- async def test_run_job_nonscheduled_success(self, target_func, fail, store):
- async def listener(worker_event):
- worker_events.append(worker_event)
- if len(worker_events) == 2:
+ async def test_run_job_nonscheduled_success(self, target_func: Callable, fail: bool) -> None:
+ def listener(received_event: Event):
+ received_events.append(received_event)
+ if len(received_events) == 4:
event.set()
- worker_events = []
- event = Event()
- job = Job('task_id', func=target_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
- async with AsyncWorker(store) as worker:
- worker.subscribe(listener)
- await store.add_job(job)
- with fail_after(2):
+ received_events: List[Event] = []
+ event = anyio.Event()
+ data_store = MemoryDataStore()
+ worker = AsyncWorker(data_store)
+ worker.subscribe(listener)
+ async with worker:
+ job = Job('task_id', func=target_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
+ await worker.data_store.add_job(job)
+ with fail_after(3):
await event.wait()
- assert len(worker_events) == 2
-
- assert isinstance(worker_events[0], JobUpdated)
- assert worker_events[0].job_id == job.id
- assert worker_events[0].task_id == 'task_id'
- assert worker_events[0].schedule_id is None
-
- assert worker_events[1].job_id == job.id
- assert worker_events[1].task_id == 'task_id'
- assert worker_events[1].schedule_id is None
+ # The worker was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStarted)
+
+ # Then a job was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id is None
+
+ # Then the job was started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobStarted)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id is None
+
+ received_event = received_events.pop(0)
if fail:
- assert isinstance(worker_events[1], JobFailed)
- assert type(worker_events[1].exception) is Exception
- assert isinstance(worker_events[1].traceback, str)
+ # Then the job failed
+ assert isinstance(received_event, JobFailed)
+ assert isinstance(received_event.exception, str)
+ assert isinstance(received_event.traceback, str)
else:
- assert isinstance(worker_events[1], JobSuccessful)
- assert worker_events[1].return_value == ((1, 2), {'x': 'foo'})
-
- @pytest.mark.parametrize('anyio_backend', ['asyncio'])
- async def test_run_deadline_missed(self, store):
- async def listener(worker_event):
- worker_events.append(worker_event)
- event.set()
-
- scheduled_start_time = datetime(2020, 9, 14)
- worker_events = []
- event = Event()
- job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo',
- scheduled_fire_time=scheduled_start_time,
- start_deadline=datetime(2020, 9, 14, 1))
- async with AsyncWorker(store) as worker:
- worker.subscribe(listener)
- await store.add_job(job)
- with fail_after(5):
+ # Then the job finished successfully
+ assert isinstance(received_event, JobCompleted)
+ assert received_event.return_value == ((1, 2), {'x': 'foo'})
+
+ # Finally, the worker was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStopped)
+
+ # There should be no more events on the list
+ assert not received_events
+
+ async def test_run_deadline_missed(self) -> None:
+ def listener(received_event: Event):
+ received_events.append(received_event)
+ if len(received_events) == 3:
+ event.set()
+
+ scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
+ received_events: List[Event] = []
+ event = anyio.Event()
+ data_store = MemoryDataStore()
+ worker = AsyncWorker(data_store)
+ worker.subscribe(listener)
+ async with worker:
+ job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo',
+ scheduled_fire_time=scheduled_start_time,
+ start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc))
+ await worker.data_store.add_job(job)
+ with fail_after(3):
await event.wait()
- assert len(worker_events) == 1
- assert isinstance(worker_events[0], JobDeadlineMissed)
- assert worker_events[0].job_id == job.id
- assert worker_events[0].task_id == 'task_id'
- assert worker_events[0].schedule_id == 'foo'
- assert worker_events[0].scheduled_fire_time == scheduled_start_time
+ # The worker was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStarted)
+
+ # Then a job was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id == 'foo'
+
+ # Then the deadline was missed
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobDeadlineMissed)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id == 'foo'
+
+ # Finally, the worker was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStopped)
+
+ # There should be no more events on the list
+ assert not received_events
class TestSyncWorker:
- @pytest.mark.parametrize('target_func', [sync_func, async_func], ids=['sync', 'async'])
@pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail'])
- def test_run_job_nonscheduled(self, anyio_backend, target_func, fail, sync_store, portal):
- def listener(worker_event):
- print('received event:', worker_event)
- worker_events.append(worker_event)
- if len(worker_events) == 2:
+ def test_run_job_nonscheduled(self, fail: bool) -> None:
+ def listener(received_event: Event):
+ received_events.append(received_event)
+ if len(received_events) == 4:
event.set()
- worker_events = []
+ received_events: List[Event] = []
event = threading.Event()
- job = Job('task_id', func=target_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
- with SyncWorker(sync_store, portal=portal) as worker:
- worker.subscribe(listener)
- portal.call(sync_store.add_job, job)
- event.wait(2)
-
- assert len(worker_events) == 2
-
- assert isinstance(worker_events[0], JobUpdated)
- assert worker_events[0].job_id == job.id
- assert worker_events[0].task_id == 'task_id'
- assert worker_events[0].schedule_id is None
-
- assert worker_events[1].job_id == job.id
- assert worker_events[1].task_id == 'task_id'
- assert worker_events[1].schedule_id is None
+ data_store = MemoryDataStore()
+ worker = Worker(data_store)
+ worker.subscribe(listener)
+ with worker:
+ job = Job('task_id', func=sync_func, args=(1, 2), kwargs={'x': 'foo', 'fail': fail})
+ worker.data_store.add_job(job)
+ event.wait(5)
+
+ # The worker was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStarted)
+
+ # Then a job was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id is None
+
+ # Then the job was started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobStarted)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id is None
+
+ received_event = received_events.pop(0)
if fail:
- assert isinstance(worker_events[1], JobFailed)
- assert type(worker_events[1].exception) is Exception
- assert isinstance(worker_events[1].traceback, str)
+ # Then the job failed
+ assert isinstance(received_event, JobFailed)
+ assert isinstance(received_event.exception, str)
+ assert isinstance(received_event.traceback, str)
else:
- assert isinstance(worker_events[1], JobSuccessful)
- assert worker_events[1].return_value == ((1, 2), {'x': 'foo'})
+ # Then the job finished successfully
+ assert isinstance(received_event, JobCompleted)
+ assert received_event.return_value == ((1, 2), {'x': 'foo'})
+
+ # Finally, the worker was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStopped)
- def test_run_deadline_missed(self, anyio_backend, sync_store, portal):
- def listener(worker_event):
- worker_events.append(worker_event)
- event.set()
+ # There should be no more events on the list
+ assert not received_events
+
+ def test_run_deadline_missed(self) -> None:
+ def listener(worker_event: Event):
+ received_events.append(worker_event)
+ if len(received_events) == 3:
+ event.set()
- scheduled_start_time = datetime(2020, 9, 14)
- worker_events = []
+ scheduled_start_time = datetime(2020, 9, 14, tzinfo=timezone.utc)
+ received_events: List[Event] = []
event = threading.Event()
- job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo',
- scheduled_fire_time=scheduled_start_time,
- start_deadline=datetime(2020, 9, 14, 1))
- with SyncWorker(sync_store, portal=portal) as worker:
- worker.subscribe(listener)
- portal.call(sync_store.add_job, job)
+ data_store = MemoryDataStore()
+ worker = Worker(data_store)
+ worker.subscribe(listener)
+ with worker:
+ job = Job('task_id', fail_func, args=(), kwargs={}, schedule_id='foo',
+ scheduled_fire_time=scheduled_start_time,
+ start_deadline=datetime(2020, 9, 14, 1, tzinfo=timezone.utc))
+ worker.data_store.add_job(job)
event.wait(5)
- assert len(worker_events) == 1
- assert isinstance(worker_events[0], JobDeadlineMissed)
- assert worker_events[0].job_id == job.id
- assert worker_events[0].task_id == 'task_id'
- assert worker_events[0].schedule_id == 'foo'
+ # The worker was first started
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStarted)
+
+ # Then a job was added
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobAdded)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id == 'foo'
+
+ # Then the deadline was missed
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, JobDeadlineMissed)
+ assert received_event.job_id == job.id
+ assert received_event.task_id == 'task_id'
+ assert received_event.schedule_id == 'foo'
+
+ # Finally, the worker was stopped
+ received_event = received_events.pop(0)
+ assert isinstance(received_event, WorkerStopped)
+
+ # There should be no more events on the list
+ assert not received_events