summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2016-04-02 11:44:04 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2016-04-02 11:44:04 +0300
commitbb67d0899d235479b32450315a683b42af435f73 (patch)
tree152ee77666a1135dcf898123788169bac8463978 /tests
parentbc8ae633568ca87befcf4e7b0b8cf71c22e5c3cb (diff)
downloadapscheduler-bb67d0899d235479b32450315a683b42af435f73.tar.gz
Added EVENT_JOB_SUBMITTED and EVENT_JOB_MAX_INSTANCES events
Diffstat (limited to 'tests')
-rw-r--r--tests/test_schedulers.py27
1 files changed, 26 insertions, 1 deletions
diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py
index ba17702..c2812c3 100644
--- a/tests/test_schedulers.py
+++ b/tests/test_schedulers.py
@@ -18,7 +18,7 @@ from apscheduler.events import (
EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED,
EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_ALL_JOBS_REMOVED, EVENT_EXECUTOR_ADDED,
EVENT_EXECUTOR_REMOVED, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, SchedulerEvent,
- EVENT_JOB_ADDED, EVENT_JOB_EXECUTED)
+ EVENT_JOB_ADDED, EVENT_JOB_EXECUTED, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES)
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import undefined
@@ -767,6 +767,31 @@ Jobstore baz:
def test_process_jobs_empty(self, scheduler):
assert scheduler._process_jobs() is None
+ def test_job_submitted_event(self, scheduler, freeze_time):
+ events = []
+ scheduler.add_job(lambda: None, run_date=freeze_time.get())
+ scheduler.add_listener(events.append, EVENT_JOB_SUBMITTED)
+ scheduler.start()
+ scheduler._process_jobs()
+
+ assert len(events) == 1
+ assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)]
+
+ def test_job_max_instances_event(self, scheduler, freeze_time):
+ class MaxedOutExecutor(DummyExecutor):
+ def submit_job(self, job, run_times):
+ raise MaxInstancesReachedError(job)
+
+ events = []
+ scheduler.add_executor(MaxedOutExecutor(), 'maxed')
+ scheduler.add_job(lambda: None, run_date=freeze_time.get(), executor='maxed')
+ scheduler.add_listener(events.append, EVENT_JOB_MAX_INSTANCES)
+ scheduler.start()
+ scheduler._process_jobs()
+
+ assert len(events) == 1
+ assert events[0].scheduled_run_times == [freeze_time.get(scheduler.timezone)]
+
class TestProcessJobs(object):
@pytest.fixture