summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2020-09-20 14:30:46 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2020-09-20 17:00:42 +0300
commit95169d277da6501b181a956791e7ea0171fbae64 (patch)
treef9ea601ff84506ff8d44799e6cdcfa723346aed9 /tests
parent6f6b36d83369cfb95b6b0071caf82c89818ef020 (diff)
downloadapscheduler-95169d277da6501b181a956791e7ea0171fbae64.tar.gz
Added the first usable scheduler, worker and datastore implementations
Diffstat (limited to 'tests')
-rw-r--r--tests/backends/test_memory.py138
-rw-r--r--tests/conftest.py1
-rw-r--r--tests/test_executors.py239
-rw-r--r--tests/test_job.py254
-rw-r--r--tests/test_jobstores.py387
-rw-r--r--tests/test_schedulers.py1026
-rw-r--r--tests/test_util.py4
-rw-r--r--tests/triggers/test_calendarinterval.py3
-rw-r--r--tests/triggers/test_combining.py1
-rw-r--r--tests/triggers/test_cron.py1
-rw-r--r--tests/triggers/test_interval.py1
-rw-r--r--tests/workers/test_local.py101
12 files changed, 241 insertions, 1915 deletions
diff --git a/tests/backends/test_memory.py b/tests/backends/test_memory.py
new file mode 100644
index 0000000..c8679f4
--- /dev/null
+++ b/tests/backends/test_memory.py
@@ -0,0 +1,138 @@
+from datetime import datetime, timedelta, timezone
+
+import pytest
+from apscheduler.abc import Schedule
+from apscheduler.datastores.memory import MemoryScheduleStore
+from apscheduler.events import SchedulesAdded, SchedulesRemoved, SchedulesUpdated
+from apscheduler.triggers.date import DateTrigger
+
+pytestmark = pytest.mark.anyio
+
+
+@pytest.fixture
+async def store():
+ async with MemoryScheduleStore() as store_:
+ yield store_
+
+
+@pytest.fixture
+async def events(store):
+ events = []
+ await store.subscribe(events.append)
+ return events
+
+
+@pytest.fixture
+def schedules():
+ trigger = DateTrigger(datetime(2020, 9, 13, tzinfo=timezone.utc))
+ schedule1 = Schedule(id='s1', task_id='bogus', trigger=trigger, args=(), kwargs={},
+ coalesce=False, misfire_grace_time=None, tags=frozenset())
+ schedule1.next_fire_time = trigger.next()
+
+ trigger = DateTrigger(datetime(2020, 9, 14, tzinfo=timezone.utc))
+ schedule2 = Schedule(id='s2', task_id='bogus', trigger=trigger, args=(), kwargs={},
+ coalesce=False, misfire_grace_time=None, tags=frozenset())
+ schedule2.next_fire_time = trigger.next()
+
+ trigger = DateTrigger(datetime(2020, 9, 15, tzinfo=timezone.utc))
+ schedule3 = Schedule(id='s3', task_id='bogus', trigger=trigger, args=(), kwargs={},
+ coalesce=False, misfire_grace_time=None, tags=frozenset())
+ return [schedule1, schedule2, schedule3]
+
+
+async def test_add_schedules(store, schedules, events):
+ assert await store.get_next_fire_time() is None
+ await store.add_or_replace_schedules(schedules)
+ assert await store.get_next_fire_time() == datetime(2020, 9, 13, tzinfo=timezone.utc)
+
+ assert await store.get_schedules() == schedules
+ assert await store.get_schedules({'s1', 's2', 's3'}) == schedules
+ assert await store.get_schedules({'s1'}) == [schedules[0]]
+ assert await store.get_schedules({'s2'}) == [schedules[1]]
+ assert await store.get_schedules({'s3'}) == [schedules[2]]
+
+ assert len(events) == 1
+ assert isinstance(events[0], SchedulesAdded)
+ assert events[0].schedule_ids == {'s1', 's2', 's3'}
+ assert events[0].earliest_next_fire_time == datetime(2020, 9, 13, tzinfo=timezone.utc)
+
+
+async def test_replace_schedules(store, schedules, events):
+ await store.add_or_replace_schedules(schedules)
+ events.clear()
+
+ next_fire_time = schedules[2].trigger.next()
+ schedule = Schedule(id='s3', task_id='foo', trigger=schedules[2].trigger, args=(), kwargs={},
+ coalesce=False, misfire_grace_time=None, tags=frozenset())
+ schedule.next_fire_time = next_fire_time
+ await store.add_or_replace_schedules([schedule])
+
+ schedules = await store.get_schedules([schedule.id])
+ assert schedules[0].task_id == 'foo'
+ assert schedules[0].next_fire_time == next_fire_time
+ assert schedules[0].args == ()
+ assert schedules[0].kwargs == {}
+ assert not schedules[0].coalesce
+ assert schedules[0].misfire_grace_time is None
+ assert schedules[0].tags == frozenset()
+
+ assert len(events) == 1
+ assert isinstance(events[0], SchedulesUpdated)
+ assert events[0].schedule_ids == {'s3'}
+ assert events[0].earliest_next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc)
+
+
+async def test_update_schedules(store, schedules, events):
+ await store.add_or_replace_schedules(schedules)
+ events.clear()
+
+ next_fire_time = datetime(2020, 9, 12, tzinfo=timezone.utc)
+ updated_ids = await store.update_schedules({
+ 's2': {'task_id': 'foo', 'next_fire_time': next_fire_time, 'args': (1,),
+ 'kwargs': {'a': 'x'}, 'coalesce': True, 'misfire_grace_time': timedelta(seconds=5),
+ 'tags': frozenset(['a', 'b'])},
+ 'nonexistent': {}
+ })
+ assert updated_ids == {'s2'}
+
+ schedules = await store.get_schedules({'s2'})
+ assert len(schedules) == 1
+ assert schedules[0].task_id == 'foo'
+ assert schedules[0].args == (1,)
+ assert schedules[0].kwargs == {'a': 'x'}
+ assert schedules[0].coalesce
+ assert schedules[0].next_fire_time == next_fire_time
+ assert schedules[0].misfire_grace_time == timedelta(seconds=5)
+ assert schedules[0].tags == frozenset(['a', 'b'])
+
+ assert len(events) == 1
+ assert isinstance(events[0], SchedulesUpdated)
+ assert events[0].schedule_ids == {'s2'}
+ assert events[0].earliest_next_fire_time == next_fire_time
+
+
+@pytest.mark.parametrize('ids', [None, {'s1', 's2'}], ids=['all', 'by_id'])
+async def test_remove_schedules(store, schedules, events, ids):
+ await store.add_or_replace_schedules(schedules)
+ events.clear()
+
+ await store.remove_schedules(ids)
+ assert len(events) == 1
+ assert isinstance(events[0], SchedulesRemoved)
+ if ids:
+ assert events[0].schedule_ids == {'s1', 's2'}
+ assert await store.get_schedules() == [schedules[2]]
+ else:
+ assert events[0].schedule_ids == {'s1', 's2', 's3'}
+ assert await store.get_schedules() == []
+
+
+async def test_acquire_due_schedules(store, schedules, events):
+ await store.add_or_replace_schedules(schedules)
+ events.clear()
+
+ now = datetime(2020, 9, 14, tzinfo=timezone.utc)
+ async with store.acquire_due_schedules('dummy-id', now) as schedules:
+ assert len(schedules) == 2
+ assert schedules[0].id == 's1'
+ assert schedules[1].id == 's2'
diff --git a/tests/conftest.py b/tests/conftest.py
index bd37e44..05f3f4b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -3,7 +3,6 @@ from unittest.mock import Mock
import pytest
import pytz
-
from apscheduler.serializers.cbor import CBORSerializer
from apscheduler.serializers.json import JSONSerializer
from apscheduler.serializers.pickle import PickleSerializer
diff --git a/tests/test_executors.py b/tests/test_executors.py
deleted file mode 100644
index a1d525f..0000000
--- a/tests/test_executors.py
+++ /dev/null
@@ -1,239 +0,0 @@
-import gc
-import time
-from asyncio import CancelledError
-from datetime import datetime
-from threading import Event
-from types import TracebackType
-from unittest.mock import Mock, MagicMock, patch
-
-import pytest
-from pytz import UTC, utc
-
-from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, EVENT_JOB_EXECUTED
-from apscheduler.executors.asyncio import AsyncIOExecutor
-from apscheduler.executors.base import MaxInstancesReachedError, run_job
-from apscheduler.executors.tornado import TornadoExecutor
-from apscheduler.job import Job
-from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from apscheduler.schedulers.base import BaseScheduler
-from apscheduler.schedulers.tornado import TornadoScheduler
-
-
-@pytest.fixture
-def mock_scheduler(timezone):
- scheduler_ = Mock(BaseScheduler, timezone=timezone)
- scheduler_._create_lock = MagicMock()
- return scheduler_
-
-
-@pytest.fixture(params=['threadpool', 'processpool'])
-def executor(request, mock_scheduler):
- if request.param == 'threadpool':
- from apscheduler.executors.pool import ThreadPoolExecutor
- executor_ = ThreadPoolExecutor()
- else:
- from apscheduler.executors.pool import ProcessPoolExecutor
- executor_ = ProcessPoolExecutor()
-
- executor_.start(mock_scheduler, 'dummy')
- yield executor_
- executor_.shutdown()
-
-
-def wait_event():
- time.sleep(0.2)
- return 'test'
-
-
-def failure():
- raise Exception('test failure')
-
-
-def success():
- return 5
-
-
-def test_max_instances(mock_scheduler, executor, create_job, freeze_time):
- """Tests that the maximum instance limit on a job is respected."""
- events = []
- mock_scheduler._dispatch_event = lambda event: events.append(event)
- job = create_job(func=wait_event, max_instances=2, next_run_time=None)
- executor.submit_job(job, [freeze_time.current])
- executor.submit_job(job, [freeze_time.current])
-
- pytest.raises(MaxInstancesReachedError, executor.submit_job, job, [freeze_time.current])
- executor.shutdown()
- assert len(events) == 2
- assert events[0].retval == 'test'
- assert events[1].retval == 'test'
-
-
-@pytest.mark.parametrize('event_code,func', [
- (EVENT_JOB_EXECUTED, success),
- (EVENT_JOB_MISSED, failure),
- (EVENT_JOB_ERROR, failure)
-], ids=['executed', 'missed', 'error'])
-def test_submit_job(mock_scheduler, executor, create_job, freeze_time, timezone, event_code, func):
- """
- Tests that an EVENT_JOB_EXECUTED event is delivered to the scheduler if the job was
- successfully executed.
-
- """
- mock_scheduler._dispatch_event = MagicMock()
- job = create_job(func=func, id='foo')
- job._jobstore_alias = 'test_jobstore'
- run_time = (timezone.localize(datetime(1970, 1, 1)) if event_code == EVENT_JOB_MISSED else
- freeze_time.current)
- executor.submit_job(job, [run_time])
- executor.shutdown()
-
- assert mock_scheduler._dispatch_event.call_count == 1
- event = mock_scheduler._dispatch_event.call_args[0][0]
- assert event.code == event_code
- assert event.job_id == 'foo'
- assert event.jobstore == 'test_jobstore'
-
- if event_code == EVENT_JOB_EXECUTED:
- assert event.retval == 5
- elif event_code == EVENT_JOB_ERROR:
- assert str(event.exception) == 'test failure'
- assert isinstance(event.traceback, str)
-
-
-class FauxJob(object):
- id = 'abc'
- max_instances = 1
- _jobstore_alias = 'foo'
-
-
-def dummy_run_job(job, jobstore_alias, run_times, logger_name):
- raise Exception('dummy')
-
-
-def test_run_job_error(monkeypatch, executor):
- """Tests that _run_job_error is properly called if an exception is raised in run_job()"""
- def run_job_error(job_id, exc, traceback):
- assert job_id == 'abc'
- exc_traceback[:] = [exc, traceback]
- event.set()
-
- event = Event()
- exc_traceback = [None, None]
- monkeypatch.setattr('apscheduler.executors.base.run_job', dummy_run_job)
- monkeypatch.setattr('apscheduler.executors.pool.run_job', dummy_run_job)
- monkeypatch.setattr(executor, '_run_job_error', run_job_error)
- executor.submit_job(FauxJob(), [])
-
- event.wait(5)
- assert str(exc_traceback[0]) == "dummy"
- if exc_traceback[1] is not None:
- assert isinstance(exc_traceback[1], TracebackType)
-
-
-def test_run_job_memory_leak():
- class FooBar(object):
- pass
-
- def func():
- foo = FooBar() # noqa: F841
- raise Exception('dummy')
-
- fake_job = Mock(Job, func=func, args=(), kwargs={}, misfire_grace_time=1)
- with patch('logging.getLogger'):
- for _ in range(5):
- run_job(fake_job, 'foo', [datetime.now(UTC)], __name__)
-
- foos = [x for x in gc.get_objects() if type(x) is FooBar]
- assert len(foos) == 0
-
-
-@pytest.fixture
-def asyncio_scheduler(event_loop):
- scheduler = AsyncIOScheduler(event_loop=event_loop)
- scheduler.start(paused=True)
- yield scheduler
- scheduler.shutdown(False)
-
-
-@pytest.fixture
-def asyncio_executor(asyncio_scheduler):
- executor = AsyncIOExecutor()
- executor.start(asyncio_scheduler, 'default')
- yield executor
- executor.shutdown()
-
-
-@pytest.fixture
-def tornado_scheduler(io_loop):
- scheduler = TornadoScheduler(io_loop=io_loop)
- scheduler.start(paused=True)
- yield scheduler
- scheduler.shutdown(False)
-
-
-@pytest.fixture
-def tornado_executor(tornado_scheduler):
- executor = TornadoExecutor()
- executor.start(tornado_scheduler, 'default')
- yield executor
- executor.shutdown()
-
-
-async def waiter(sleep, exception):
- await sleep(0.1)
- if exception:
- raise Exception('dummy error')
- else:
- return True
-
-
-@pytest.mark.parametrize('exception', [False, True])
-@pytest.mark.asyncio
-async def test_run_coroutine_job(asyncio_scheduler, asyncio_executor, exception):
- from asyncio import Future, sleep
-
- future = Future()
- job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception])
- asyncio_executor._run_job_success = lambda job_id, events: future.set_result(events)
- asyncio_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc)
- asyncio_executor.submit_job(job, [datetime.now(utc)])
- events = await future
- assert len(events) == 1
- if exception:
- assert str(events[0].exception) == 'dummy error'
- else:
- assert events[0].retval is True
-
-
-@pytest.mark.parametrize('exception', [False, True])
-@pytest.mark.gen_test
-async def test_run_coroutine_job_tornado(tornado_scheduler, tornado_executor, exception):
- from tornado.concurrent import Future
- from tornado.gen import sleep
-
- future = Future()
- job = tornado_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, exception])
- tornado_executor._run_job_success = lambda job_id, events: future.set_result(events)
- tornado_executor._run_job_error = lambda job_id, exc, tb: future.set_exception(exc)
- tornado_executor.submit_job(job, [datetime.now(utc)])
- events = await future
- assert len(events) == 1
- if exception:
- assert str(events[0].exception) == 'dummy error'
- else:
- assert events[0].retval is True
-
-
-@pytest.mark.asyncio
-async def test_asyncio_executor_shutdown(asyncio_scheduler, asyncio_executor):
- """Test that the AsyncIO executor cancels its pending tasks on shutdown."""
- from asyncio import sleep
-
- job = asyncio_scheduler.add_job(waiter, 'interval', seconds=1, args=[sleep, None])
- asyncio_executor.submit_job(job, [datetime.now(utc)])
- futures = asyncio_executor._pending_futures.copy()
- assert len(futures) == 1
-
- asyncio_executor.shutdown()
- with pytest.raises(CancelledError):
- await futures.pop()
diff --git a/tests/test_job.py b/tests/test_job.py
deleted file mode 100644
index d1ab355..0000000
--- a/tests/test_job.py
+++ /dev/null
@@ -1,254 +0,0 @@
-from datetime import datetime, timedelta
-from functools import partial
-from unittest.mock import MagicMock, patch
-
-import pytest
-
-from apscheduler.job import Job
-from apscheduler.schedulers.base import BaseScheduler
-from apscheduler.triggers.date import DateTrigger
-
-
-def dummyfunc():
- pass
-
-
-@pytest.fixture
-def job(create_job):
- return create_job(func=dummyfunc)
-
-
-@pytest.mark.parametrize('job_id', ['testid', None])
-def test_constructor(job_id):
- with patch('apscheduler.job.Job._modify') as _modify:
- scheduler_mock = MagicMock(BaseScheduler)
- job = Job(scheduler_mock, id=job_id)
- assert job._scheduler is scheduler_mock
- assert job._jobstore_alias is None
-
- modify_kwargs = _modify.call_args[1]
- if job_id is None:
- assert len(modify_kwargs['id']) == 32
- else:
- assert modify_kwargs['id'] == job_id
-
-
-def test_modify(job):
- job.modify(bah=1, foo='x')
- job._scheduler.modify_job.assert_called_once_with(job.id, None, bah=1, foo='x')
-
-
-def test_reschedule(job):
- job.reschedule('trigger', bah=1, foo='x')
- job._scheduler.reschedule_job.assert_called_once_with(job.id, None, 'trigger', bah=1, foo='x')
-
-
-def test_pause(job):
- job.pause()
- job._scheduler.pause_job.assert_called_once_with(job.id, None)
-
-
-def test_resume(job):
- job.resume()
- job._scheduler.resume_job.assert_called_once_with(job.id, None)
-
-
-def test_remove(job):
- job.remove()
- job._scheduler.remove_job.assert_called_once_with(job.id, None)
-
-
-def test_pending(job):
- """
- Tests that the "pending" property return True when _jobstore_alias is a string, ``False``
- otherwise.
-
- """
- assert job.pending
-
- job._jobstore_alias = 'test'
- assert not job.pending
-
-
-def test_get_run_times(create_job, timezone):
- run_time = timezone.localize(datetime(2010, 12, 13, 0, 8))
- expected_times = [run_time + timedelta(seconds=1), run_time + timedelta(seconds=2)]
- job = create_job(trigger='interval',
- trigger_args={'seconds': 1, 'timezone': timezone, 'start_date': run_time},
- next_run_time=expected_times[0], func=dummyfunc)
-
- run_times = job._get_run_times(run_time)
- assert run_times == []
-
- run_times = job._get_run_times(expected_times[0])
- assert run_times == [expected_times[0]]
-
- run_times = job._get_run_times(expected_times[1])
- assert run_times == expected_times
-
-
-def test_private_modify_bad_id(job):
- """Tests that only strings are accepted for job IDs."""
- del job.id
- exc = pytest.raises(TypeError, job._modify, id=3)
- assert str(exc.value) == 'id must be a nonempty string'
-
-
-def test_private_modify_id(job):
- """Tests that the job ID can't be changed."""
- exc = pytest.raises(ValueError, job._modify, id='alternate')
- assert str(exc.value) == 'The job ID may not be changed'
-
-
-def test_private_modify_bad_func(job):
- """Tests that given a func of something else than a callable or string raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, func=1)
- assert str(exc.value) == 'func must be a callable or a textual reference to one'
-
-
-def test_private_modify_func_ref(job):
- """Tests that the target callable can be given as a textual reference."""
- job._modify(func='test_job:dummyfunc')
- assert job.func is dummyfunc
- assert job.func_ref == 'test_job:dummyfunc'
-
-
-def test_private_modify_unreachable_func(job):
- """Tests that func_ref remains None if no reference to the target callable can be found."""
- func = partial(dummyfunc)
- job._modify(func=func)
- assert job.func is func
- assert job.func_ref is None
-
-
-def test_private_modify_update_name(job):
- """Tests that the name attribute defaults to the function name."""
- del job.name
- job._modify(func=dummyfunc)
- assert job.name == 'dummyfunc'
-
-
-def test_private_modify_bad_args(job):
- """ Tests that passing an argument list of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, args=1)
- assert str(exc.value) == 'args must be a non-string iterable'
-
-
-def test_private_modify_bad_kwargs(job):
- """Tests that passing an argument list of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, kwargs=1)
- assert str(exc.value) == 'kwargs must be a dict-like object'
-
-
-@pytest.mark.parametrize('value', [1, ''], ids=['integer', 'empty string'])
-def test_private_modify_bad_name(job, value):
- """
- Tests that passing an empty name or a name of something else than a string raises a TypeError.
-
- """
- exc = pytest.raises(TypeError, job._modify, name=value)
- assert str(exc.value) == 'name must be a nonempty string'
-
-
-@pytest.mark.parametrize('value', ['foo', 0, -1], ids=['string', 'zero', 'negative'])
-def test_private_modify_bad_misfire_grace_time(job, value):
- """Tests that passing a misfire_grace_time of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, misfire_grace_time=value)
- assert str(exc.value) == 'misfire_grace_time must be either None or a positive integer'
-
-
-@pytest.mark.parametrize('value', [None, 'foo', 0, -1], ids=['None', 'string', 'zero', 'negative'])
-def test_private_modify_bad_max_instances(job, value):
- """Tests that passing a max_instances of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, max_instances=value)
- assert str(exc.value) == 'max_instances must be a positive integer'
-
-
-def test_private_modify_bad_trigger(job):
- """Tests that passing a trigger of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, trigger='foo')
- assert str(exc.value) == 'Expected a trigger instance, got str instead'
-
-
-def test_private_modify_bad_executor(job):
- """Tests that passing an executor of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, executor=1)
- assert str(exc.value) == 'executor must be a string'
-
-
-def test_private_modify_bad_next_run_time(job):
- """Tests that passing a next_run_time of the wrong type raises a TypeError."""
- exc = pytest.raises(TypeError, job._modify, next_run_time=1)
- assert str(exc.value) == 'Unsupported type for next_run_time: int'
-
-
-def test_private_modify_bad_argument(job):
- """Tests that passing an unmodifiable argument type raises an AttributeError."""
- exc = pytest.raises(AttributeError, job._modify, scheduler=1)
- assert str(exc.value) == 'The following are not modifiable attributes of Job: scheduler'
-
-
-def test_getstate(job):
- state = job.__getstate__()
- assert state == dict(
- version=1, trigger=job.trigger, executor='default', func='test_job:dummyfunc',
- name=b'n\xc3\xa4m\xc3\xa9'.decode('utf-8'), args=(), kwargs={},
- id=b't\xc3\xa9st\xc3\xafd'.decode('utf-8'), misfire_grace_time=1, coalesce=False,
- max_instances=1, next_run_time=None)
-
-
-def test_setstate(job, timezone):
- trigger = DateTrigger('2010-12-14 13:05:00', timezone)
- state = dict(
- version=1, scheduler=MagicMock(), jobstore=MagicMock(), trigger=trigger,
- executor='dummyexecutor', func='test_job:dummyfunc', name='testjob.dummyfunc',
- args=[], kwargs={}, id='other_id', misfire_grace_time=2, coalesce=True, max_instances=2,
- next_run_time=None)
- job.__setstate__(state)
- assert job.id == 'other_id'
- assert job.func == dummyfunc
- assert job.func_ref == 'test_job:dummyfunc'
- assert job.trigger == trigger
- assert job.executor == 'dummyexecutor'
- assert job.args == []
- assert job.kwargs == {}
- assert job.name == 'testjob.dummyfunc'
- assert job.misfire_grace_time == 2
- assert job.coalesce is True
- assert job.max_instances == 2
- assert job.next_run_time is None
-
-
-def test_setstate_bad_version(job):
- """Tests that __setstate__ rejects state of higher version that it was designed to handle."""
- exc = pytest.raises(ValueError, job.__setstate__, {'version': 9999})
- assert 'Job has version 9999, but only version' in str(exc.value)
-
-
-def test_eq(create_job):
- job = create_job(func=lambda: None, id='foo')
- job2 = create_job(func=lambda: None, id='foo')
- job3 = create_job(func=lambda: None, id='bar')
- assert job == job2
- assert not job == job3
- assert not job == 'foo'
-
-
-def test_repr(job):
- assert repr(job) == "<Job (id='téstïd' name='nämé')>"
-
-
-@pytest.mark.parametrize('status, expected_status', [
- ('scheduled', 'next run at: 2011-04-03 18:40:00 CEST'),
- ('paused', 'paused'),
- ('pending', 'pending')
-], ids=['scheduled', 'paused', 'pending'])
-def test_str(create_job, status, expected_status):
- job = create_job(func=dummyfunc)
- if status == 'scheduled':
- job.next_run_time = job.trigger.run_date
- elif status == 'pending':
- del job.next_run_time
-
- expected = 'nämé (trigger: date[2011-04-03 18:40:00 CEST], {})'.format(expected_status)
- assert str(job) == expected
diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py
deleted file mode 100644
index f913f37..0000000
--- a/tests/test_jobstores.py
+++ /dev/null
@@ -1,387 +0,0 @@
-from datetime import datetime
-
-import pytest
-
-from apscheduler.jobstores.memory import MemoryJobStore
-from apscheduler.jobstores.base import JobLookupError, ConflictingIdError
-
-
-def dummy_job():
- pass
-
-
-def dummy_job2():
- pass
-
-
-def dummy_job3():
- pass
-
-
-class DummyClass:
- def dummy_method(self, a, b):
- return a + b
-
- @classmethod
- def dummy_classmethod(cls, a, b):
- return a + b
-
-
-@pytest.fixture
-def memjobstore():
- yield MemoryJobStore()
-
-
-@pytest.fixture
-def sqlalchemyjobstore(tmpdir):
- db_path = tmpdir.join('apscheduler_unittest.sqlite')
- sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy')
- store = sqlalchemy.SQLAlchemyJobStore(url='sqlite:///%s' % db_path)
- store.start(None, 'sqlalchemy')
- yield store
- store.shutdown()
- db_path.remove()
-
-
-@pytest.fixture
-def rethinkdbjobstore():
- rethinkdb = pytest.importorskip('apscheduler.jobstores.rethinkdb')
- store = rethinkdb.RethinkDBJobStore(database='apscheduler_unittest')
- store.start(None, 'rethinkdb')
- yield store
- store.r.db_drop('apscheduler_unittest').run(store.conn)
- store.shutdown()
-
-
-@pytest.fixture
-def mongodbjobstore():
- mongodb = pytest.importorskip('apscheduler.jobstores.mongodb')
- store = mongodb.MongoDBJobStore(database='apscheduler_unittest')
- store.start(None, 'mongodb')
- yield store
- store.client.drop_database(store.collection.database.name)
- store.shutdown()
-
-
-@pytest.fixture
-def redisjobstore():
- redis = pytest.importorskip('apscheduler.jobstores.redis')
- store = redis.RedisJobStore()
- store.start(None, 'redis')
- yield store
- store.remove_all_jobs()
- store.shutdown()
-
-
-@pytest.fixture
-def zookeeperjobstore():
- zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
- store = zookeeper.ZooKeeperJobStore(path='/apscheduler_unittest')
- store.start(None, 'zookeeper')
- yield store
- store.remove_all_jobs()
- store.shutdown()
-
-
-@pytest.fixture(params=['memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore',
- 'rethinkdbjobstore', 'zookeeperjobstore'],
- ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper'])
-def jobstore(request):
- return request.getfixturevalue(request.param)
-
-
-@pytest.fixture(params=['sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore',
- 'rethinkdbjobstore', 'zookeeperjobstore'],
- ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper'])
-def persistent_jobstore(request):
- return request.getfixturevalue(request.param)
-
-
-@pytest.fixture
-def create_add_job(timezone, create_job):
- def create(jobstore, func=dummy_job, run_date=datetime(2999, 1, 1), id=None, paused=False,
- **kwargs):
- run_date = timezone.localize(run_date)
- job = create_job(func=func, trigger='date', trigger_args={'run_date': run_date}, id=id,
- **kwargs)
- job.next_run_time = None if paused else job.trigger.get_next_fire_time(None, run_date)
- if jobstore:
- jobstore.add_job(job)
- return job
-
- return create
-
-
-def test_add_instance_method_job(jobstore, create_add_job):
- instance = DummyClass()
- initial_job = create_add_job(jobstore, instance.dummy_method, kwargs={'a': 1, 'b': 2})
- job = jobstore.lookup_job(initial_job.id)
- assert job.func(*job.args, **job.kwargs) == 3
-
-
-def test_add_class_method_job(jobstore, create_add_job):
- initial_job = create_add_job(jobstore, DummyClass.dummy_classmethod, kwargs={'a': 1, 'b': 2})
- job = jobstore.lookup_job(initial_job.id)
- assert job.func(*job.args, **job.kwargs) == 3
-
-
-def test_lookup_job(jobstore, create_add_job):
- initial_job = create_add_job(jobstore)
- job = jobstore.lookup_job(initial_job.id)
- assert job == initial_job
-
-
-def test_lookup_nonexistent_job(jobstore):
- assert jobstore.lookup_job('foo') is None
-
-
-def test_get_all_jobs(jobstore, create_add_job):
- job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- job2 = create_add_job(jobstore, dummy_job2, datetime(2013, 8, 14))
- job3 = create_add_job(jobstore, dummy_job2, datetime(2013, 7, 11), paused=True)
- jobs = jobstore.get_all_jobs()
- assert jobs == [job2, job1, job3]
-
-
-def test_get_pending_jobs(jobstore, create_add_job, timezone):
- create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
- job3 = create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- create_add_job(jobstore, dummy_job3, datetime(2013, 7, 11), paused=True)
- jobs = jobstore.get_due_jobs(timezone.localize(datetime(2014, 2, 27)))
- assert jobs == [job3, job2]
-
- jobs = jobstore.get_due_jobs(timezone.localize(datetime(2013, 8, 13)))
- assert jobs == []
-
-
-def test_get_pending_jobs_subsecond_difference(jobstore, create_add_job, timezone):
- job1 = create_add_job(jobstore, dummy_job, datetime(2014, 7, 7, 0, 0, 0, 401))
- job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 7, 7, 0, 0, 0, 402))
- job3 = create_add_job(jobstore, dummy_job3, datetime(2014, 7, 7, 0, 0, 0, 400))
- jobs = jobstore.get_due_jobs(timezone.localize(datetime(2014, 7, 7, 1)))
- assert jobs == [job3, job1, job2]
-
-
-def test_get_next_run_time(jobstore, create_add_job, timezone):
- create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
- create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- create_add_job(jobstore, dummy_job3, datetime(2013, 7, 11), paused=True)
- assert jobstore.get_next_run_time() == timezone.localize(datetime(2013, 8, 14))
-
-
-def test_add_job_conflicting_id(jobstore, create_add_job):
- create_add_job(jobstore, dummy_job, datetime(2016, 5, 3), id='blah')
- pytest.raises(ConflictingIdError, create_add_job, jobstore, dummy_job2, datetime(2014, 2, 26),
- id='blah')
-
-
-def test_update_job(jobstore, create_add_job, timezone):
- job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
- replacement = create_add_job(None, dummy_job, datetime(2016, 5, 4), id=job1.id,
- max_instances=6)
- assert replacement.max_instances == 6
- jobstore.update_job(replacement)
-
- jobs = jobstore.get_all_jobs()
- assert len(jobs) == 2
- assert jobs[0].id == job2.id
- assert jobs[1].id == job1.id
- assert jobs[1].next_run_time == timezone.localize(datetime(2016, 5, 4))
- assert jobs[1].max_instances == 6
-
-
-@pytest.mark.parametrize('next_run_time', [datetime(2013, 8, 13), None], ids=['earlier', 'null'])
-def test_update_job_next_runtime(jobstore, create_add_job, next_run_time, timezone):
- job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
- job3 = create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- job1.next_run_time = timezone.localize(next_run_time) if next_run_time else None
- jobstore.update_job(job1)
-
- if next_run_time:
- assert jobstore.get_next_run_time() == job1.next_run_time
- else:
- assert jobstore.get_next_run_time() == job3.next_run_time
-
-
-@pytest.mark.parametrize('next_run_time', [datetime(2013, 8, 13), None], ids=['earlier', 'null'])
-@pytest.mark.parametrize('index', [0, 1, 2], ids=['first', 'middle', 'last'])
-def test_update_job_clear_next_runtime(jobstore, create_add_job, next_run_time, timezone, index):
- """
- Tests that update_job() maintains the proper ordering of the jobs,
- even when their next run times are initially the same.
-
- """
- jobs = [create_add_job(jobstore, dummy_job, datetime(2014, 2, 26), 'job%d' % i) for
- i in range(3)]
- jobs[index].next_run_time = timezone.localize(next_run_time) if next_run_time else None
- jobstore.update_job(jobs[index])
- due_date = timezone.localize(datetime(2014, 2, 27))
- due_jobs = jobstore.get_due_jobs(due_date)
-
- assert len(due_jobs) == (3 if next_run_time else 2)
- due_job_ids = [job.id for job in due_jobs]
- if next_run_time:
- if index == 0:
- assert due_job_ids == ['job0', 'job1', 'job2']
- elif index == 1:
- assert due_job_ids == ['job1', 'job0', 'job2']
- else:
- assert due_job_ids == ['job2', 'job0', 'job1']
- else:
- if index == 0:
- assert due_job_ids == ['job1', 'job2']
- elif index == 1:
- assert due_job_ids == ['job0', 'job2']
- else:
- assert due_job_ids == ['job0', 'job1']
-
-
-def test_update_job_nonexistent_job(jobstore, create_add_job):
- job = create_add_job(None, dummy_job, datetime(2016, 5, 3))
- pytest.raises(JobLookupError, jobstore.update_job, job)
-
-
-def test_one_job_fails_to_load(persistent_jobstore, create_add_job, monkeypatch, timezone):
- job1 = create_add_job(persistent_jobstore, dummy_job, datetime(2016, 5, 3))
- job2 = create_add_job(persistent_jobstore, dummy_job2, datetime(2014, 2, 26))
- create_add_job(persistent_jobstore, dummy_job3, datetime(2013, 8, 14))
-
- # Make the dummy_job2 function disappear
- monkeypatch.delitem(globals(), 'dummy_job3')
-
- jobs = persistent_jobstore.get_all_jobs()
- assert jobs == [job2, job1]
-
- assert persistent_jobstore.get_next_run_time() == timezone.localize(datetime(2014, 2, 26))
-
-
-def test_remove_job(jobstore, create_add_job):
- job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- job2 = create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
-
- jobstore.remove_job(job1.id)
- jobs = jobstore.get_all_jobs()
- assert jobs == [job2]
-
- jobstore.remove_job(job2.id)
- jobs = jobstore.get_all_jobs()
- assert jobs == []
-
-
-def test_remove_nonexistent_job(jobstore):
- pytest.raises(JobLookupError, jobstore.remove_job, 'blah')
-
-
-def test_remove_all_jobs(jobstore, create_add_job):
- create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
- create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
-
- jobstore.remove_all_jobs()
- jobs = jobstore.get_all_jobs()
- assert jobs == []
-
-
-def test_repr_memjobstore(memjobstore):
- assert repr(memjobstore) == '<MemoryJobStore>'
-
-
-def test_repr_sqlalchemyjobstore(sqlalchemyjobstore):
- assert repr(sqlalchemyjobstore).startswith('<SQLAlchemyJobStore (url=')
-
-
-def test_repr_mongodbjobstore(mongodbjobstore):
- assert repr(mongodbjobstore).startswith("<MongoDBJobStore (client=MongoClient(")
-
-
-def test_repr_redisjobstore(redisjobstore):
- assert repr(redisjobstore) == '<RedisJobStore>'
-
-
-def test_repr_zookeeperjobstore(zookeeperjobstore):
- class_sig = "<ZooKeeperJobStore (client=<kazoo.client.KazooClient"
- assert repr(zookeeperjobstore).startswith(class_sig)
-
-
-def test_memstore_close(memjobstore, create_add_job):
- create_add_job(memjobstore, dummy_job, datetime(2016, 5, 3))
- memjobstore.shutdown()
- assert not memjobstore.get_all_jobs()
-
-
-def test_sqlalchemy_engine_ref():
- global sqla_engine
- sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy')
- sqla_engine = sqlalchemy.create_engine('sqlite:///')
- try:
- sqlalchemy.SQLAlchemyJobStore(engine='%s:sqla_engine' % __name__)
- finally:
- sqla_engine.dispose()
- del sqla_engine
-
-
-def test_sqlalchemy_missing_engine():
- sqlalchemy = pytest.importorskip('apscheduler.jobstores.sqlalchemy')
- exc = pytest.raises(ValueError, sqlalchemy.SQLAlchemyJobStore)
- assert 'Need either' in str(exc.value)
-
-
-def test_mongodb_client_ref():
- global mongodb_client
- mongodb = pytest.importorskip('apscheduler.jobstores.mongodb')
- mongodb_client = mongodb.MongoClient()
- try:
- mongodb.MongoDBJobStore(client='%s:mongodb_client' % __name__)
- finally:
- mongodb_client.close()
- del mongodb_client
-
-
-def test_zookeeper_client_ref():
- global zookeeper_client
- zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
- zookeeper_client = zookeeper.KazooClient()
- try:
- zookeeperjobstore = zookeeper.ZooKeeperJobStore(client='%s:zookeeper_client' % __name__)
- zookeeperjobstore.start(None, 'zookeeper')
- zookeeperjobstore.shutdown()
- assert zookeeper_client.connected is True
- finally:
- zookeeper_client.stop()
- zookeeper_client.close()
- del zookeeper_client
-
-
-def test_zookeeper_client_keep_open():
- global zookeeper_client
- zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
- zookeeper_client = zookeeper.KazooClient()
- try:
- zookeeperjobstore = zookeeper.ZooKeeperJobStore(client='%s:zookeeper_client' % __name__,
- close_connection_on_exit=True)
- zookeeperjobstore.start(None, 'zookeeper')
- zookeeperjobstore.shutdown()
- assert zookeeper_client.connected is False
- finally:
- del zookeeper_client
-
-
-def test_mongodb_null_database():
- mongodb = pytest.importorskip('apscheduler.jobstores.mongodb')
- exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, database='')
- assert '"database"' in str(exc.value)
-
-
-def test_mongodb_null_collection():
- mongodb = pytest.importorskip('apscheduler.jobstores.mongodb')
- exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, collection='')
- assert '"collection"' in str(exc.value)
-
-
-def test_zookeeper_null_path():
- zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
- exc = pytest.raises(ValueError, zookeeper.ZooKeeperJobStore, path='')
- assert '"path"' in str(exc.value)
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
deleted file mode 100644
index 31f49de..0000000
--- a/tests/test_schedulers.py
+++ /dev/null
@@ -1,1026 +0,0 @@
-import logging
-from datetime import datetime, timedelta
-from io import StringIO
-from queue import Queue
-from threading import Thread
-from unittest.mock import MagicMock, patch
-
-import pytest
-from pytz import utc
-
-from apscheduler.events import (
- EVENT_SCHEDULER_STARTED, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED,
- EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED,
- EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, EVENT_JOB_ADDED,
- EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_PAUSED,
- EVENT_SCHEDULER_RESUMED, SchedulerEvent)
-from apscheduler.executors.base import BaseExecutor, MaxInstancesReachedError
-from apscheduler.executors.debug import DebugExecutor
-from apscheduler.job import Job
-from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
-from apscheduler.jobstores.memory import MemoryJobStore
-from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
-from apscheduler.schedulers.base import BaseScheduler, STATE_RUNNING, STATE_STOPPED
-from apscheduler.triggers.base import BaseTrigger
-from apscheduler.util import undefined
-
-
-class DummyScheduler(BaseScheduler):
- def __init__(self, *args, **kwargs):
- super(DummyScheduler, self).__init__(*args, **kwargs)
- self.wakeup = MagicMock()
-
- def shutdown(self, wait=True):
- super(DummyScheduler, self).shutdown(wait)
-
- def wakeup(self):
- pass
-
-
-class DummyTrigger(BaseTrigger):
- def __init__(self, **args):
- self.args = args
-
- def get_next_fire_time(self, previous_fire_time, now):
- pass
-
-
-class DummyExecutor(BaseExecutor):
- def __init__(self, **args):
- super(DummyExecutor, self).__init__()
- self.args = args
- self.start = MagicMock()
- self.shutdown = MagicMock()
- self.submit_job = MagicMock()
-
- def _do_submit_job(self, job, run_times):
- pass
-
-
-class DummyJobStore(BaseJobStore):
- def __init__(self, **args):
- super(DummyJobStore, self).__init__()
- self.args = args
- self.start = MagicMock()
- self.shutdown = MagicMock()
-
- def get_due_jobs(self, now):
- pass
-
- def lookup_job(self, job_id):
- pass
-
- def remove_job(self, job_id):
- pass
-
- def remove_all_jobs(self):
- pass
-
- def get_next_run_time(self):
- pass
-
- def get_all_jobs(self):
- pass
-
- def add_job(self, job):
- pass
-
- def update_job(self, job):
- pass
-
-
-class TestBaseScheduler(object):
- @pytest.fixture
- def scheduler(self, timezone):
- return DummyScheduler()
-
- @pytest.fixture
- def scheduler_events(self, request, scheduler):
- events = []
- mask = getattr(request, 'param', EVENT_ALL ^ EVENT_SCHEDULER_STARTED)
- scheduler.add_listener(events.append, mask)
- return events
-
- def test_constructor(self):
- with patch('%s.DummyScheduler.configure' % __name__) as configure:
- gconfig = {'apscheduler.foo': 'bar', 'apscheduler.x': 'y'}
- options = {'bar': 'baz', 'xyz': 123}
- DummyScheduler(gconfig, **options)
-
- configure.assert_called_once_with(gconfig, **options)
-
- @pytest.mark.parametrize('gconfig', [
- {
- 'apscheduler.timezone': 'UTC',
- 'apscheduler.job_defaults.misfire_grace_time': '5',
- 'apscheduler.job_defaults.coalesce': 'false',
- 'apscheduler.job_defaults.max_instances': '9',
- 'apscheduler.executors.default.class': '%s:DummyExecutor' % __name__,
- 'apscheduler.executors.default.arg1': '3',
- 'apscheduler.executors.default.arg2': 'a',
- 'apscheduler.executors.alter.class': '%s:DummyExecutor' % __name__,
- 'apscheduler.executors.alter.arg': 'true',
- 'apscheduler.jobstores.default.class': '%s:DummyJobStore' % __name__,
- 'apscheduler.jobstores.default.arg1': '3',
- 'apscheduler.jobstores.default.arg2': 'a',
- 'apscheduler.jobstores.bar.class': '%s:DummyJobStore' % __name__,
- 'apscheduler.jobstores.bar.arg': 'false',
- },
- {
- 'apscheduler.timezone': 'UTC',
- 'apscheduler.job_defaults': {
- 'misfire_grace_time': '5',
- 'coalesce': 'false',
- 'max_instances': '9',
- },
- 'apscheduler.executors': {
- 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'},
- 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'}
- },
- 'apscheduler.jobstores': {
- 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'},
- 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'}
- }
- }
- ], ids=['ini-style', 'yaml-style'])
- def test_configure(self, scheduler, gconfig):
- scheduler._configure = MagicMock()
- scheduler.configure(gconfig, timezone='Other timezone')
-
- scheduler._configure.assert_called_once_with({
- 'timezone': 'Other timezone',
- 'job_defaults': {
- 'misfire_grace_time': '5',
- 'coalesce': 'false',
- 'max_instances': '9',
- },
- 'executors': {
- 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'},
- 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'}
- },
- 'jobstores': {
- 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'},
- 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'}
- }
- })
-
- @pytest.mark.parametrize('method', [
- BaseScheduler.configure,
- BaseScheduler.start
- ])
- def test_scheduler_already_running(self, method, scheduler):
- """
- Test that SchedulerAlreadyRunningError is raised when certain methods are called before
- the scheduler has been started.
-
- """
- scheduler.start(paused=True)
- pytest.raises(SchedulerAlreadyRunningError, method, scheduler)
-
- @pytest.mark.parametrize('method', [
- BaseScheduler.pause,
- BaseScheduler.resume,
- BaseScheduler.shutdown
- ], ids=['pause', 'resume', 'shutdown'])
- def test_scheduler_not_running(self, scheduler, method):
- """
- Test that the SchedulerNotRunningError is raised when certain methods are called before
- the scheduler has been started.
-
- """
- pytest.raises(SchedulerNotRunningError, method, scheduler)
-
- def test_start(self, scheduler, create_job):
- scheduler._executors = {'exec1': MagicMock(BaseExecutor), 'exec2': MagicMock(BaseExecutor)}
- scheduler._jobstores = {'store1': MagicMock(BaseJobStore),
- 'store2': MagicMock(BaseJobStore)}
- job = create_job(func=lambda: None)
- scheduler._pending_jobs = [(job, 'store1', False)]
- scheduler._real_add_job = MagicMock()
- scheduler._dispatch_event = MagicMock()
- scheduler.start()
-
- scheduler._executors['exec1'].start.assert_called_once_with(scheduler, 'exec1')
- scheduler._executors['exec2'].start.assert_called_once_with(scheduler, 'exec2')
- scheduler._jobstores['store1'].start.assert_called_once_with(scheduler, 'store1')
- scheduler._jobstores['store2'].start.assert_called_once_with(scheduler, 'store2')
- assert len(scheduler._executors) == 3
- assert len(scheduler._jobstores) == 3
- assert 'default' in scheduler._executors
- assert 'default' in scheduler._jobstores
-
- scheduler._real_add_job.assert_called_once_with(job, 'store1', False)
- assert scheduler._pending_jobs == []
-
- assert scheduler._dispatch_event.call_count == 3
- event = scheduler._dispatch_event.call_args_list[0][0][0]
- assert event.code == EVENT_EXECUTOR_ADDED
- assert event.alias == 'default'
- event = scheduler._dispatch_event.call_args_list[1][0][0]
- assert event.code == EVENT_JOBSTORE_ADDED
- assert event.alias == 'default'
- event = scheduler._dispatch_event.call_args_list[2][0][0]
- assert event.code == EVENT_SCHEDULER_STARTED
-
- assert scheduler.state == STATE_RUNNING
-
- @pytest.mark.parametrize('wait', [True, False], ids=['wait', 'nowait'])
- def test_shutdown(self, scheduler, scheduler_events, wait):
- executor = DummyExecutor()
- jobstore = DummyJobStore()
- scheduler.add_executor(executor)
- scheduler.add_jobstore(jobstore)
- scheduler.start(paused=True)
- del scheduler_events[:]
- scheduler.shutdown(wait)
-
- assert scheduler.state == STATE_STOPPED
- assert len(scheduler_events) == 1
- assert scheduler_events[0].code == EVENT_SCHEDULER_SHUTDOWN
-
- executor.shutdown.assert_called_once_with(wait)
- jobstore.shutdown.assert_called_once_with()
-
- def test_pause_resume(self, scheduler, scheduler_events):
- scheduler.start()
- del scheduler_events[:]
- scheduler.wakeup.reset_mock()
-
- scheduler.pause()
-
- assert len(scheduler_events) == 1
- assert scheduler_events[0].code == EVENT_SCHEDULER_PAUSED
- assert not scheduler.wakeup.called
-
- scheduler.resume()
- assert len(scheduler_events) == 2
- assert scheduler_events[1].code == EVENT_SCHEDULER_RESUMED
- assert scheduler.wakeup.called
-
- @pytest.mark.parametrize('start_scheduler', [True, False])
- def test_running(self, scheduler, start_scheduler):
- if start_scheduler:
- scheduler.start()
-
- assert scheduler.running is start_scheduler
-
- @pytest.mark.parametrize('start_scheduler', [True, False])
- def test_add_remove_executor(self, scheduler, scheduler_events, start_scheduler):
- if start_scheduler:
- scheduler.start(paused=True)
-
- del scheduler_events[:]
- executor = DummyExecutor()
- scheduler.add_executor(executor, 'exec1')
-
- assert len(scheduler_events) == 1
- assert scheduler_events[0].code == EVENT_EXECUTOR_ADDED
- assert scheduler_events[0].alias == 'exec1'
- if start_scheduler:
- executor.start.assert_called_once_with(scheduler, 'exec1')
- else:
- assert not executor.start.called
-
- scheduler.remove_executor('exec1')
- assert len(scheduler_events) == 2
- assert scheduler_events[1].code == EVENT_EXECUTOR_REMOVED
- assert scheduler_events[1].alias == 'exec1'
- assert executor.shutdown.called
-
- def test_add_executor_already_exists(self, scheduler):
- executor = DummyExecutor()
- scheduler.add_executor(executor)
- exc = pytest.raises(ValueError, scheduler.add_executor, executor)
- assert str(exc.value) == 'This scheduler already has an executor by the alias of "default"'
-
- def test_remove_executor_nonexistent(self, scheduler):
- pytest.raises(KeyError, scheduler.remove_executor, 'foo')
-
- @pytest.mark.parametrize('start_scheduler', [True, False])
- def test_add_jobstore(self, scheduler, scheduler_events, start_scheduler):
- """
- Test that the proper event is dispatched when a job store is added and the scheduler's
- wake() method is called if the scheduler is running.
-
- """
- if start_scheduler:
- scheduler.start()
-
- del scheduler_events[:]
- jobstore = DummyJobStore()
- scheduler.add_jobstore(jobstore, 'store1')
-
- assert len(scheduler_events) == 1
- assert scheduler_events[0].code == EVENT_JOBSTORE_ADDED
- assert scheduler_events[0].alias == 'store1'
-
- if start_scheduler:
- assert scheduler.wakeup.called
- jobstore.start.assert_called_once_with(scheduler, 'store1')
- else:
- assert not jobstore.start.called
-
- def test_add_jobstore_already_exists(self, scheduler):
- """
- Test that ValueError is raised when a job store is added with an alias that already exists.
-
- """
- jobstore = MemoryJobStore()
- scheduler.add_jobstore(jobstore)
- exc = pytest.raises(ValueError, scheduler.add_jobstore, jobstore)
- assert str(exc.value) == 'This scheduler already has a job store by the alias of "default"'
-
- def test_remove_jobstore(self, scheduler, scheduler_events):
- scheduler.add_jobstore(MemoryJobStore(), 'foo')
- scheduler.remove_jobstore('foo')
-
- assert len(scheduler_events) == 2
- assert scheduler_events[1].code == EVENT_JOBSTORE_REMOVED
- assert scheduler_events[1].alias == 'foo'
-
- def test_remove_jobstore_nonexistent(self, scheduler):
- pytest.raises(KeyError, scheduler.remove_jobstore, 'foo')
-
- def test_add_remove_listener(self, scheduler):
- """Test that event dispatch works but removed listeners aren't called."""
- events = []
- scheduler.add_listener(events.append, EVENT_EXECUTOR_ADDED)
- scheduler.add_executor(DummyExecutor(), 'exec1')
- scheduler.remove_listener(events.append)
- scheduler.add_executor(DummyExecutor(), 'exec2')
- assert len(events) == 1
-
- def test_add_job_return_value(self, scheduler, timezone):
- """Test that when a job is added to a stopped scheduler, a Job instance is returned."""
- job = scheduler.add_job(lambda x, y: None, 'date', [1], {'y': 2}, 'my-id', 'dummy',
- next_run_time=datetime(2014, 5, 23, 10),
- run_date='2014-06-01 08:41:00')
-
- assert isinstance(job, Job)
- assert job.id == 'my-id'
- assert not hasattr(job, 'misfire_grace_time')
- assert not hasattr(job, 'coalesce')
- assert not hasattr(job, 'max_instances')
- assert job.next_run_time.tzinfo.zone == timezone.zone
-
- def test_add_job_pending(self, scheduler, scheduler_events):
- """
- Test that when a job is added to a stopped scheduler, it is not added to a job store until
- the scheduler is started and that the event is dispatched when that happens.
-
- """
- scheduler.configure(job_defaults={
- 'misfire_grace_time': 3, 'coalesce': False, 'max_instances': 6
- })
- job = scheduler.add_job(lambda: None, 'interval', hours=1)
- assert not scheduler_events
-
- scheduler.start(paused=True)
-
- assert len(scheduler_events) == 3
- assert scheduler_events[2].code == EVENT_JOB_ADDED
- assert scheduler_events[2].job_id is job.id
-
- # Check that the undefined values were replaced with scheduler's job defaults
- assert job.misfire_grace_time == 3
- assert not job.coalesce
- assert job.max_instances == 6
-
- def test_add_job_id_conflict(self, scheduler):
- """
- Test that if a job is added with an already existing id, ConflictingIdError is raised.
-
- """
- scheduler.start(paused=True)
- scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1)
- pytest.raises(ConflictingIdError, scheduler.add_job, lambda: None, 'interval',
- id='testjob', seconds=1)
-
- def test_add_job_replace(self, scheduler):
- """Test that with replace_existing=True, a new job replaces another with the same id."""
- scheduler.start(paused=True)
- scheduler.add_job(lambda: None, 'interval', id='testjob', seconds=1)
- scheduler.add_job(lambda: None, 'cron', id='testjob', name='replacement',
- replace_existing=True)
- jobs = scheduler.get_jobs()
- assert len(jobs) == 1
- assert jobs[0].name == 'replacement'
-
- def test_scheduled_job(self, scheduler):
- def func(x, y):
- pass
-
- scheduler.add_job = MagicMock()
- decorator = scheduler.scheduled_job('date', [1], {'y': 2}, 'my-id',
- 'dummy', run_date='2014-06-01 08:41:00')
- decorator(func)
-
- scheduler.add_job.assert_called_once_with(
- func, 'date', [1], {'y': 2}, 'my-id', 'dummy', undefined, undefined, undefined,
- undefined, 'default', 'default', True, run_date='2014-06-01 08:41:00')
-
- @pytest.mark.parametrize('pending', [True, False], ids=['pending job', 'scheduled job'])
- def test_modify_job(self, scheduler, pending, timezone):
- job = MagicMock()
- scheduler._dispatch_event = MagicMock()
- scheduler._lookup_job = MagicMock(return_value=(job, None if pending else 'default'))
- if not pending:
- jobstore = MagicMock()
- scheduler._lookup_jobstore = lambda alias: jobstore if alias == 'default' else None
- scheduler.modify_job('blah', misfire_grace_time=5, max_instances=2,
- next_run_time=datetime(2014, 10, 17))
-
- job._modify.assert_called_once_with(misfire_grace_time=5, max_instances=2,
- next_run_time=datetime(2014, 10, 17))
- if not pending:
- jobstore.update_job.assert_called_once_with(job)
-
- assert scheduler._dispatch_event.call_count == 1
- event = scheduler._dispatch_event.call_args[0][0]
- assert event.code == EVENT_JOB_MODIFIED
- assert event.jobstore == (None if pending else 'default')
-
- def test_reschedule_job(self, scheduler):
- scheduler.modify_job = MagicMock()
- trigger = MagicMock(get_next_fire_time=lambda previous, now: 1)
- scheduler._create_trigger = MagicMock(return_value=trigger)
- scheduler.reschedule_job('my-id', 'jobstore', 'date', run_date='2014-06-01 08:41:00')
-
- assert scheduler.modify_job.call_count == 1
- assert scheduler.modify_job.call_args[0] == ('my-id', 'jobstore')
- assert scheduler.modify_job.call_args[1] == {'trigger': trigger, 'next_run_time': 1}
-
- def test_pause_job(self, scheduler):
- scheduler.modify_job = MagicMock()
- scheduler.pause_job('job_id', 'jobstore')
-
- scheduler.modify_job.assert_called_once_with('job_id', 'jobstore', next_run_time=None)
-
- @pytest.mark.parametrize('dead_job', [True, False], ids=['dead job', 'live job'])
- def test_resume_job(self, scheduler, freeze_time, dead_job):
- next_fire_time = None if dead_job else freeze_time.current + timedelta(seconds=1)
- trigger = MagicMock(BaseTrigger, get_next_fire_time=lambda prev, now: next_fire_time)
- returned_job = MagicMock(Job, id='foo', trigger=trigger)
- scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar'))
- scheduler.modify_job = MagicMock()
- scheduler.remove_job = MagicMock()
- scheduler.resume_job('foo')
-
- if dead_job:
- scheduler.remove_job.assert_called_once_with('foo', 'bar')
- else:
- scheduler.modify_job.assert_called_once_with('foo', 'bar',
- next_run_time=next_fire_time)
-
- @pytest.mark.parametrize('scheduler_started', [True, False], ids=['running', 'stopped'])
- @pytest.mark.parametrize('jobstore', [None, 'other'],
- ids=['all jobstores', 'specific jobstore'])
- def test_get_jobs(self, scheduler, scheduler_started, jobstore):
- scheduler.add_jobstore(MemoryJobStore(), 'other')
- scheduler.add_job(lambda: None, 'interval', seconds=1, id='job1')
- scheduler.add_job(lambda: None, 'interval', seconds=1, id='job2', jobstore='other')
- if scheduler_started:
- scheduler.start(paused=True)
-
- expected_job_ids = {'job2'}
- if jobstore is None:
- expected_job_ids.add('job1')
-
- job_ids = {job.id for job in scheduler.get_jobs(jobstore)}
- assert job_ids == expected_job_ids
-
- @pytest.mark.parametrize('jobstore', [None, 'bar'], ids=['any jobstore', 'specific jobstore'])
- def test_get_job(self, scheduler, jobstore):
- returned_job = object()
- scheduler._lookup_job = MagicMock(return_value=(returned_job, 'bar'))
- job = scheduler.get_job('foo', jobstore)
-
- assert job is returned_job
-
- def test_get_job_nonexistent_job(self, scheduler):
- scheduler._lookup_job = MagicMock(side_effect=JobLookupError('foo'))
- assert scheduler.get_job('foo') is None
-
- def test_get_job_nonexistent_jobstore(self, scheduler):
- assert scheduler.get_job('foo', 'bar') is None
-
- @pytest.mark.parametrize('start_scheduler', [True, False])
- @pytest.mark.parametrize('jobstore', [None, 'other'],
- ids=['any jobstore', 'specific jobstore'])
- def test_remove_job(self, scheduler, scheduler_events, start_scheduler, jobstore):
- scheduler.add_jobstore(MemoryJobStore(), 'other')
- scheduler.add_job(lambda: None, id='job1')
- if start_scheduler:
- scheduler.start(paused=True)
-
- del scheduler_events[:]
- if jobstore:
- pytest.raises(JobLookupError, scheduler.remove_job, 'job1', jobstore)
- assert len(scheduler.get_jobs()) == 1
- assert len(scheduler_events) == 0
- else:
- scheduler.remove_job('job1', jobstore)
- assert len(scheduler.get_jobs()) == 0
- assert len(scheduler_events) == 1
- assert scheduler_events[0].code == EVENT_JOB_REMOVED
-
- def test_remove_nonexistent_job(self, scheduler):
- pytest.raises(JobLookupError, scheduler.remove_job, 'foo')
-
- @pytest.mark.parametrize('start_scheduler', [True, False])
- @pytest.mark.parametrize('jobstore', [None, 'other'], ids=['all', 'single jobstore'])
- def test_remove_all_jobs(self, scheduler, start_scheduler, scheduler_events, jobstore):
- """
- Test that remove_all_jobs() removes all jobs from all attached job stores, plus any
- pending jobs.
-
- """
- scheduler.add_jobstore(MemoryJobStore(), 'other')
- scheduler.add_job(lambda: None, id='job1')
- scheduler.add_job(lambda: None, id='job2')
- scheduler.add_job(lambda: None, id='job3', jobstore='other')
- if start_scheduler:
- scheduler.start(paused=True)
-
- del scheduler_events[:]
- scheduler.remove_all_jobs(jobstore)
- jobs = scheduler.get_jobs()
-
- assert len(jobs) == (2 if jobstore else 0)
- assert len(scheduler_events) == 1
- assert scheduler_events[0].code == EVENT_ALL_JOBS_REMOVED
- assert scheduler_events[0].alias == jobstore
-
- @pytest.mark.parametrize('start_scheduler', [True, False])
- @pytest.mark.parametrize('jobstore', [None, 'other'],
- ids=['all jobstores', 'specific jobstore'])
- def test_print_jobs(self, scheduler, start_scheduler, jobstore):
- scheduler.add_jobstore(MemoryJobStore(), 'other')
- if start_scheduler:
- scheduler.start(paused=True)
-
- scheduler.add_job(lambda: None, 'date', run_date='2099-09-09', id='job1',
- name='test job 1')
- scheduler.add_job(lambda: None, 'date', run_date='2099-08-08', id='job2',
- name='test job 2', jobstore='other')
-
- outfile = StringIO()
- scheduler.print_jobs(jobstore, outfile)
-
- if jobstore and not start_scheduler:
- assert outfile.getvalue() == """\
-Pending jobs:
- test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending)
-"""
- elif jobstore and start_scheduler:
- assert outfile.getvalue() == """\
-Jobstore other:
- test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET)
-"""
- elif not jobstore and not start_scheduler:
- assert outfile.getvalue() == """\
-Pending jobs:
- test job 1 (trigger: date[2099-09-09 00:00:00 CET], pending)
- test job 2 (trigger: date[2099-08-08 00:00:00 CET], pending)
-"""
- else:
- assert outfile.getvalue() == """\
-Jobstore default:
- test job 1 (trigger: date[2099-09-09 00:00:00 CET], next run at: 2099-09-09 00:00:00 CET)
-Jobstore other:
- test job 2 (trigger: date[2099-08-08 00:00:00 CET], next run at: 2099-08-08 00:00:00 CET)
-"""
-
- @pytest.mark.parametrize('config', [
- {
- 'timezone': 'UTC',
- 'job_defaults': {
- 'misfire_grace_time': '5',
- 'coalesce': 'false',
- 'max_instances': '9',
- },
- 'executors': {
- 'default': {'class': '%s:DummyExecutor' % __name__, 'arg1': '3', 'arg2': 'a'},
- 'alter': {'class': '%s:DummyExecutor' % __name__, 'arg': 'true'}
- },
- 'jobstores': {
- 'default': {'class': '%s:DummyJobStore' % __name__, 'arg1': '3', 'arg2': 'a'},
- 'bar': {'class': '%s:DummyJobStore' % __name__, 'arg': 'false'}
- }
- },
- {
- 'timezone': utc,
- 'job_defaults': {
- 'misfire_grace_time': 5,
- 'coalesce': False,
- 'max_instances': 9,
- },
- 'executors': {
- 'default': DummyExecutor(arg1='3', arg2='a'),
- 'alter': DummyExecutor(arg='true')
- },
- 'jobstores': {
- 'default': DummyJobStore(arg1='3', arg2='a'),
- 'bar': DummyJobStore(arg='false')
- }
- }
- ], ids=['references', 'instances'])
- def test_configure_private(self, scheduler, config):
- scheduler._configure(config)
-
- assert scheduler.timezone is utc
- assert scheduler._job_defaults == {
- 'misfire_grace_time': 5,
- 'coalesce': False,
- 'max_instances': 9
- }
- assert set(scheduler._executors.keys()) == {'default', 'alter'}
- assert scheduler._executors['default'].args == {'arg1': '3', 'arg2': 'a'}
- assert scheduler._executors['alter'].args == {'arg': 'true'}
- assert set(scheduler._jobstores.keys()) == {'default', 'bar'}
- assert scheduler._jobstores['default'].args == {'arg1': '3', 'arg2': 'a'}
- assert scheduler._jobstores['bar'].args == {'arg': 'false'}
-
- def test_configure_private_invalid_executor(self, scheduler):
- exc = pytest.raises(TypeError, scheduler._configure, {'executors': {'default': 6}})
- assert str(exc.value) == ("Expected executor instance or dict for executors['default'], "
- "got int instead")
-
- def test_configure_private_invalid_jobstore(self, scheduler):
- exc = pytest.raises(TypeError, scheduler._configure, {'jobstores': {'default': 6}})
- assert str(exc.value) == ("Expected job store instance or dict for jobstores['default'], "
- "got int instead")
-
- def test_create_default_executor(self, scheduler):
- executor = scheduler._create_default_executor()
- assert isinstance(executor, BaseExecutor)
-
- def test_create_default_jobstore(self, scheduler):
- store = scheduler._create_default_jobstore()
- assert isinstance(store, BaseJobStore)
-
- def test_lookup_executor(self, scheduler):
- executor = object()
- scheduler._executors = {'executor': executor}
- assert scheduler._lookup_executor('executor') is executor
-
- def test_lookup_executor_nonexistent(self, scheduler):
- pytest.raises(KeyError, scheduler._lookup_executor, 'executor')
-
- def test_lookup_jobstore(self, scheduler):
- store = object()
- scheduler._jobstores = {'store': store}
- assert scheduler._lookup_jobstore('store') is store
-
- def test_lookup_jobstore_nonexistent(self, scheduler):
- pytest.raises(KeyError, scheduler._lookup_jobstore, 'store')
-
- def test_dispatch_event(self, scheduler):
- event = SchedulerEvent(1)
- scheduler._listeners = [(MagicMock(), 2), (MagicMock(side_effect=Exception), 1),
- (MagicMock(), 1)]
- scheduler._dispatch_event(event)
-
- assert not scheduler._listeners[0][0].called
- scheduler._listeners[1][0].assert_called_once_with(event)
-
- @pytest.mark.parametrize('load_plugin', [True, False], ids=['load plugin', 'plugin loaded'])
- def test_create_trigger(self, scheduler, load_plugin):
- """Tests that creating a trigger with an already loaded plugin works."""
-
- scheduler._trigger_plugins = {}
- scheduler._trigger_classes = {}
- if load_plugin:
- scheduler._trigger_plugins['dummy'] = MagicMock(
- load=MagicMock(return_value=DummyTrigger))
- else:
- scheduler._trigger_classes['dummy'] = DummyTrigger
-
- result = scheduler._create_trigger('dummy', {'a': 1, 'b': 'x'})
-
- assert isinstance(result, DummyTrigger)
- assert result.args == {'a': 1, 'b': 'x', 'timezone': scheduler.timezone}
-
- def test_create_trigger_instance(self, scheduler):
- """Tests that passing a trigger instance will return the instance as-is."""
- trigger_instance = DummyTrigger()
-
- assert scheduler._create_trigger(trigger_instance, {}) is trigger_instance
-
- def test_create_trigger_default_type(self, scheduler):
- """Tests that passing None as the trigger will create a "date" trigger instance."""
- scheduler._trigger_classes = {'date': DummyTrigger}
- result = scheduler._create_trigger(None, {'a': 1})
-
- assert isinstance(result, DummyTrigger)
- assert result.args == {'a': 1, 'timezone': scheduler.timezone}
-
- def test_create_trigger_bad_trigger_type(self, scheduler):
- exc = pytest.raises(TypeError, scheduler._create_trigger, 1, {})
- assert str(exc.value) == 'Expected a trigger instance or string, got int instead'
-
- def test_create_trigger_bad_plugin_type(self, scheduler):
- mock_plugin = MagicMock()
- mock_plugin.load.configure_mock(return_value=object)
- scheduler._trigger_classes = {}
- scheduler._trigger_plugins = {'dummy': mock_plugin}
- exc = pytest.raises(TypeError, scheduler._create_trigger, 'dummy', {})
- assert str(exc.value) == 'The trigger entry point does not point to a trigger class'
-
- def test_create_trigger_nonexisting_plugin(self, scheduler):
- exc = pytest.raises(LookupError, scheduler._create_trigger, 'dummy', {})
- assert str(exc.value) == 'No trigger by the name "dummy" was found'
-
- def test_create_lock(self, scheduler):
- lock = scheduler._create_lock()
- assert hasattr(lock, '__enter__')
-
- def test_process_jobs_empty(self, scheduler):
- assert scheduler._process_jobs() is None
-
- def test_job_submitted_event(self, scheduler, freeze_time):
- events = []
- scheduler.add_job(lambda: None, run_date=freeze_time.get())
- scheduler.add_listener(events.append, EVENT_JOB_SUBMITTED)
- scheduler.start()
- scheduler._process_jobs()
-
- assert len(events) == 1
- assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)]
-
- @pytest.mark.parametrize('scheduler_events', [EVENT_JOB_MAX_INSTANCES],
- indirect=['scheduler_events'])
- def test_job_max_instances_event(self, scheduler, scheduler_events, freeze_time):
- class MaxedOutExecutor(DebugExecutor):
- def submit_job(self, job, run_times):
- raise MaxInstancesReachedError(job)
-
- executor = MaxedOutExecutor()
- scheduler.add_executor(executor, 'maxed')
- scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed')
- scheduler.start()
- scheduler._process_jobs()
-
- assert len(scheduler_events) == 1
- assert scheduler_events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)]
-
-
-class TestProcessJobs(object):
- @pytest.fixture
- def job(self):
- job = MagicMock(Job, id=999, executor='default')
- job.trigger = MagicMock(get_next_fire_time=MagicMock(return_value=None))
- job. __str__ = lambda x: 'job 999'
- return job
-
- @pytest.fixture
- def scheduler(self):
- scheduler = DummyScheduler()
- scheduler.start()
- return scheduler
-
- @pytest.fixture
- def jobstore(self, scheduler, job):
- jobstore = MagicMock(BaseJobStore, get_due_jobs=MagicMock(return_value=[job]),
- get_next_run_time=MagicMock(return_value=None))
- scheduler._jobstores['default'] = jobstore
- return jobstore
-
- @pytest.fixture
- def executor(self, scheduler):
- executor = MagicMock(BaseExecutor)
- scheduler._executors['default'] = executor
- return executor
-
- def test_nonexistent_executor(self, scheduler, jobstore, caplog):
- """
- Test that an error is logged and the job is removed from its job store if its executor is
- not found.
-
- """
- caplog.set_level(logging.ERROR)
- scheduler.remove_executor('default')
- assert scheduler._process_jobs() is None
- jobstore.remove_job.assert_called_once_with(999)
- assert len(caplog.records) == 1
- assert caplog.records[0].message == \
- 'Executor lookup ("default") failed for job "job 999" -- removing it from the job ' \
- 'store'
-
- def test_max_instances_reached(self, scheduler, job, jobstore, executor, caplog):
- """Tests that a warning is logged when the maximum instances of a job is reached."""
- caplog.set_level(logging.WARNING)
- executor.submit_job = MagicMock(side_effect=MaxInstancesReachedError(job))
-
- assert scheduler._process_jobs() is None
- assert len(caplog.records) == 1
- assert caplog.records[0].message == \
- 'Execution of job "job 999" skipped: maximum number of running instances reached (1)'
-
- def test_executor_error(self, scheduler, jobstore, executor, caplog):
- """Tests that if any exception is raised in executor.submit(), it is logged."""
- caplog.set_level(logging.ERROR)
- executor.submit_job = MagicMock(side_effect=Exception('test message'))
-
- assert scheduler._process_jobs() is None
- assert len(caplog.records) == 1
- assert 'test message' in caplog.records[0].exc_text
- assert 'Error submitting job "job 999" to executor "default"' in caplog.records[0].message
-
- def test_job_update(self, scheduler, job, jobstore, freeze_time):
- """
- Tests that the job is updated in its job store with the next run time from the trigger.
-
- """
- next_run_time = freeze_time.current + timedelta(seconds=6)
- job.trigger.get_next_fire_time = MagicMock(return_value=next_run_time)
- assert scheduler._process_jobs() is None
- job._modify.assert_called_once_with(next_run_time=next_run_time)
- jobstore.update_job.assert_called_once_with(job)
-
- def test_wait_time(self, scheduler, freeze_time):
- """
- Tests that the earliest next run time from all job stores is returned (ignoring Nones).
-
- """
- scheduler._jobstores = {
- 'default': MagicMock(get_next_run_time=MagicMock(
- return_value=freeze_time.current + timedelta(seconds=8))),
- 'alter': MagicMock(get_next_run_time=MagicMock(return_value=None)),
- 'another': MagicMock(get_next_run_time=MagicMock(
- return_value=freeze_time.current + timedelta(seconds=5))),
- 'more': MagicMock(get_next_run_time=MagicMock(
- return_value=freeze_time.current + timedelta(seconds=6))),
- }
-
- assert scheduler._process_jobs() == 5
-
-
-class SchedulerImplementationTestBase(object):
- @pytest.fixture(autouse=True)
- def executor(self, scheduler):
- scheduler.add_executor(DebugExecutor())
-
- @pytest.fixture
- def start_scheduler(self, request, scheduler):
- yield scheduler.start
- if scheduler.running:
- scheduler.shutdown()
-
- @pytest.fixture
- def eventqueue(self, scheduler):
- events = Queue()
- scheduler.add_listener(events.put)
- return events
-
- def wait_event(self, queue):
- return queue.get(True, 1)
-
- def test_add_pending_job(self, scheduler, freeze_time, eventqueue, start_scheduler):
- """Tests that pending jobs are added (and if due, executed) when the scheduler starts."""
- freeze_time.set_increment(timedelta(seconds=0.2))
- scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2], run_date=freeze_time.next())
- start_scheduler()
-
- assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
- assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED
- assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED
- event = self.wait_event(eventqueue)
- assert event.code == EVENT_JOB_EXECUTED
- assert event.retval == 3
- assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED
-
- def test_add_live_job(self, scheduler, freeze_time, eventqueue, start_scheduler):
- """Tests that adding a job causes it to be executed after the specified delay."""
- freeze_time.set_increment(timedelta(seconds=0.2))
- start_scheduler()
- assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
- assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED
-
- scheduler.add_job(lambda x, y: x + y, 'date', args=[1, 2],
- run_date=freeze_time.next() + freeze_time.increment * 2)
- assert self.wait_event(eventqueue).code == EVENT_JOB_ADDED
- event = self.wait_event(eventqueue)
- assert event.code == EVENT_JOB_EXECUTED
- assert event.retval == 3
- assert self.wait_event(eventqueue).code == EVENT_JOB_REMOVED
-
- def test_shutdown(self, scheduler, eventqueue, start_scheduler):
- """Tests that shutting down the scheduler emits the proper event."""
- start_scheduler()
- assert self.wait_event(eventqueue).code == EVENT_JOBSTORE_ADDED
- assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_STARTED
-
- scheduler.shutdown()
- assert self.wait_event(eventqueue).code == EVENT_SCHEDULER_SHUTDOWN
-
-
-class TestBlockingScheduler(SchedulerImplementationTestBase):
- @pytest.fixture
- def scheduler(self):
- from apscheduler.schedulers.blocking import BlockingScheduler
- return BlockingScheduler()
-
- @pytest.fixture
- def start_scheduler(self, request, scheduler):
- thread = Thread(target=scheduler.start)
- yield thread.start
-
- if scheduler.running:
- scheduler.shutdown()
- thread.join()
-
-
-class TestBackgroundScheduler(SchedulerImplementationTestBase):
- @pytest.fixture
- def scheduler(self):
- from apscheduler.schedulers.background import BackgroundScheduler
- return BackgroundScheduler()
-
-
-class TestAsyncIOScheduler(SchedulerImplementationTestBase):
- @pytest.fixture
- def event_loop(self):
- asyncio = pytest.importorskip('apscheduler.schedulers.asyncio')
- return asyncio.asyncio.new_event_loop()
-
- @pytest.fixture
- def scheduler(self, event_loop):
- asyncio = pytest.importorskip('apscheduler.schedulers.asyncio')
- return asyncio.AsyncIOScheduler(event_loop=event_loop)
-
- @pytest.fixture
- def start_scheduler(self, request, event_loop, scheduler):
- event_loop.call_soon_threadsafe(scheduler.start)
- thread = Thread(target=event_loop.run_forever)
- yield thread.start
-
- if scheduler.running:
- event_loop.call_soon_threadsafe(scheduler.shutdown)
- event_loop.call_soon_threadsafe(event_loop.stop)
- thread.join()
-
-
-class TestGeventScheduler(SchedulerImplementationTestBase):
- @pytest.fixture
- def scheduler(self):
- gevent = pytest.importorskip('apscheduler.schedulers.gevent')
- return gevent.GeventScheduler()
-
- @pytest.fixture
- def calc_event(self):
- from gevent.event import Event
- return Event()
-
- @pytest.fixture
- def eventqueue(self, scheduler):
- from gevent.queue import Queue
- events = Queue()
- scheduler.add_listener(events.put)
- return events
-
-
-class TestTornadoScheduler(SchedulerImplementationTestBase):
- @pytest.fixture
- def io_loop(self):
- ioloop = pytest.importorskip('tornado.ioloop')
- return ioloop.IOLoop()
-
- @pytest.fixture
- def scheduler(self, io_loop):
- tornado = pytest.importorskip('apscheduler.schedulers.tornado')
- return tornado.TornadoScheduler(io_loop=io_loop)
-
- @pytest.fixture
- def start_scheduler(self, request, io_loop, scheduler):
- io_loop.add_callback(scheduler.start)
- thread = Thread(target=io_loop.start)
- yield thread.start
-
- if scheduler.running:
- io_loop.add_callback(scheduler.shutdown)
- io_loop.add_callback(io_loop.stop)
- thread.join()
-
-
-class TestTwistedScheduler(SchedulerImplementationTestBase):
- @pytest.fixture
- def reactor(self):
- selectreactor = pytest.importorskip('twisted.internet.selectreactor')
- return selectreactor.SelectReactor()
-
- @pytest.fixture
- def scheduler(self, reactor):
- twisted = pytest.importorskip('apscheduler.schedulers.twisted')
- return twisted.TwistedScheduler(reactor=reactor)
-
- @pytest.fixture
- def start_scheduler(self, request, reactor, scheduler):
- reactor.callFromThread(scheduler.start)
- thread = Thread(target=reactor.run, args=(False,))
- yield thread.start
-
- if scheduler.running:
- reactor.callFromThread(scheduler.shutdown)
- reactor.callFromThread(reactor.stop)
- thread.join()
diff --git a/tests/test_util.py b/tests/test_util.py
index f80e17b..19ed988 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -5,10 +5,8 @@ from functools import partial
from types import ModuleType
import pytest
-import pytz
-
from apscheduler.util import (
- datetime_ceil, get_callable_name, obj_to_ref, ref_to_obj, maybe_ref, check_callable_args)
+ check_callable_args, datetime_ceil, get_callable_name, maybe_ref, obj_to_ref, ref_to_obj)
class DummyClass(object):
diff --git a/tests/triggers/test_calendarinterval.py b/tests/triggers/test_calendarinterval.py
index ac0d023..079d014 100644
--- a/tests/triggers/test_calendarinterval.py
+++ b/tests/triggers/test_calendarinterval.py
@@ -1,7 +1,6 @@
-from datetime import datetime, date
+from datetime import date, datetime
import pytest
-
from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger
diff --git a/tests/triggers/test_combining.py b/tests/triggers/test_combining.py
index b804102..73a5836 100644
--- a/tests/triggers/test_combining.py
+++ b/tests/triggers/test_combining.py
@@ -1,7 +1,6 @@
from datetime import datetime, timedelta
import pytest
-
from apscheduler.exceptions import MaxIterationsReached
from apscheduler.triggers.combining import AndTrigger, OrTrigger
from apscheduler.triggers.date import DateTrigger
diff --git a/tests/triggers/test_cron.py b/tests/triggers/test_cron.py
index c331ef6..8f26aa3 100644
--- a/tests/triggers/test_cron.py
+++ b/tests/triggers/test_cron.py
@@ -2,7 +2,6 @@ from datetime import datetime
import pytest
import pytz
-
from apscheduler.triggers.cron import CronTrigger
diff --git a/tests/triggers/test_interval.py b/tests/triggers/test_interval.py
index 07ca35d..6761777 100644
--- a/tests/triggers/test_interval.py
+++ b/tests/triggers/test_interval.py
@@ -1,7 +1,6 @@
from datetime import datetime, timedelta
import pytest
-
from apscheduler.triggers.interval import IntervalTrigger
diff --git a/tests/workers/test_local.py b/tests/workers/test_local.py
new file mode 100644
index 0000000..d72d559
--- /dev/null
+++ b/tests/workers/test_local.py
@@ -0,0 +1,101 @@
+from datetime import datetime
+
+import pytest
+from anyio import fail_after, sleep
+from apscheduler.abc import Job
+from apscheduler.events import JobAdded, JobDeadlineMissed, JobFailed, JobSuccessful, JobUpdated
+from apscheduler.workers.local import LocalExecutor
+
+pytestmark = pytest.mark.anyio
+
+
+@pytest.mark.parametrize('sync', [True, False], ids=['sync', 'async'])
+@pytest.mark.parametrize('fail', [False, True], ids=['success', 'fail'])
+async def test_run_job_nonscheduled_success(sync, fail):
+ def sync_func(*args, **kwargs):
+ nonlocal received_args, received_kwargs
+ received_args = args
+ received_kwargs = kwargs
+ if fail:
+ raise Exception('failing as requested')
+ else:
+ return 'success'
+
+ async def async_func(*args, **kwargs):
+ nonlocal received_args, received_kwargs
+ received_args = args
+ received_kwargs = kwargs
+ if fail:
+ raise Exception('failing as requested')
+ else:
+ return 'success'
+
+ received_args = received_kwargs = None
+ events = []
+ async with LocalExecutor() as worker:
+ await worker.subscribe(events.append)
+
+ job = Job('task_id', sync_func if sync else async_func, args=(1, 2), kwargs={'x': 'foo'})
+ await worker.submit_job(job)
+
+ async with fail_after(1):
+ while len(events) < 3:
+ await sleep(0)
+
+ assert received_args == (1, 2)
+ assert received_kwargs == {'x': 'foo'}
+
+ assert isinstance(events[0], JobAdded)
+ assert events[0].job_id == job.id
+ assert events[0].task_id == 'task_id'
+ assert events[0].schedule_id is None
+ assert events[0].scheduled_start_time is None
+
+ assert isinstance(events[1], JobUpdated)
+ assert events[1].job_id == job.id
+ assert events[1].task_id == 'task_id'
+ assert events[1].schedule_id is None
+ assert events[1].scheduled_start_time is None
+
+ assert events[2].job_id == job.id
+ assert events[2].task_id == 'task_id'
+ assert events[2].schedule_id is None
+ assert events[2].scheduled_start_time is None
+ if fail:
+ assert isinstance(events[2], JobFailed)
+ assert type(events[2].exception) is Exception
+ assert isinstance(events[2].formatted_traceback, str)
+ else:
+ assert isinstance(events[2], JobSuccessful)
+ assert events[2].return_value == 'success'
+
+
+async def test_run_deadline_missed():
+ def func():
+ pytest.fail('This function should never be run')
+
+ scheduled_start_time = datetime(2020, 9, 14)
+ events = []
+ async with LocalExecutor() as worker:
+ await worker.subscribe(events.append)
+
+ job = Job('task_id', func, args=(), kwargs={}, schedule_id='foo',
+ scheduled_start_time=scheduled_start_time,
+ start_deadline=datetime(2020, 9, 14, 1))
+ await worker.submit_job(job)
+
+ async with fail_after(1):
+ while len(events) < 2:
+ await sleep(0)
+
+ assert isinstance(events[0], JobAdded)
+ assert events[0].job_id == job.id
+ assert events[0].task_id == 'task_id'
+ assert events[0].schedule_id == 'foo'
+ assert events[0].scheduled_start_time == scheduled_start_time
+
+ assert isinstance(events[1], JobDeadlineMissed)
+ assert events[1].job_id == job.id
+ assert events[1].task_id == 'task_id'
+ assert events[1].schedule_id == 'foo'
+ assert events[1].scheduled_start_time == scheduled_start_time