diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-04-02 11:44:04 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-04-02 11:44:04 +0300 |
commit | bb67d0899d235479b32450315a683b42af435f73 (patch) | |
tree | 152ee77666a1135dcf898123788169bac8463978 /tests | |
parent | bc8ae633568ca87befcf4e7b0b8cf71c22e5c3cb (diff) | |
download | apscheduler-bb67d0899d235479b32450315a683b42af435f73.tar.gz |
Added EVENT_JOB_SUBMITTED and EVENT_JOB_MAX_INSTANCES events
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_schedulers.py | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index ba17702..c2812c3 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -18,7 +18,7 @@ from apscheduler.events import ( EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, SchedulerEvent, - EVENT_JOB_ADDED, EVENT_JOB_EXECUTED) + EVENT_JOB_ADDED, EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES) from apscheduler.triggers.base import BaseTrigger from apscheduler.util import undefined @@ -767,6 +767,31 @@ Jobstore baz: def test_process_jobs_empty(self, scheduler): assert scheduler._process_jobs() is None + def test_job_submitted_event(self, scheduler, freeze_time): + events = [] + scheduler.add_job(lambda: None, run_date=freeze_time.get()) + scheduler.add_listener(events.append, EVENT_JOB_SUBMITTED) + scheduler.start() + scheduler._process_jobs() + + assert len(events) == 1 + assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] + + def test_job_max_instances_event(self, scheduler, freeze_time): + class MaxedOutExecutor(DummyExecutor): + def submit_job(self, job, run_times): + raise MaxInstancesReachedError(job) + + events = [] + scheduler.add_executor(MaxedOutExecutor(), 'maxed') + scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed') + scheduler.add_listener(events.append, EVENT_JOB_MAX_INSTANCES) + scheduler.start() + scheduler._process_jobs() + + assert len(events) == 1 + assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)] + class TestProcessJobs(object): @pytest.fixture |