diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2020-09-20 14:30:46 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2020-09-20 17:00:42 +0300 |
commit | 95169d277da6501b181a956791e7ea0171fbae64 (patch) | |
tree | f9ea601ff84506ff8d44799e6cdcfa723346aed9 /tests | |
parent | 6f6b36d83369cfb95b6b0071caf82c89818ef020 (diff) | |
download | apscheduler-95169d277da6501b181a956791e7ea0171fbae64.tar.gz |
Added the first usable scheduler, worker and datastore implementations
Diffstat (limited to 'tests')
-rw-r--r-- | tests/backends/test_memory.py | 138 | ||||
-rw-r--r-- | tests/conftest.py | 1 | ||||
-rw-r--r-- | tests/test_executors.py | 239 | ||||
-rw-r--r-- | tests/test_job.py | 254 | ||||
-rw-r--r-- | tests/test_jobstores.py | 387 | ||||
-rw-r--r-- | tests/test_schedulers.py | 1026 | ||||
-rw-r--r-- | tests/test_util.py | 4 | ||||
-rw-r--r-- | tests/triggers/test_calendarinterval.py | 3 | ||||
-rw-r--r-- | tests/triggers/test_combining.py | 1 | ||||
-rw-r--r-- | tests/triggers/test_cron.py | 1 | ||||
-rw-r--r-- | tests/triggers/test_interval.py | 1 | ||||
-rw-r--r-- | tests/workers/test_local.py | 101 |
12 files changed, 241 insertions, 1915 deletions
diff --git a/tests/backends/test_memory.py b/tests/backends/test_memory.py new file mode 100644 index 0000000..c8679f4 --- /dev/null +++ b/tests/backends/test_memory.py @@ -0,0 +1,138 @@ +from datetime import datetime, timedelta, timezone + +import pytest +from apscheduler.abc import Schedule +from apscheduler.datastores.memory import MemoryScheduleStore +from apscheduler.events import SchedulesAdded, SchedulesRemoved, SchedulesUpdated +from apscheduler.triggers.date import DateTrigger + +pytestmark = pytest.mark.anyio + + +@pytest.fixture +async def store(): + async with MemoryScheduleStore() as store_: + yield store_ + + +@pytest.fixture +async def events(store): + events = [] + await store.subscribe(events.append) + return events + + +@pytest.fixture +def schedules(): + trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc)) + schedule1 = Schedule(id='s1', task_id='bogus', trigger=trigger, args=(), kwargs={}, + coalesce=False, misfire_grace_time=None, tags=frozenset()) + schedule1.next_fire_time = trigger.next() + + trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc)) + schedule2 = Schedule(id='s2', task_id='bogus', trigger=trigger, args=(), kwargs={}, + coalesce=False, misfire_grace_time=None, tags=frozenset()) + schedule2.next_fire_time = trigger.next() + + trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc)) + schedule3 = Schedule(id='s3', task_id='bogus', trigger=trigger, args=(), kwargs={}, + coalesce=False, misfire_grace_time=None, tags=frozenset()) + return [schedule1, schedule2, schedule3] + + +async def test_add_schedules(store, schedules, events): + assert await store.get_next_fire_time() is None + await store.add_or_replace_schedules(schedules) + assert await store.get_next_fire_time() == datetime(2020, 9, 13, tzinfo=timezone.utc) + + assert await store.get_schedules() == schedules + assert await store.get_schedules({'s1', 's2', 's3'}) == schedules + assert await store.get_schedules({'s1'}) == [schedules[0]] + assert await store.get_schedules({'s2'}) == [schedules[1]] + assert await store.get_schedules({'s3'}) == [schedules[2]] + + assert len(events) == 1 + assert isinstance(events[0], SchedulesAdded) + assert events[0].schedule_ids == {'s1', 's2', 's3'} + assert events[0].earliest_next_fire_time == datetime(2020, 9, 13, tzinfo=timezone.utc) + + +async def test_replace_schedules(store, schedules, events): + await store.add_or_replace_schedules(schedules) + events.clear() + + next_fire_time = schedules[2].trigger.next() + schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(), kwargs={}, + coalesce=False, misfire_grace_time=None, tags=frozenset()) + schedule.next_fire_time = next_fire_time + await store.add_or_replace_schedules([schedule]) + + schedules = await store.get_schedules([schedule.id]) + assert schedules[0].task_id == 'foo' + assert schedules[0].next_fire_time == next_fire_time + assert schedules[0].args == () + assert schedules[0].kwargs == {} + assert not schedules[0].coalesce + assert schedules[0].misfire_grace_time is None + assert schedules[0].tags == frozenset() + + assert len(events) == 1 + assert isinstance(events[0], SchedulesUpdated) + assert events[0].schedule_ids == {'s3'} + assert events[0].earliest_next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) + + +async def test_update_schedules(store, schedules, events): + await store.add_or_replace_schedules(schedules) + events.clear() + + next_fire_time = datetime(2020, 9, 12, tzinfo=timezone.utc) + updated_ids = await store.update_schedules({ + 's2': {'task_id': 'foo', 'next_fire_time': next_fire_time, 'args': (1,), + 'kwargs': {'a': 'x'}, 'coalesce': True, 'misfire_grace_time': timedelta(seconds=5), + 'tags': frozenset(['a', 'b'])}, + 'nonexistent': {} + }) + assert updated_ids == {'s2'} + + schedules = await store.get_schedules({'s2'}) + assert len(schedules) == 1 + assert schedules[0].task_id == 'foo' + assert schedules[0].args == (1,) + assert schedules[0].kwargs == {'a': 'x'} + assert schedules[0].coalesce + assert schedules[0].next_fire_time == next_fire_time + assert schedules[0].misfire_grace_time == timedelta(seconds=5) + assert schedules[0].tags == frozenset(['a', 'b']) + + assert len(events) == 1 + assert isinstance(events[0], SchedulesUpdated) + assert events[0].schedule_ids == {'s2'} + assert events[0].earliest_next_fire_time == next_fire_time + + +@pytest.mark.parametrize('ids', [None, {'s1', 's2'}], ids=['all', 'by_id']) +async def test_remove_schedules(store, schedules, events, ids): + await store.add_or_replace_schedules(schedules) + events.clear() + + await store.remove_schedules(ids) + assert len(events) == 1 + assert isinstance(events[0], SchedulesRemoved) + if ids: + assert events[0].schedule_ids == {'s1', 's2'} + assert await store.get_schedules() == [schedules[2]] + else: + assert events[0].schedule_ids == {'s1', 's2', 's3'} + assert await store.get_schedules() == [] + + +async def test_acquire_due_schedules(store, schedules, events): + await store.add_or_replace_schedules(schedules) + events.clear() + + now = datetime(2020, 9, 14, tzinfo=timezone.utc) + async with store.acquire_due_schedules('dummy-id', now) as schedules: + assert len(schedules) == 2 + assert schedules[0].id == 's1' + assert schedules[1].id == 's2' diff --git a/tests/conftest.py b/tests/conftest.py index bd37e44..05f3f4b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,6 @@ from unittest.mock import Mock import pytest import pytz - from apscheduler.serializers.cbor import CBORSerializer from apscheduler.serializers.json import JSONSerializer from apscheduler.serializers.pickle import PickleSerializer diff --git a/tests/test_executors.py b/tests/test_executors.py deleted file mode 100644 index a1d525f..0000000 --- a/tests/test_executors.py +++ /dev/null @@ -1,239 +0,0 @@ -import gc -import time -from asyncio import CancelledError -from datetime import datetime -from threading import Event -from types import TracebackType -from unittest.mock import Mock, MagicMock, patch - -import pytest -from pytz import UTC, utc - -from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED -from apscheduler.executors.asyncio import AsyncIOExecutor -from apscheduler.executors.base import MaxInstancesReachedError, run_job -from apscheduler.executors.tornado import TornadoExecutor -from apscheduler.job import Job -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.schedulers.base import BaseScheduler -from apscheduler.schedulers.tornado import TornadoScheduler - - -@pytest.fixture -def mock_scheduler(timezone): - scheduler_ = Mock(BaseScheduler, timezone=timezone) - scheduler_._create_lock = MagicMock() - return scheduler_ - - -@pytest.fixture(params=['threadpool', 'processpool']) -def executor(request, mock_scheduler): - if request.param == 'threadpool': - from apscheduler.executors.pool import ThreadPoolExecutor - executor_ = ThreadPoolExecutor() - else: - from apscheduler.executors.pool import ProcessPoolExecutor - executor_ = ProcessPoolExecutor() - - executor_.start(mock_scheduler, 'dummy') - yield executor_ - executor_.shutdown() - - -def wait_event(): - time.sleep(0.2) - return 'test' - - -def failure(): - raise Exception('test failure') - - -def success(): - return 5 - - -def test_max_instances(mock_scheduler, executor, create_job, freeze_time): - """Tests that the maximum instance limit on a job is respected.""" - events = [] - mock_scheduler._dispatch_event = lambda event: events.append(event) - job = create_job(func=wait_event, max_instances=2, next_run_time=None) - executor.submit_job(job, [freeze_time.current]) - executor.submit_job(job, [freeze_time.current]) - - pytest.raises(MaxInstancesReachedError, executor.submit_job, job, [freeze_time.current]) - executor.shutdown() - assert len(events) == 2 - assert events[0].retval == 'test' - assert events[1].retval == 'test' - - -@pytest.mark.parametrize('event_code,func', [ - (EVENT_JOB_EXECUTED, success), - (EVENT_JOB_MISSED, failure), - (EVENT_JOB_ERROR, failure) -], ids=['executed', 'missed', 'error']) -def test_submit_job(mock_scheduler, executor, create_job, freeze_time, timezone, event_code, func): - """ - Tests that an EVENT_JOB_EXECUTED event is delivered to the scheduler if the job was - successfully executed. - - """ - mock_scheduler._dispatch_event = MagicMock() - job = create_job(func=func, id='foo') - job._jobstore_alias = 'test_jobstore' - run_time = (timezone.localize(datetime(1970, 1, 1)) if event_code == EVENT_JOB_MISSED else - freeze_time.current) - executor.submit_job(job, [run_time]) - executor.shutdown() - - assert mock_scheduler._dispatch_event.call_count == 1 - event = mock_scheduler._dispatch_event.call_args[0][0] - assert event.code == event_code - assert event.job_id == 'foo' - assert event.jobstore == 'test_jobstore' - - if event_code == EVENT_JOB_EXECUTED: - assert event.retval == 5 - elif event_code == EVENT_JOB_ERROR: - assert str(event.exception) == 'test failure' - assert isinstance(event.traceback, str) - - -class FauxJob(object): - id = 'abc' - max_instances = 1 - _jobstore_alias = 'foo' - - -def dummy_run_job(job, jobstore_alias, run_times, logger_name): - raise Exception('dummy') - - -def test_run_job_error(monkeypatch, executor): - """Tests that _run_job_error is properly called if an exception is raised in run_job()""" - def run_job_error(job_id, exc, traceback): - assert job_id == 'abc' - exc_traceback[:] = [exc, traceback] - event.set() - - event = Event() - exc_traceback = [None, None] - monkeypatch.setattr('apscheduler.executors.base.run_job', dummy_run_job) - monkeypatch.setattr('apscheduler.executors.pool.run_job', dummy_run_job) - monkeypatch.setattr(executor, '_run_job_error', run_job_error) - executor.submit_job(FauxJob(), []) - - event.wait(5) - assert str(exc_traceback[0]) == "dummy" - if exc_traceback[1] is not None: - assert isinstance(exc_traceback[1], TracebackType) - - -def test_run_job_memory_leak(): - class FooBar(object): - pass - - def func(): - foo = FooBar() # noqa: F841 - raise Exception('dummy') - - fake_job = Mock(Job, func=func, args=(), kwargs={}, misfire_grace_time=1) - with patch('logging.getLogger'): - for _ in range(5): - run_job(fake_job, 'foo', [datetime.now(UTC)], __name__) - - foos = [x for x in gc.get_objects() if type(x) is FooBar] - assert len(foos) == 0 - - -@pytest.fixture -def asyncio_scheduler(event_loop): - scheduler = AsyncIOScheduler(event_loop=event_loop) - scheduler.start(paused=True) - yield scheduler - scheduler.shutdown(False) - - -@pytest.fixture -def asyncio_executor(asyncio_scheduler): - executor = AsyncIOExecutor() - executor.start(asyncio_scheduler, 'default') - yield executor - executor.shutdown() - - -@pytest.fixture -def tornado_scheduler(io_loop): - scheduler = TornadoScheduler(io_loop=io_loop) - scheduler.start(paused=True) - yield scheduler - scheduler.shutdown(False) - - -@pytest.fixture -def tornado_executor(tornado_scheduler): - executor = TornadoExecutor() - executor.start(tornado_scheduler, 'default') - yield executor - executor.shutdown() - - -async def waiter(sleep, exception): - await sleep(0.1) - if exception: - raise Exception('dummy error') - else: - return True - - -@pytest.mark.parametrize('exception', [False, True]) -@pytest.mark.asyncio -async def test_run_coroutine_job(asyncio_scheduler, asyncio_executor, exception): - from asyncio import Future, sleep - - future = Future() - job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception]) - asyncio_executor._run_job_success = lambda job_id, events: future.set_result(events) - asyncio_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc) - asyncio_executor.submit_job(job, [datetime.now(utc)]) - events = await future - assert len(events) == 1 - if exception: - assert str(events[0].exception) == 'dummy error' - else: - assert events[0].retval is True - - -@pytest.mark.parametrize('exception', [False, True]) -@pytest.mark.gen_test -async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, exception): - from tornado.concurrent import Future - from tornado.gen import sleep - - future = Future() - job = tornado_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception]) - tornado_executor._run_job_success = lambda job_id, events: future.set_result(events) - tornado_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc) - tornado_executor.submit_job(job, [datetime.now(utc)]) - events = await future - assert len(events) == 1 - if exception: - assert str(events[0].exception) == 'dummy error' - else: - assert events[0].retval is True - - -@pytest.mark.asyncio -async def test_asyncio_executor_shutdown(asyncio_scheduler, asyncio_executor): - """Test that the AsyncIO executor cancels its pending tasks on shutdown.""" - from asyncio import sleep - - job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, None]) - asyncio_executor.submit_job(job, [datetime.now(utc)]) - futures = asyncio_executor._pending_futures.copy() - assert len(futures) == 1 - - asyncio_executor.shutdown() - with pytest.raises(CancelledError): - await futures.pop() diff --git a/tests/test_job.py b/tests/test_job.py deleted file mode 100644 index d1ab355..0000000 --- a/tests/test_job.py +++ /dev/null @@ -1,254 +0,0 @@ -from datetime import datetime, timedelta -from functools import partial -from unittest.mock import MagicMock, patch - -import pytest - -from apscheduler.job import Job -from apscheduler.schedulers.base import BaseScheduler -from apscheduler.triggers.date import DateTrigger - - -def dummyfunc(): - pass - - -@pytest.fixture -def job(create_job): - return create_job(func=dummyfunc) - - -@pytest.mark.parametrize('job_id', ['testid', None]) -def test_constructor(job_id): - with patch('apscheduler.job.Job._modify') as _modify: - scheduler_mock = MagicMock(BaseScheduler) - job = Job(scheduler_mock, id=job_id) - assert job._scheduler is scheduler_mock - assert job._jobstore_alias is None - - modify_kwargs = _modify.call_args[1] - if job_id is None: - assert len(modify_kwargs['id']) == 32 - else: - assert modify_kwargs['id'] == job_id - - -def test_modify(job): - job.modify(bah=1, foo='x') - job._scheduler.modify_job.assert_called_once_with(job.id, None, bah=1, foo='x') - - -def test_reschedule(job): - job.reschedule('trigger', bah=1, foo='x') - job._scheduler.reschedule_job.assert_called_once_with(job.id, None, 'trigger', bah=1, foo='x') - - -def test_pause(job): - job.pause() - job._scheduler.pause_job.assert_called_once_with(job.id, None) - - -def test_resume(job): - job.resume() - job._scheduler.resume_job.assert_called_once_with(job.id, None) - - -def test_remove(job): - job.remove() - job._scheduler.remove_job.assert_called_once_with(job.id, None) - - -def test_pending(job): - """ - Tests that the "pending" property return True when _jobstore_alias is a string, ``False`` - otherwise. - - """ - assert job.pending - - job._jobstore_alias = 'test' - assert not job.pending - - -def test_get_run_times(create_job, timezone): - run_time = timezone.localize(datetime(2010, 12, 13, 0, 8)) - expected_times = [run_time + timedelta(seconds=1), run_time + timedelta(seconds=2)] - job = create_job(trigger='interval', - trigger_args={'seconds': 1, 'timezone': timezone, 'start_date': run_time}, - next_run_time=expected_times[0], func=dummyfunc) - - run_times = job._get_run_times(run_time) - assert run_times == [] - - run_times = job._get_run_times(expected_times[0]) - assert run_times == [expected_times[0]] - - run_times = job._get_run_times(expected_times[1]) - assert run_times == expected_times - - -def test_private_modify_bad_id(job): - """Tests that only strings are accepted for job IDs.""" - del job.id - exc = pytest.raises(TypeError, job._modify, id=3) - assert str(exc.value) == 'id must be a nonempty string' - - -def test_private_modify_id(job): - """Tests that the job ID can't be changed.""" - exc = pytest.raises(ValueError, job._modify, id='alternate') - assert str(exc.value) == 'The job ID may not be changed' - - -def test_private_modify_bad_func(job): - """Tests that given a func of something else than a callable or string raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, func=1) - assert str(exc.value) == 'func must be a callable or a textual reference to one' - - -def test_private_modify_func_ref(job): - """Tests that the target callable can be given as a textual reference.""" - job._modify(func='test_job:dummyfunc') - assert job.func is dummyfunc - assert job.func_ref == 'test_job:dummyfunc' - - -def test_private_modify_unreachable_func(job): - """Tests that func_ref remains None if no reference to the target callable can be found.""" - func = partial(dummyfunc) - job._modify(func=func) - assert job.func is func - assert job.func_ref is None - - -def test_private_modify_update_name(job): - """Tests that the name attribute defaults to the function name.""" - del job.name - job._modify(func=dummyfunc) - assert job.name == 'dummyfunc' - - -def test_private_modify_bad_args(job): - """ Tests that passing an argument list of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, args=1) - assert str(exc.value) == 'args must be a non-string iterable' - - -def test_private_modify_bad_kwargs(job): - """Tests that passing an argument list of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, kwargs=1) - assert str(exc.value) == 'kwargs must be a dict-like object' - - -@pytest.mark.parametrize('value', [1, ''], ids=['integer', 'empty string']) -def test_private_modify_bad_name(job, value): - """ - Tests that passing an empty name or a name of something else than a string raises a TypeError. - - """ - exc = pytest.raises(TypeError, job._modify, name=value) - assert str(exc.value) == 'name must be a nonempty string' - - -@pytest.mark.parametrize('value', ['foo', 0, -1], ids=['string', 'zero', 'negative']) -def test_private_modify_bad_misfire_grace_time(job, value): - """Tests that passing a misfire_grace_time of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, misfire_grace_time=value) - assert str(exc.value) == 'misfire_grace_time must be either None or a positive integer' - - -@pytest.mark.parametrize('value', [None, 'foo', 0, -1], ids=['None', 'string', 'zero', 'negative']) -def test_private_modify_bad_max_instances(job, value): - """Tests that passing a max_instances of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, max_instances=value) - assert str(exc.value) == 'max_instances must be a positive integer' - - -def test_private_modify_bad_trigger(job): - """Tests that passing a trigger of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, trigger='foo') - assert str(exc.value) == 'Expected a trigger instance, got str instead' - - -def test_private_modify_bad_executor(job): - """Tests that passing an executor of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, executor=1) - assert str(exc.value) == 'executor must be a string' - - -def test_private_modify_bad_next_run_time(job): - """Tests that passing a next_run_time of the wrong type raises a TypeError.""" - exc = pytest.raises(TypeError, job._modify, next_run_time=1) - assert str(exc.value) == 'Unsupported type for next_run_time: int' - - -def test_private_modify_bad_argument(job): - """Tests that passing an unmodifiable argument type raises an AttributeError.""" - exc = pytest.raises(AttributeError, job._modify, scheduler=1) - assert str(exc.value) == 'The following are not modifiable attributes of Job: scheduler' - - -def test_getstate(job): - state = job.__getstate__() - assert state == dict( - version=1, trigger=job.trigger, executor='default', func='test_job:dummyfunc', - name=b'n\xc3\xa4m\xc3\xa9'.decode('utf-8'), args=(), kwargs={}, - id=b't\xc3\xa9st\xc3\xafd'.decode('utf-8'), misfire_grace_time=1, coalesce=False, - max_instances=1, next_run_time=None) - - -def test_setstate(job, timezone): - trigger = DateTrigger('2010-12-14 13:05:00', timezone) - state = dict( - version=1, scheduler=MagicMock(), jobstore=MagicMock(), trigger=trigger, - executor='dummyexecutor', func='test_job:dummyfunc', name='testjob.dummyfunc', - args=[], kwargs={}, id='other_id', misfire_grace_time=2, coalesce=True, max_instances=2, - next_run_time=None) - job.__setstate__(state) - assert job.id == 'other_id' - assert job.func == dummyfunc - assert job.func_ref == 'test_job:dummyfunc' - assert job.trigger == trigger - assert job.executor == 'dummyexecutor' - assert job.args == [] - assert job.kwargs == {} - assert job.name == 'testjob.dummyfunc' - assert job.misfire_grace_time == 2 - assert job.coalesce is True - assert job.max_instances == 2 - assert job.next_run_time is None - - -def test_setstate_bad_version(job): - """Tests that __setstate__ rejects state of higher version that it was designed to handle.""" - exc = pytest.raises(ValueError, job.__setstate__, {'version': 9999}) - assert 'Job has version 9999, but only version' in str(exc.value) - - -def test_eq(create_job): - job = create_job(func=lambda: None, id='foo') - job2 = create_job(func=lambda: None, id='foo') - job3 = create_job(func=lambda: None, id='bar') - assert job == job2 - assert not job == job3 - assert not job == 'foo' - - -def test_repr(job): - assert repr(job) == "<Job (id='téstïd' name='nämé')>" - - -@pytest.mark.parametrize('status, expected_status', [ - ('scheduled', 'next run at: 2011-04-03 18:40:00 CEST'), - ('paused', 'paused'), - ('pending', 'pending') -], ids=['scheduled', 'paused', 'pending']) -def test_str(create_job, status, expected_status): - job = create_job(func=dummyfunc) - if status == 'scheduled': - job.next_run_time = job.trigger.run_date - elif status == 'pending': - del job.next_run_time - - expected = 'nämé (trigger: date[2011-04-03 18:40:00 CEST], {})'.format(expected_status) - assert str(job) == expected diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py deleted file mode 100644 index f913f37..0000000 --- a/tests/test_jobstores.py +++ /dev/null @@ -1,387 +0,0 @@ -from datetime import datetime - -import pytest - -from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.jobstores.base import JobLookupError, ConflictingIdError - - -def dummy_job(): - pass - - -def dummy_job2(): - pass - - -def dummy_job3(): - pass - - -class DummyClass: - def dummy_method(self, a, b): - return a + b - - @classmethod - def dummy_classmethod(cls, a, b): - return a + b - - -@pytest.fixture -def memjobstore(): - yield MemoryJobStore() - - -@pytest.fixture -def sqlalchemyjobstore(tmpdir): - db_path = tmpdir.join('apscheduler_unittest.sqlite') - sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy') - store = sqlalchemy.SQLAlchemyJobStore(url='sqlite:///%s' % db_path) - store.start(None, 'sqlalchemy') - yield store - store.shutdown() - db_path.remove() - - -@pytest.fixture -def rethinkdbjobstore(): - rethinkdb = pytest.importorskip('apscheduler.jobstores.rethinkdb') - store = rethinkdb.RethinkDBJobStore(database='apscheduler_unittest') - store.start(None, 'rethinkdb') - yield store - store.r.db_drop('apscheduler_unittest').run(store.conn) - store.shutdown() - - -@pytest.fixture -def mongodbjobstore(): - mongodb = pytest.importorskip('apscheduler.jobstores.mongodb') - store = mongodb.MongoDBJobStore(database='apscheduler_unittest') - store.start(None, 'mongodb') - yield store - store.client.drop_database(store.collection.database.name) - store.shutdown() - - -@pytest.fixture -def redisjobstore(): - redis = pytest.importorskip('apscheduler.jobstores.redis') - store = redis.RedisJobStore() - store.start(None, 'redis') - yield store - store.remove_all_jobs() - store.shutdown() - - -@pytest.fixture -def zookeeperjobstore(): - zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') - store = zookeeper.ZooKeeperJobStore(path='/apscheduler_unittest') - store.start(None, 'zookeeper') - yield store - store.remove_all_jobs() - store.shutdown() - - -@pytest.fixture(params=['memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', - 'rethinkdbjobstore', 'zookeeperjobstore'], - ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper']) -def jobstore(request): - return request.getfixturevalue(request.param) - - -@pytest.fixture(params=['sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', - 'rethinkdbjobstore', 'zookeeperjobstore'], - ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper']) -def persistent_jobstore(request): - return request.getfixturevalue(request.param) - - -@pytest.fixture -def create_add_job(timezone, create_job): - def create(jobstore, func=dummy_job, run_date=datetime(2999, 1, 1), id=None, paused=False, - **kwargs): - run_date = timezone.localize(run_date) - job = create_job(func=func, trigger='date', trigger_args={'run_date': run_date}, id=id, - **kwargs) - job.next_run_time = None if paused else job.trigger.get_next_fire_time(None, run_date) - if jobstore: - jobstore.add_job(job) - return job - - return create - - -def test_add_instance_method_job(jobstore, create_add_job): - instance = DummyClass() - initial_job = create_add_job(jobstore, instance.dummy_method, kwargs={'a': 1, 'b': 2}) - job = jobstore.lookup_job(initial_job.id) - assert job.func(*job.args, **job.kwargs) == 3 - - -def test_add_class_method_job(jobstore, create_add_job): - initial_job = create_add_job(jobstore, DummyClass.dummy_classmethod, kwargs={'a': 1, 'b': 2}) - job = jobstore.lookup_job(initial_job.id) - assert job.func(*job.args, **job.kwargs) == 3 - - -def test_lookup_job(jobstore, create_add_job): - initial_job = create_add_job(jobstore) - job = jobstore.lookup_job(initial_job.id) - assert job == initial_job - - -def test_lookup_nonexistent_job(jobstore): - assert jobstore.lookup_job('foo') is None - - -def test_get_all_jobs(jobstore, create_add_job): - job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - job2 = create_add_job(jobstore, dummy_job2, datetime(2013, 8, 14)) - job3 = create_add_job(jobstore, dummy_job2, datetime(2013, 7, 11), paused=True) - jobs = jobstore.get_all_jobs() - assert jobs == [job2, job1, job3] - - -def test_get_pending_jobs(jobstore, create_add_job, timezone): - create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - job3 = create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - create_add_job(jobstore, dummy_job3, datetime(2013, 7, 11), paused=True) - jobs = jobstore.get_due_jobs(timezone.localize(datetime(2014, 2, 27))) - assert jobs == [job3, job2] - - jobs = jobstore.get_due_jobs(timezone.localize(datetime(2013, 8, 13))) - assert jobs == [] - - -def test_get_pending_jobs_subsecond_difference(jobstore, create_add_job, timezone): - job1 = create_add_job(jobstore, dummy_job, datetime(2014, 7, 7, 0, 0, 0, 401)) - job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 7, 7, 0, 0, 0, 402)) - job3 = create_add_job(jobstore, dummy_job3, datetime(2014, 7, 7, 0, 0, 0, 400)) - jobs = jobstore.get_due_jobs(timezone.localize(datetime(2014, 7, 7, 1))) - assert jobs == [job3, job1, job2] - - -def test_get_next_run_time(jobstore, create_add_job, timezone): - create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - create_add_job(jobstore, dummy_job3, datetime(2013, 7, 11), paused=True) - assert jobstore.get_next_run_time() == timezone.localize(datetime(2013, 8, 14)) - - -def test_add_job_conflicting_id(jobstore, create_add_job): - create_add_job(jobstore, dummy_job, datetime(2016, 5, 3), id='blah') - pytest.raises(ConflictingIdError, create_add_job, jobstore, dummy_job2, datetime(2014, 2, 26), - id='blah') - - -def test_update_job(jobstore, create_add_job, timezone): - job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - replacement = create_add_job(None, dummy_job, datetime(2016, 5, 4), id=job1.id, - max_instances=6) - assert replacement.max_instances == 6 - jobstore.update_job(replacement) - - jobs = jobstore.get_all_jobs() - assert len(jobs) == 2 - assert jobs[0].id == job2.id - assert jobs[1].id == job1.id - assert jobs[1].next_run_time == timezone.localize(datetime(2016, 5, 4)) - assert jobs[1].max_instances == 6 - - -@pytest.mark.parametrize('next_run_time', [datetime(2013, 8, 13), None], ids=['earlier', 'null']) -def test_update_job_next_runtime(jobstore, create_add_job, next_run_time, timezone): - job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - job3 = create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - job1.next_run_time = timezone.localize(next_run_time) if next_run_time else None - jobstore.update_job(job1) - - if next_run_time: - assert jobstore.get_next_run_time() == job1.next_run_time - else: - assert jobstore.get_next_run_time() == job3.next_run_time - - -@pytest.mark.parametrize('next_run_time', [datetime(2013, 8, 13), None], ids=['earlier', 'null']) -@pytest.mark.parametrize('index', [0, 1, 2], ids=['first', 'middle', 'last']) -def test_update_job_clear_next_runtime(jobstore, create_add_job, next_run_time, timezone, index): - """ - Tests that update_job() maintains the proper ordering of the jobs, - even when their next run times are initially the same. - - """ - jobs = [create_add_job(jobstore, dummy_job, datetime(2014, 2, 26), 'job%d' % i) for - i in range(3)] - jobs[index].next_run_time = timezone.localize(next_run_time) if next_run_time else None - jobstore.update_job(jobs[index]) - due_date = timezone.localize(datetime(2014, 2, 27)) - due_jobs = jobstore.get_due_jobs(due_date) - - assert len(due_jobs) == (3 if next_run_time else 2) - due_job_ids = [job.id for job in due_jobs] - if next_run_time: - if index == 0: - assert due_job_ids == ['job0', 'job1', 'job2'] - elif index == 1: - assert due_job_ids == ['job1', 'job0', 'job2'] - else: - assert due_job_ids == ['job2', 'job0', 'job1'] - else: - if index == 0: - assert due_job_ids == ['job1', 'job2'] - elif index == 1: - assert due_job_ids == ['job0', 'job2'] - else: - assert due_job_ids == ['job0', 'job1'] - - -def test_update_job_nonexistent_job(jobstore, create_add_job): - job = create_add_job(None, dummy_job, datetime(2016, 5, 3)) - pytest.raises(JobLookupError, jobstore.update_job, job) - - -def test_one_job_fails_to_load(persistent_jobstore, create_add_job, monkeypatch, timezone): - job1 = create_add_job(persistent_jobstore, dummy_job, datetime(2016, 5, 3)) - job2 = create_add_job(persistent_jobstore, dummy_job2, datetime(2014, 2, 26)) - create_add_job(persistent_jobstore, dummy_job3, datetime(2013, 8, 14)) - - # Make the dummy_job2 function disappear - monkeypatch.delitem(globals(), 'dummy_job3') - - jobs = persistent_jobstore.get_all_jobs() - assert jobs == [job2, job1] - - assert persistent_jobstore.get_next_run_time() == timezone.localize(datetime(2014, 2, 26)) - - -def test_remove_job(jobstore, create_add_job): - job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - - jobstore.remove_job(job1.id) - jobs = jobstore.get_all_jobs() - assert jobs == [job2] - - jobstore.remove_job(job2.id) - jobs = jobstore.get_all_jobs() - assert jobs == [] - - -def test_remove_nonexistent_job(jobstore): - pytest.raises(JobLookupError, jobstore.remove_job, 'blah') - - -def test_remove_all_jobs(jobstore, create_add_job): - create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) - create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) - - jobstore.remove_all_jobs() - jobs = jobstore.get_all_jobs() - assert jobs == [] - - -def test_repr_memjobstore(memjobstore): - assert repr(memjobstore) == '<MemoryJobStore>' - - -def test_repr_sqlalchemyjobstore(sqlalchemyjobstore): - assert repr(sqlalchemyjobstore).startswith('<SQLAlchemyJobStore (url=') - - -def test_repr_mongodbjobstore(mongodbjobstore): - assert repr(mongodbjobstore).startswith("<MongoDBJobStore (client=MongoClient(") - - -def test_repr_redisjobstore(redisjobstore): - assert repr(redisjobstore) == '<RedisJobStore>' - - -def test_repr_zookeeperjobstore(zookeeperjobstore): - class_sig = "<ZooKeeperJobStore (client=<kazoo.client.KazooClient" - assert repr(zookeeperjobstore).startswith(class_sig) - - -def test_memstore_close(memjobstore, create_add_job): - create_add_job(memjobstore, dummy_job, datetime(2016, 5, 3)) - memjobstore.shutdown() - assert not memjobstore.get_all_jobs() - - -def test_sqlalchemy_engine_ref(): - global sqla_engine - sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy') - sqla_engine = sqlalchemy.create_engine('sqlite:///') - try: - sqlalchemy.SQLAlchemyJobStore(engine='%s:sqla_engine' % __name__) - finally: - sqla_engine.dispose() - del sqla_engine - - -def test_sqlalchemy_missing_engine(): - sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy') - exc = pytest.raises(ValueError, sqlalchemy.SQLAlchemyJobStore) - assert 'Need either' in str(exc.value) - - -def test_mongodb_client_ref(): - global mongodb_client - mongodb = pytest.importorskip('apscheduler.jobstores.mongodb') - mongodb_client = mongodb.MongoClient() - try: - mongodb.MongoDBJobStore(client='%s:mongodb_client' % __name__) - finally: - mongodb_client.close() - del mongodb_client - - -def test_zookeeper_client_ref(): - global zookeeper_client - zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') - zookeeper_client = zookeeper.KazooClient() - try: - zookeeperjobstore = zookeeper.ZooKeeperJobStore(client='%s:zookeeper_client' % __name__) - zookeeperjobstore.start(None, 'zookeeper') - zookeeperjobstore.shutdown() - assert zookeeper_client.connected is True - finally: - zookeeper_client.stop() - zookeeper_client.close() - del zookeeper_client - - -def test_zookeeper_client_keep_open(): - global zookeeper_client - zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') - zookeeper_client = zookeeper.KazooClient() - try: - zookeeperjobstore = zookeeper.ZooKeeperJobStore(client='%s:zookeeper_client' % __name__, - close_connection_on_exit=True) - zookeeperjobstore.start(None, 'zookeeper') - zookeeperjobstore.shutdown() - assert zookeeper_client.connected is False - finally: - del zookeeper_client - - -def test_mongodb_null_database(): - mongodb = pytest.importorskip('apscheduler.jobstores.mongodb') - exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, database='') - assert '"database"' in str(exc.value) - - -def test_mongodb_null_collection(): - mongodb = pytest.importorskip('apscheduler.jobstores.mongodb') - exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, collection='') - assert '"collection"' in str(exc.value) - - -def test_zookeeper_null_path(): - zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') - exc = pytest.raises(ValueError, zookeeper.ZooKeeperJobStore, path='') - assert '"path"' in str(exc.value) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py deleted file mode 100644 index 31f49de..0000000 --- a/tests/test_schedulers.py +++ /dev/null @@ -1,1026 +0,0 @@ -import logging -from datetime import datetime, timedelta -from io import StringIO -from queue import Queue -from threading import Thread -from unittest.mock import MagicMock, patch - -import pytest -from pytz import utc - -from apscheduler.events import ( - EVENT_SCHEDULER_STARTED, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED, - EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED, - EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, EVENT_JOB_ADDED, - EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_PAUSED, - EVENT_SCHEDULER_RESUMED, SchedulerEvent) -from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError -from apscheduler.executors.debug import DebugExecutor -from apscheduler.job import Job -from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError -from apscheduler.jobstores.memory import MemoryJobStore -from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError -from apscheduler.schedulers.base import BaseScheduler, STATE_RUNNING, STATE_STOPPED -from apscheduler.triggers.base import BaseTrigger -from apscheduler.util import undefined - - -class DummyScheduler(BaseScheduler): - def __init__(self, *args, **kwargs): - super(DummyScheduler, self).__init__(*args, **kwargs) - self.wakeup = MagicMock() - - def shutdown(self, wait=True): - super(DummyScheduler, self).shutdown(wait) - - def wakeup(self): - pass - - -class DummyTrigger(BaseTrigger): - def __init__(self, **args): - self.args = args - - def get_next_fire_time(self, previous_fire_time, now): - pass - - -class DummyExecutor(BaseExecutor): - def __init__(self, **args): - super(DummyExecutor, self).__init__() - self.args = args - self.start = MagicMock() - self.shutdown = MagicMock() - self.submit_job = MagicMock() - - def _do_submit_job(self, job, run_times): - pass - - -class DummyJobStore(BaseJobStore): - def __init__(self, **args): - super(DummyJobStore, self).__init__() - self.args = args - self.start = MagicMock() - self.shutdown = MagicMock() - - def get_due_jobs(self, now): - pass - - def lookup_job(self, job_id): - pass - - def remove_job(self, job_id): - pass - - def remove_all_jobs(self): - pass - - def get_next_run_time(self): - pass - - def get_all_jobs(self): - pass - - def add_job(self, job): - pass - - def update_job(self, job): - pass - - -class TestBaseScheduler(object): - @pytest.fixture - def scheduler(self, timezone): - return DummyScheduler() - - @pytest.fixture - def scheduler_events(self, request, scheduler): - events = [] - mask = getattr(request, 'param', EVENT_ALL ^ EVENT_SCHEDULER_STARTED) - scheduler.add_listener(events.append, mask) - return events - - def test_constructor(self): - with patch('%s.DummyScheduler.configure' % __name__) as configure: - gconfig = {'apscheduler.foo': 'bar', 'apscheduler.x': 'y'} - options = {'bar': 'baz', 'xyz': 123} - DummyScheduler(gconfig, **options) - - configure.assert_called_once_with(gconfig, **options) - - @pytest.mark.parametrize('gconfig', [ - { - 'apscheduler.timezone': 'UTC', - 'apscheduler.job_defaults.misfire_grace_time': '5', - 'apscheduler.job_defaults.coalesce': 'false', - 'apscheduler.job_defaults.max_instances': '9', - 'apscheduler.executors.default.class': '%s:DummyExecutor' % __name__, - 'apscheduler.executors.default.arg1': '3', - 'apscheduler.executors.default.arg2': 'a', - 'apscheduler.executors.alter.class': '%s:DummyExecutor' % __name__, - 'apscheduler.executors.alter.arg': 'true', - 'apscheduler.jobstores.default.class': '%s:DummyJobStore' % __name__, - 'apscheduler.jobstores.default.arg1': '3', - 'apscheduler.jobstores.default.arg2': 'a', - 'apscheduler.jobstores.bar.class': '%s:DummyJobStore' % __name__, - 'apscheduler.jobstores.bar.arg': 'false', - }, - { - 'apscheduler.timezone': 'UTC', - 'apscheduler.job_defaults': { - 'misfire_grace_time': '5', - 'coalesce': 'false', - 'max_instances': '9', - }, - 'apscheduler.executors': { - 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'}, - 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'} - }, - 'apscheduler.jobstores': { - 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'}, - 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'} - } - } - ], ids=['ini-style', 'yaml-style']) - def test_configure(self, scheduler, gconfig): - scheduler._configure = MagicMock() - scheduler.configure(gconfig, timezone='Other timezone') - - scheduler._configure.assert_called_once_with({ - 'timezone': 'Other timezone', - 'job_defaults': { - 'misfire_grace_time': '5', - 'coalesce': 'false', - 'max_instances': '9', - }, - 'executors': { - 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'}, - 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'} - }, - 'jobstores': { - 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'}, - 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'} - } - }) - - @pytest.mark.parametrize('method', [ - BaseScheduler.configure, - BaseScheduler.start - ]) - def test_scheduler_already_running(self, method, scheduler): - """ - Test that SchedulerAlreadyRunningError is raised when certain methods are called before - the scheduler has been started. - - """ - scheduler.start(paused=True) - pytest.raises(SchedulerAlreadyRunningError, method, scheduler) - - @pytest.mark.parametrize('method', [ - BaseScheduler.pause, - BaseScheduler.resume, - BaseScheduler.shutdown - ], ids=['pause', 'resume', 'shutdown']) - def test_scheduler_not_running(self, scheduler, method): - """ - Test that the SchedulerNotRunningError is raised when certain methods are called before - the scheduler has been started. - - """ - pytest.raises(SchedulerNotRunningError, method, scheduler) - - def test_start(self, scheduler, create_job): - scheduler._executors = {'exec1': MagicMock(BaseExecutor), 'exec2': MagicMock(BaseExecutor)} - scheduler._jobstores = {'store1': MagicMock(BaseJobStore), - 'store2': MagicMock(BaseJobStore)} - job = create_job(func=lambda: None) - scheduler._pending_jobs = [(job, 'store1', False)] - scheduler._real_add_job = MagicMock() - scheduler._dispatch_event = MagicMock() - scheduler.start() - - scheduler._executors['exec1'].start.assert_called_once_with(scheduler, 'exec1') - scheduler._executors['exec2'].start.assert_called_once_with(scheduler, 'exec2') - scheduler._jobstores['store1'].start.assert_called_once_with(scheduler, 'store1') - scheduler._jobstores['store2'].start.assert_called_once_with(scheduler, 'store2') - assert len(scheduler._executors) == 3 - assert len(scheduler._jobstores) == 3 - assert 'default' in scheduler._executors - assert 'default' in scheduler._jobstores - - scheduler._real_add_job.assert_called_once_with(job, 'store1', False) - assert scheduler._pending_jobs == [] - - assert scheduler._dispatch_event.call_count == 3 - event = scheduler._dispatch_event.call_args_list[0][0][0] - assert event.code == EVENT_EXECUTOR_ADDED - assert event.alias == 'default' - event = scheduler._dispatch_event.call_args_list[1][0][0] - assert event.code == EVENT_JOBSTORE_ADDED - assert event.alias == 'default' - event = scheduler._dispatch_event.call_args_list[2][0][0] - assert event.code == EVENT_SCHEDULER_STARTED - - assert scheduler.state == STATE_RUNNING - - @pytest.mark.parametrize('wait', [True, False], ids=['wait', 'nowait']) - def test_shutdown(self, scheduler, scheduler_events, wait): - executor = DummyExecutor() - jobstore = DummyJobStore() - scheduler.add_executor(executor) - scheduler.add_jobstore(jobstore) - scheduler.start(paused=True) - del scheduler_events[:] - scheduler.shutdown(wait) - - assert scheduler.state == STATE_STOPPED - assert len(scheduler_events) == 1 - assert scheduler_events[0].code == EVENT_SCHEDULER_SHUTDOWN - - executor.shutdown.assert_called_once_with(wait) - jobstore.shutdown.assert_called_once_with() - - def test_pause_resume(self, scheduler, scheduler_events): - scheduler.start() - del scheduler_events[:] - scheduler.wakeup.reset_mock() - - scheduler.pause() - - assert len(scheduler_events) == 1 - assert scheduler_events[0].code == EVENT_SCHEDULER_PAUSED - assert not scheduler.wakeup.called - - scheduler.resume() - assert len(scheduler_events) == 2 - assert scheduler_events[1].code == EVENT_SCHEDULER_RESUMED - assert scheduler.wakeup.called - - @pytest.mark.parametrize('start_scheduler', [True, False]) - def test_running(self, scheduler, start_scheduler): - if start_scheduler: - scheduler.start() - - assert scheduler.running is start_scheduler - - @pytest.mark.parametrize('start_scheduler', [True, False]) - def test_add_remove_executor(self, scheduler, scheduler_events, start_scheduler): - if start_scheduler: - scheduler.start(paused=True) - - del scheduler_events[:] - executor = DummyExecutor() - scheduler.add_executor(executor, 'exec1') - - assert len(scheduler_events) == 1 - assert scheduler_events[0].code == EVENT_EXECUTOR_ADDED - assert scheduler_events[0].alias == 'exec1' - if start_scheduler: - executor.start.assert_called_once_with(scheduler, 'exec1') - else: - assert not executor.start.called - - scheduler.remove_executor('exec1') - assert len(scheduler_events) == 2 - assert scheduler_events[1].code == EVENT_EXECUTOR_REMOVED - assert scheduler_events[1].alias == 'exec1' - assert executor.shutdown.called - - def test_add_executor_already_exists(self, scheduler): - executor = DummyExecutor() - scheduler.add_executor(executor) - exc = pytest.raises(ValueError, scheduler.add_executor, executor) - assert str(exc.value) == 'This scheduler already has an executor by the alias of "default"' - - def test_remove_executor_nonexistent(self, scheduler): - pytest.raises(KeyError, scheduler.remove_executor, 'foo') - - @pytest.mark.parametrize('start_scheduler', [True, False]) - def test_add_jobstore(self, scheduler, scheduler_events, start_scheduler): - """ - Test that the proper event is dispatched when a job store is added and the scheduler's - wake() method is called if the scheduler is running. - - """ - if start_scheduler: - scheduler.start() - - del scheduler_events[:] - jobstore = DummyJobStore() - scheduler.add_jobstore(jobstore, 'store1') - - assert len(scheduler_events) == 1 - assert scheduler_events[0].code == EVENT_JOBSTORE_ADDED - assert scheduler_events[0].alias == 'store1' - - if start_scheduler: - assert scheduler.wakeup.called - jobstore.start.assert_called_once_with(scheduler, 'store1') - else: - assert not jobstore.start.called - - def test_add_jobstore_already_exists(self, scheduler): - """ - Test that ValueError is raised when a job store is added with an alias that already exists. - - """ - jobstore = MemoryJobStore() - scheduler.add_jobstore(jobstore) - exc = pytest.raises(ValueError, scheduler.add_jobstore, jobstore) - assert str(exc.value) == 'This scheduler already has a job store by the alias of "default"' - - def test_remove_jobstore(self, scheduler, scheduler_events): - scheduler.add_jobstore(MemoryJobStore(), 'foo') - scheduler.remove_jobstore('foo') - - assert len(scheduler_events) == 2 - assert scheduler_events[1].code == EVENT_JOBSTORE_REMOVED - assert scheduler_events[1].alias == 'foo' - - def test_remove_jobstore_nonexistent(self, scheduler): - pytest.raises(KeyError, scheduler.remove_jobstore, 'foo') - - def test_add_remove_listener(self, scheduler): - """Test that event dispatch works but removed listeners aren't called.""" - events = [] - scheduler.add_listener(events.append, EVENT_EXECUTOR_ADDED) - scheduler.add_executor(DummyExecutor(), 'exec1') - scheduler.remove_listener(events.append) - scheduler.add_executor(DummyExecutor(), 'exec2') - assert len(events) == 1 - - def test_add_job_return_value(self, scheduler, timezone): - """Test that when a job is added to a stopped scheduler, a Job instance is returned.""" - job = scheduler.add_job(lambda x, y: None, 'date', [1], {'y': 2}, 'my-id', 'dummy', - next_run_time=datetime(2014, 5, 23, 10), - run_date='2014-06-01 08:41:00') - - assert isinstance(job, Job) - assert job.id == 'my-id' - assert not hasattr(job, 'misfire_grace_time') - assert not hasattr(job, 'coalesce') - assert not hasattr(job, 'max_instances') - assert job.next_run_time.tzinfo.zone == timezone.zone - - def test_add_job_pending(self, scheduler, scheduler_events): - """ - Test that when a job is added to a stopped scheduler, it is not added to a job store until - the scheduler is started and that the event is dispatched when that happens. - - """ - scheduler.configure(job_defaults={ - 'misfire_grace_time': 3, 'coalesce': False, 'max_instances': 6 - }) - job = scheduler.add_job(lambda: None, 'interval', hours=1) - assert not scheduler_events - - scheduler.start(paused=True) - - assert len(scheduler_events) == 3 - assert scheduler_events[2].code == EVENT_JOB_ADDED - assert scheduler_events[2].job_id is job.id - - # Check that the undefined values were replaced with scheduler's job defaults - assert job.misfire_grace_time == 3 - assert not job.coalesce - assert job.max_instances == 6 - - def test_add_job_id_conflict(self, scheduler): - """ - Test that if a job is added with an already existing id, ConflictingIdError is raised. - - """ - scheduler.start(paused=True) - scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1) - pytest.raises(ConflictingIdError, scheduler.add_job, lambda: None, 'interval', - id='testjob', seconds=1) - - def test_add_job_replace(self, scheduler): - """Test that with replace_existing=True, a new job replaces another with the same id.""" - scheduler.start(paused=True) - scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1) - scheduler.add_job(lambda: None, 'cron', id='testjob', name='replacement', - replace_existing=True) - jobs = scheduler.get_jobs() - assert len(jobs) == 1 - assert jobs[0].name == 'replacement' - - def test_scheduled_job(self, scheduler): - def func(x, y): - pass - - scheduler.add_job = MagicMock() - decorator = scheduler.scheduled_job('date', [1], {'y': 2}, 'my-id', - 'dummy', run_date='2014-06-01 08:41:00') - decorator(func) - - scheduler.add_job.assert_called_once_with( - func, 'date', [1], {'y': 2}, 'my-id', 'dummy', undefined, undefined, undefined, - undefined, 'default', 'default', True, run_date='2014-06-01 08:41:00') - - @pytest.mark.parametrize('pending', [True, False], ids=['pending job', 'scheduled job']) - def test_modify_job(self, scheduler, pending, timezone): - job = MagicMock() - scheduler._dispatch_event = MagicMock() - scheduler._lookup_job = MagicMock(return_value=(job, None if pending else 'default')) - if not pending: - jobstore = MagicMock() - scheduler._lookup_jobstore = lambda alias: jobstore if alias == 'default' else None - scheduler.modify_job('blah', misfire_grace_time=5, max_instances=2, - next_run_time=datetime(2014, 10, 17)) - - job._modify.assert_called_once_with(misfire_grace_time=5, max_instances=2, - next_run_time=datetime(2014, 10, 17)) - if not pending: - jobstore.update_job.assert_called_once_with(job) - - assert scheduler._dispatch_event.call_count == 1 - event = scheduler._dispatch_event.call_args[0][0] - assert event.code == EVENT_JOB_MODIFIED - assert event.jobstore == (None if pending else 'default') - - def test_reschedule_job(self, scheduler): - scheduler.modify_job = MagicMock() - trigger = MagicMock(get_next_fire_time=lambda previous, now: 1) - scheduler._create_trigger = MagicMock(return_value=trigger) - scheduler.reschedule_job('my-id', 'jobstore', 'date', run_date='2014-06-01 08:41:00') - - assert scheduler.modify_job.call_count == 1 - assert scheduler.modify_job.call_args[0] == ('my-id', 'jobstore') - assert scheduler.modify_job.call_args[1] == {'trigger': trigger, 'next_run_time': 1} - - def test_pause_job(self, scheduler): - scheduler.modify_job = MagicMock() - scheduler.pause_job('job_id', 'jobstore') - - scheduler.modify_job.assert_called_once_with('job_id', 'jobstore', next_run_time=None) - - @pytest.mark.parametrize('dead_job', [True, False], ids=['dead job', 'live job']) - def test_resume_job(self, scheduler, freeze_time, dead_job): - next_fire_time = None if dead_job else freeze_time.current + timedelta(seconds=1) - trigger = MagicMock(BaseTrigger, get_next_fire_time=lambda prev, now: next_fire_time) - returned_job = MagicMock(Job, id='foo', trigger=trigger) - scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar')) - scheduler.modify_job = MagicMock() - scheduler.remove_job = MagicMock() - scheduler.resume_job('foo') - - if dead_job: - scheduler.remove_job.assert_called_once_with('foo', 'bar') - else: - scheduler.modify_job.assert_called_once_with('foo', 'bar', - next_run_time=next_fire_time) - - @pytest.mark.parametrize('scheduler_started', [True, False], ids=['running', 'stopped']) - @pytest.mark.parametrize('jobstore', [None, 'other'], - ids=['all jobstores', 'specific jobstore']) - def test_get_jobs(self, scheduler, scheduler_started, jobstore): - scheduler.add_jobstore(MemoryJobStore(), 'other') - scheduler.add_job(lambda: None, 'interval', seconds=1, id='job1') - scheduler.add_job(lambda: None, 'interval', seconds=1, id='job2', jobstore='other') - if scheduler_started: - scheduler.start(paused=True) - - expected_job_ids = {'job2'} - if jobstore is None: - expected_job_ids.add('job1') - - job_ids = {job.id for job in scheduler.get_jobs(jobstore)} - assert job_ids == expected_job_ids - - @pytest.mark.parametrize('jobstore', [None, 'bar'], ids=['any jobstore', 'specific jobstore']) - def test_get_job(self, scheduler, jobstore): - returned_job = object() - scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar')) - job = scheduler.get_job('foo', jobstore) - - assert job is returned_job - - def test_get_job_nonexistent_job(self, scheduler): - scheduler._lookup_job = MagicMock(side_effect=JobLookupError('foo')) - assert scheduler.get_job('foo') is None - - def test_get_job_nonexistent_jobstore(self, scheduler): - assert scheduler.get_job('foo', 'bar') is None - - @pytest.mark.parametrize('start_scheduler', [True, False]) - @pytest.mark.parametrize('jobstore', [None, 'other'], - ids=['any jobstore', 'specific jobstore']) - def test_remove_job(self, scheduler, scheduler_events, start_scheduler, jobstore): - scheduler.add_jobstore(MemoryJobStore(), 'other') - scheduler.add_job(lambda: None, id='job1') - if start_scheduler: - scheduler.start(paused=True) - - del scheduler_events[:] - if jobstore: - pytest.raises(JobLookupError, scheduler.remove_job, 'job1', jobstore) - assert len(scheduler.get_jobs()) == 1 - assert len(scheduler_events) == 0 - else: - scheduler.remove_job('job1', jobstore) - assert len(scheduler.get_jobs()) == 0 - assert len(scheduler_events) == 1 - assert scheduler_events[0].code == EVENT_JOB_REMOVED - - def test_remove_nonexistent_job(self, scheduler): - pytest.raises(JobLookupError, scheduler.remove_job, 'foo') - - @pytest.mark.parametrize('start_scheduler', [True, False]) - @pytest.mark.parametrize('jobstore', [None, 'other'], ids=['all', 'single jobstore']) - def test_remove_all_jobs(self, scheduler, start_scheduler, scheduler_events, jobstore): - """ - Test that remove_all_jobs() removes all jobs from all attached job stores, plus any - pending jobs. - - """ - scheduler.add_jobstore(MemoryJobStore(), 'other') - scheduler.add_job(lambda: None, id='job1') - scheduler.add_job(lambda: None, id='job2') - scheduler.add_job(lambda: None, id='job3', jobstore='other') - if start_scheduler: - scheduler.start(paused=True) - - del scheduler_events[:] - scheduler.remove_all_jobs(jobstore) - jobs = scheduler.get_jobs() - - assert len(jobs) == (2 if jobstore else 0) - assert len(scheduler_events) == 1 - assert scheduler_events[0].code == EVENT_ALL_JOBS_REMOVED - assert scheduler_events[0].alias == jobstore - - @pytest.mark.parametrize('start_scheduler', [True, False]) - @pytest.mark.parametrize('jobstore', [None, 'other'], - ids=['all jobstores', 'specific jobstore']) - def test_print_jobs(self, scheduler, start_scheduler, jobstore): - scheduler.add_jobstore(MemoryJobStore(), 'other') - if start_scheduler: - scheduler.start(paused=True) - - scheduler.add_job(lambda: None, 'date', run_date='2099-09-09', id='job1', - name='test job 1') - scheduler.add_job(lambda: None, 'date', run_date='2099-08-08', id='job2', - name='test job 2', jobstore='other') - - outfile = StringIO() - scheduler.print_jobs(jobstore, outfile) - - if jobstore and not start_scheduler: - assert outfile.getvalue() == """\ -Pending jobs: - test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending) -""" - elif jobstore and start_scheduler: - assert outfile.getvalue() == """\ -Jobstore other: - test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET) -""" - elif not jobstore and not start_scheduler: - assert outfile.getvalue() == """\ -Pending jobs: - test job 1 (trigger: date[2099-09-09 00:00:00 CET], pending) - test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending) -""" - else: - assert outfile.getvalue() == """\ -Jobstore default: - test job 1 (trigger: date[2099-09-09 00:00:00 CET], next run at: 2099-09-09 00:00:00 CET) -Jobstore other: - test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET) -""" - - @pytest.mark.parametrize('config', [ - { - 'timezone': 'UTC', - 'job_defaults': { - 'misfire_grace_time': '5', - 'coalesce': 'false', - 'max_instances': '9', - }, - 'executors': { - 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'}, - 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'} - }, - 'jobstores': { - 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'}, - 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'} - } - }, - { - 'timezone': utc, - 'job_defaults': { - 'misfire_grace_time': 5, - 'coalesce': False, - 'max_instances': 9, - }, - 'executors': { - 'default': DummyExecutor(arg1='3', arg2='a'), - 'alter': DummyExecutor(arg='true') - }, - 'jobstores': { - 'default': DummyJobStore(arg1='3', arg2='a'), - 'bar': DummyJobStore(arg='false') - } - } - ], ids=['references', 'instances']) - def test_configure_private(self, scheduler, config): - scheduler._configure(config) - - assert scheduler.timezone is utc - assert scheduler._job_defaults == { - 'misfire_grace_time': 5, - 'coalesce': False, - 'max_instances': 9 - } - assert set(scheduler._executors.keys()) == {'default', 'alter'} - assert scheduler._executors['default'].args == {'arg1': '3', 'arg2': 'a'} - assert scheduler._executors['alter'].args == {'arg': 'true'} - assert set(scheduler._jobstores.keys()) == {'default', 'bar'} - assert scheduler._jobstores['default'].args == {'arg1': '3', 'arg2': 'a'} - assert scheduler._jobstores['bar'].args == {'arg': 'false'} - - def test_configure_private_invalid_executor(self, scheduler): - exc = pytest.raises(TypeError, scheduler._configure, {'executors': {'default': 6}}) - assert str(exc.value) == ("Expected executor instance or dict for executors['default'], " - "got int instead") - - def test_configure_private_invalid_jobstore(self, scheduler): - exc = pytest.raises(TypeError, scheduler._configure, {'jobstores': {'default': 6}}) - assert str(exc.value) == ("Expected job store instance or dict for jobstores['default'], " - "got int instead") - - def test_create_default_executor(self, scheduler): - executor = scheduler._create_default_executor() - assert isinstance(executor, BaseExecutor) - - def test_create_default_jobstore(self, scheduler): - store = scheduler._create_default_jobstore() - assert isinstance(store, BaseJobStore) - - def test_lookup_executor(self, scheduler): - executor = object() - scheduler._executors = {'executor': executor} - assert scheduler._lookup_executor('executor') is executor - - def test_lookup_executor_nonexistent(self, scheduler): - pytest.raises(KeyError, scheduler._lookup_executor, 'executor') - - def test_lookup_jobstore(self, scheduler): - store = object() - scheduler._jobstores = {'store': store} - assert scheduler._lookup_jobstore('store') is store - - def test_lookup_jobstore_nonexistent(self, scheduler): - pytest.raises(KeyError, scheduler._lookup_jobstore, 'store') - - def test_dispatch_event(self, scheduler): - event = SchedulerEvent(1) - scheduler._listeners = [(MagicMock(), 2), (MagicMock(side_effect=Exception), 1), - (MagicMock(), 1)] - scheduler._dispatch_event(event) - - assert not scheduler._listeners[0][0].called - scheduler._listeners[1][0].assert_called_once_with(event) - - @pytest.mark.parametrize('load_plugin', [True, False], ids=['load plugin', 'plugin loaded']) - def test_create_trigger(self, scheduler, load_plugin): - """Tests that creating a trigger with an already loaded plugin works.""" - - scheduler._trigger_plugins = {} - scheduler._trigger_classes = {} - if load_plugin: - scheduler._trigger_plugins['dummy'] = MagicMock( - load=MagicMock(return_value=DummyTrigger)) - else: - scheduler._trigger_classes['dummy'] = DummyTrigger - - result = scheduler._create_trigger('dummy', {'a': 1, 'b': 'x'}) - - assert isinstance(result, DummyTrigger) - assert result.args == {'a': 1, 'b': 'x', 'timezone': scheduler.timezone} - - def test_create_trigger_instance(self, scheduler): - """Tests that passing a trigger instance will return the instance as-is.""" - trigger_instance = DummyTrigger() - - assert scheduler._create_trigger(trigger_instance, {}) is trigger_instance - - def test_create_trigger_default_type(self, scheduler): - """Tests that passing None as the trigger will create a "date" trigger instance.""" - scheduler._trigger_classes = {'date': DummyTrigger} - result = scheduler._create_trigger(None, {'a': 1}) - - assert isinstance(result, DummyTrigger) - assert result.args == {'a': 1, 'timezone': scheduler.timezone} - - def test_create_trigger_bad_trigger_type(self, scheduler): - exc = pytest.raises(TypeError, scheduler._create_trigger, 1, {}) - assert str(exc.value) == 'Expected a trigger instance or string, got int instead' - - def test_create_trigger_bad_plugin_type(self, scheduler): - mock_plugin = MagicMock() - mock_plugin.load.configure_mock(return_value=object) - scheduler._trigger_classes = {} - scheduler._trigger_plugins = {'dummy': mock_plugin} - exc = pytest.raises(TypeError, scheduler._create_trigger, 'dummy', {}) - assert str(exc.value) == 'The trigger entry point does not point to a trigger class' - - def test_create_trigger_nonexisting_plugin(self, scheduler): - exc = pytest.raises(LookupError, scheduler._create_trigger, 'dummy', {}) - assert str(exc.value) == 'No trigger by the name "dummy" was found' - - def test_create_lock(self, scheduler): - lock = scheduler._create_lock() - assert hasattr(lock, '__enter__') - - def test_process_jobs_empty(self, scheduler): - assert scheduler._process_jobs() is None - - def test_job_submitted_event(self, scheduler, freeze_time): - events = [] - scheduler.add_job(lambda: None, run_date=freeze_time.get()) - scheduler.add_listener(events.append, EVENT_JOB_SUBMITTED) - scheduler.start() - scheduler._process_jobs() - - assert len(events) == 1 - assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] - - @pytest.mark.parametrize('scheduler_events', [EVENT_JOB_MAX_INSTANCES], - indirect=['scheduler_events']) - def test_job_max_instances_event(self, scheduler, scheduler_events, freeze_time): - class MaxedOutExecutor(DebugExecutor): - def submit_job(self, job, run_times): - raise MaxInstancesReachedError(job) - - executor = MaxedOutExecutor() - scheduler.add_executor(executor, 'maxed') - scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed') - scheduler.start() - scheduler._process_jobs() - - assert len(scheduler_events) == 1 - assert scheduler_events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] - - -class TestProcessJobs(object): - @pytest.fixture - def job(self): - job = MagicMock(Job, id=999, executor='default') - job.trigger = MagicMock(get_next_fire_time=MagicMock(return_value=None)) - job. __str__ = lambda x: 'job 999' - return job - - @pytest.fixture - def scheduler(self): - scheduler = DummyScheduler() - scheduler.start() - return scheduler - - @pytest.fixture - def jobstore(self, scheduler, job): - jobstore = MagicMock(BaseJobStore, get_due_jobs=MagicMock(return_value=[job]), - get_next_run_time=MagicMock(return_value=None)) - scheduler._jobstores['default'] = jobstore - return jobstore - - @pytest.fixture - def executor(self, scheduler): - executor = MagicMock(BaseExecutor) - scheduler._executors['default'] = executor - return executor - - def test_nonexistent_executor(self, scheduler, jobstore, caplog): - """ - Test that an error is logged and the job is removed from its job store if its executor is - not found. - - """ - caplog.set_level(logging.ERROR) - scheduler.remove_executor('default') - assert scheduler._process_jobs() is None - jobstore.remove_job.assert_called_once_with(999) - assert len(caplog.records) == 1 - assert caplog.records[0].message == \ - 'Executor lookup ("default") failed for job "job 999" -- removing it from the job ' \ - 'store' - - def test_max_instances_reached(self, scheduler, job, jobstore, executor, caplog): - """Tests that a warning is logged when the maximum instances of a job is reached.""" - caplog.set_level(logging.WARNING) - executor.submit_job = MagicMock(side_effect=MaxInstancesReachedError(job)) - - assert scheduler._process_jobs() is None - assert len(caplog.records) == 1 - assert caplog.records[0].message == \ - 'Execution of job "job 999" skipped: maximum number of running instances reached (1)' - - def test_executor_error(self, scheduler, jobstore, executor, caplog): - """Tests that if any exception is raised in executor.submit(), it is logged.""" - caplog.set_level(logging.ERROR) - executor.submit_job = MagicMock(side_effect=Exception('test message')) - - assert scheduler._process_jobs() is None - assert len(caplog.records) == 1 - assert 'test message' in caplog.records[0].exc_text - assert 'Error submitting job "job 999" to executor "default"' in caplog.records[0].message - - def test_job_update(self, scheduler, job, jobstore, freeze_time): - """ - Tests that the job is updated in its job store with the next run time from the trigger. - - """ - next_run_time = freeze_time.current + timedelta(seconds=6) - job.trigger.get_next_fire_time = MagicMock(return_value=next_run_time) - assert scheduler._process_jobs() is None - job._modify.assert_called_once_with(next_run_time=next_run_time) - jobstore.update_job.assert_called_once_with(job) - - def test_wait_time(self, scheduler, freeze_time): - """ - Tests that the earliest next run time from all job stores is returned (ignoring Nones). - - """ - scheduler._jobstores = { - 'default': MagicMock(get_next_run_time=MagicMock( - return_value=freeze_time.current + timedelta(seconds=8))), - 'alter': MagicMock(get_next_run_time=MagicMock(return_value=None)), - 'another': MagicMock(get_next_run_time=MagicMock( - return_value=freeze_time.current + timedelta(seconds=5))), - 'more': MagicMock(get_next_run_time=MagicMock( - return_value=freeze_time.current + timedelta(seconds=6))), - } - - assert scheduler._process_jobs() == 5 - - -class SchedulerImplementationTestBase(object): - @pytest.fixture(autouse=True) - def executor(self, scheduler): - scheduler.add_executor(DebugExecutor()) - - @pytest.fixture - def start_scheduler(self, request, scheduler): - yield scheduler.start - if scheduler.running: - scheduler.shutdown() - - @pytest.fixture - def eventqueue(self, scheduler): - events = Queue() - scheduler.add_listener(events.put) - return events - - def wait_event(self, queue): - return queue.get(True, 1) - - def test_add_pending_job(self, scheduler, freeze_time, eventqueue, start_scheduler): - """Tests that pending jobs are added (and if due, executed) when the scheduler starts.""" - freeze_time.set_increment(timedelta(seconds=0.2)) - scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], run_date=freeze_time.next()) - start_scheduler() - - assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED - assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED - event = self.wait_event(eventqueue) - assert event.code == EVENT_JOB_EXECUTED - assert event.retval == 3 - assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED - - def test_add_live_job(self, scheduler, freeze_time, eventqueue, start_scheduler): - """Tests that adding a job causes it to be executed after the specified delay.""" - freeze_time.set_increment(timedelta(seconds=0.2)) - start_scheduler() - assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED - - scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], - run_date=freeze_time.next() + freeze_time.increment * 2) - assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED - event = self.wait_event(eventqueue) - assert event.code == EVENT_JOB_EXECUTED - assert event.retval == 3 - assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED - - def test_shutdown(self, scheduler, eventqueue, start_scheduler): - """Tests that shutting down the scheduler emits the proper event.""" - start_scheduler() - assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED - - scheduler.shutdown() - assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN - - -class TestBlockingScheduler(SchedulerImplementationTestBase): - @pytest.fixture - def scheduler(self): - from apscheduler.schedulers.blocking import BlockingScheduler - return BlockingScheduler() - - @pytest.fixture - def start_scheduler(self, request, scheduler): - thread = Thread(target=scheduler.start) - yield thread.start - - if scheduler.running: - scheduler.shutdown() - thread.join() - - -class TestBackgroundScheduler(SchedulerImplementationTestBase): - @pytest.fixture - def scheduler(self): - from apscheduler.schedulers.background import BackgroundScheduler - return BackgroundScheduler() - - -class TestAsyncIOScheduler(SchedulerImplementationTestBase): - @pytest.fixture - def event_loop(self): - asyncio = pytest.importorskip('apscheduler.schedulers.asyncio') - return asyncio.asyncio.new_event_loop() - - @pytest.fixture - def scheduler(self, event_loop): - asyncio = pytest.importorskip('apscheduler.schedulers.asyncio') - return asyncio.AsyncIOScheduler(event_loop=event_loop) - - @pytest.fixture - def start_scheduler(self, request, event_loop, scheduler): - event_loop.call_soon_threadsafe(scheduler.start) - thread = Thread(target=event_loop.run_forever) - yield thread.start - - if scheduler.running: - event_loop.call_soon_threadsafe(scheduler.shutdown) - event_loop.call_soon_threadsafe(event_loop.stop) - thread.join() - - -class TestGeventScheduler(SchedulerImplementationTestBase): - @pytest.fixture - def scheduler(self): - gevent = pytest.importorskip('apscheduler.schedulers.gevent') - return gevent.GeventScheduler() - - @pytest.fixture - def calc_event(self): - from gevent.event import Event - return Event() - - @pytest.fixture - def eventqueue(self, scheduler): - from gevent.queue import Queue - events = Queue() - scheduler.add_listener(events.put) - return events - - -class TestTornadoScheduler(SchedulerImplementationTestBase): - @pytest.fixture - def io_loop(self): - ioloop = pytest.importorskip('tornado.ioloop') - return ioloop.IOLoop() - - @pytest.fixture - def scheduler(self, io_loop): - tornado = pytest.importorskip('apscheduler.schedulers.tornado') - return tornado.TornadoScheduler(io_loop=io_loop) - - @pytest.fixture - def start_scheduler(self, request, io_loop, scheduler): - io_loop.add_callback(scheduler.start) - thread = Thread(target=io_loop.start) - yield thread.start - - if scheduler.running: - io_loop.add_callback(scheduler.shutdown) - io_loop.add_callback(io_loop.stop) - thread.join() - - -class TestTwistedScheduler(SchedulerImplementationTestBase): - @pytest.fixture - def reactor(self): - selectreactor = pytest.importorskip('twisted.internet.selectreactor') - return selectreactor.SelectReactor() - - @pytest.fixture - def scheduler(self, reactor): - twisted = pytest.importorskip('apscheduler.schedulers.twisted') - return twisted.TwistedScheduler(reactor=reactor) - - @pytest.fixture - def start_scheduler(self, request, reactor, scheduler): - reactor.callFromThread(scheduler.start) - thread = Thread(target=reactor.run, args=(False,)) - yield thread.start - - if scheduler.running: - reactor.callFromThread(scheduler.shutdown) - reactor.callFromThread(reactor.stop) - thread.join() diff --git a/tests/test_util.py b/tests/test_util.py index f80e17b..19ed988 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -5,10 +5,8 @@ from functools import partial from types import ModuleType import pytest -import pytz - from apscheduler.util import ( - datetime_ceil, get_callable_name, obj_to_ref, ref_to_obj, maybe_ref, check_callable_args) + check_callable_args, datetime_ceil, get_callable_name, maybe_ref, obj_to_ref, ref_to_obj) class DummyClass(object): diff --git a/tests/triggers/test_calendarinterval.py b/tests/triggers/test_calendarinterval.py index ac0d023..079d014 100644 --- a/tests/triggers/test_calendarinterval.py +++ b/tests/triggers/test_calendarinterval.py @@ -1,7 +1,6 @@ -from datetime import datetime, date +from datetime import date, datetime import pytest - from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger diff --git a/tests/triggers/test_combining.py b/tests/triggers/test_combining.py index b804102..73a5836 100644 --- a/tests/triggers/test_combining.py +++ b/tests/triggers/test_combining.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta import pytest - from apscheduler.exceptions import MaxIterationsReached from apscheduler.triggers.combining import AndTrigger, OrTrigger from apscheduler.triggers.date import DateTrigger diff --git a/tests/triggers/test_cron.py b/tests/triggers/test_cron.py index c331ef6..8f26aa3 100644 --- a/tests/triggers/test_cron.py +++ b/tests/triggers/test_cron.py @@ -2,7 +2,6 @@ from datetime import datetime import pytest import pytz - from apscheduler.triggers.cron import CronTrigger diff --git a/tests/triggers/test_interval.py b/tests/triggers/test_interval.py index 07ca35d..6761777 100644 --- a/tests/triggers/test_interval.py +++ b/tests/triggers/test_interval.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta import pytest - from apscheduler.triggers.interval import IntervalTrigger diff --git a/tests/workers/test_local.py b/tests/workers/test_local.py new file mode 100644 index 0000000..d72d559 --- /dev/null +++ b/tests/workers/test_local.py @@ -0,0 +1,101 @@ +from datetime import datetime + +import pytest +from anyio import fail_after, sleep +from apscheduler.abc import Job +from apscheduler.events import JobAdded, JobDeadlineMissed, JobFailed, JobSuccessful, JobUpdated +from apscheduler.workers.local import LocalExecutor + +pytestmark = pytest.mark.anyio + + +@pytest.mark.parametrize('sync', [True, False], ids=['sync', 'async']) +@pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail']) +async def test_run_job_nonscheduled_success(sync, fail): + def sync_func(*args, **kwargs): + nonlocal received_args, received_kwargs + received_args = args + received_kwargs = kwargs + if fail: + raise Exception('failing as requested') + else: + return 'success' + + async def async_func(*args, **kwargs): + nonlocal received_args, received_kwargs + received_args = args + received_kwargs = kwargs + if fail: + raise Exception('failing as requested') + else: + return 'success' + + received_args = received_kwargs = None + events = [] + async with LocalExecutor() as worker: + await worker.subscribe(events.append) + + job = Job('task_id', sync_func if sync else async_func, args=(1, 2), kwargs={'x': 'foo'}) + await worker.submit_job(job) + + async with fail_after(1): + while len(events) < 3: + await sleep(0) + + assert received_args == (1, 2) + assert received_kwargs == {'x': 'foo'} + + assert isinstance(events[0], JobAdded) + assert events[0].job_id == job.id + assert events[0].task_id == 'task_id' + assert events[0].schedule_id is None + assert events[0].scheduled_start_time is None + + assert isinstance(events[1], JobUpdated) + assert events[1].job_id == job.id + assert events[1].task_id == 'task_id' + assert events[1].schedule_id is None + assert events[1].scheduled_start_time is None + + assert events[2].job_id == job.id + assert events[2].task_id == 'task_id' + assert events[2].schedule_id is None + assert events[2].scheduled_start_time is None + if fail: + assert isinstance(events[2], JobFailed) + assert type(events[2].exception) is Exception + assert isinstance(events[2].formatted_traceback, str) + else: + assert isinstance(events[2], JobSuccessful) + assert events[2].return_value == 'success' + + +async def test_run_deadline_missed(): + def func(): + pytest.fail('This function should never be run') + + scheduled_start_time = datetime(2020, 9, 14) + events = [] + async with LocalExecutor() as worker: + await worker.subscribe(events.append) + + job = Job('task_id', func, args=(), kwargs={}, schedule_id='foo', + scheduled_start_time=scheduled_start_time, + start_deadline=datetime(2020, 9, 14, 1)) + await worker.submit_job(job) + + async with fail_after(1): + while len(events) < 2: + await sleep(0) + + assert isinstance(events[0], JobAdded) + assert events[0].job_id == job.id + assert events[0].task_id == 'task_id' + assert events[0].schedule_id == 'foo' + assert events[0].scheduled_start_time == scheduled_start_time + + assert isinstance(events[1], JobDeadlineMissed) + assert events[1].job_id == job.id + assert events[1].task_id == 'task_id' + assert events[1].schedule_id == 'foo' + assert events[1].scheduled_start_time == scheduled_start_time |