summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoragronholm <devnull@localhost>2011-02-06 09:08:43 +0200
committeragronholm <devnull@localhost>2011-02-06 09:08:43 +0200
commitdd15dda27fdf4525fa760ecc00b383c09a2e59da (patch)
tree0f4b84641780180519de0126fc1d35128b68c0fe
parentc6c6281628664dcace883f32ab65494f4811b39d (diff)
downloadapscheduler-dd15dda27fdf4525fa760ecc00b383c09a2e59da.tar.gz
Greatly improved the speed of scheduler tests by bypassing threading entirely
-rw-r--r--tests/apschedulertests/testscheduler.py214
1 files changed, 211 insertions, 3 deletions
diff --git a/tests/apschedulertests/testscheduler.py b/tests/apschedulertests/testscheduler.py
index c07f264..66c5e3c 100644
--- a/tests/apschedulertests/testscheduler.py
+++ b/tests/apschedulertests/testscheduler.py
@@ -1,10 +1,17 @@
-from datetime import datetime
+from datetime import datetime, timedelta
+from copy import copy
+import os
from nose.tools import eq_, raises
from apscheduler.jobstores.ram_store import RAMJobStore
-from apscheduler.scheduler import Scheduler
-from apscheduler.job import STATUS_OK, JobStatus, STATUS_ERROR
+from apscheduler.scheduler import Scheduler, SchedulerAlreadyRunningError
+from apscheduler.job import STATUS_OK, STATUS_ERROR, JobStatus
+
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
class TestOfflineScheduler(object):
@@ -57,3 +64,204 @@ class TestOfflineScheduler(object):
self.scheduler.remove_listener(cb)
self.scheduler._notify_listeners(status)
eq_(val[0], 1)
+
+
+class FakeThread(object):
+ def isAlive(self):
+ return True
+
+
+class FakeThreadPool(object):
+ def submit(self, func, *args, **kwargs):
+ func(*args, **kwargs)
+
+
+class DummyException(Exception):
+ pass
+
+
+class TestJobExecution(object):
+ def setUp(self):
+ self.scheduler = Scheduler(threadpool=FakeThreadPool())
+ self.scheduler.add_jobstore(RAMJobStore(), 'default')
+
+ # Make the scheduler think it's running
+ self.scheduler._thread = FakeThread()
+
+ @raises(TypeError)
+ def test_noncallable(self):
+ date = datetime.now() + timedelta(days=1)
+ self.scheduler.add_date_job('wontwork', date)
+
+ def test_job_name(self):
+ def my_job():
+ pass
+
+ job = self.scheduler.add_interval_job(my_job,
+ start_date=datetime(2010, 5, 19))
+ eq_(repr(job), '<Job (name=apschedulertests.testscheduler.my_job, '
+ 'trigger=<IntervalTrigger (interval=datetime.timedelta(0, 1), '
+ 'start_date=datetime.datetime(2010, 5, 19, 0, 0))>)>')
+
+ def test_schedule_object(self):
+ # Tests that any callable object is accepted (and not just functions)
+ class A:
+ def __init__(self):
+ self.val = 0
+ def __call__(self):
+ self.val += 1
+
+ a = A()
+ job = self.scheduler.add_interval_job(a, seconds=1)
+ self.scheduler._process_jobs(job.next_run_time)
+ self.scheduler._process_jobs(job.next_run_time)
+ eq_(a.val, 2)
+
+ def test_unschedule_job(self):
+ def increment(vals):
+ vals[0] += 1
+
+ vals = [0]
+ job = self.scheduler.add_cron_job(increment, args=[vals])
+ self.scheduler._process_jobs(job.next_run_time)
+ eq_(vals[0], 1)
+ self.scheduler.unschedule_job(job)
+ self.scheduler._process_jobs(job.next_run_time)
+ eq_(vals[0], 1)
+
+ def test_job_finished(self):
+ def increment():
+ vals[0] += 1
+
+ vals = [0]
+ job = self.scheduler.add_interval_job(increment, max_runs=1)
+ self.scheduler._process_jobs(job.next_run_time)
+ eq_(vals, [1])
+ assert job not in self.scheduler.get_jobs()
+
+ @raises(DummyException)
+ def test_job_exception(self):
+ def failure():
+ raise DummyException
+
+ start_date = datetime(9999, 1, 1)
+ job = self.scheduler.add_date_job(failure, start_date)
+ job.func()
+
+ def test_interval(self):
+ def increment(amount):
+ vals[0] += amount
+ vals[1] += 1
+
+ vals = [0, 0]
+ job = self.scheduler.add_interval_job(increment, seconds=1, args=[2])
+ self.scheduler._process_jobs(job.next_run_time)
+ self.scheduler._process_jobs(job.next_run_time)
+ eq_(vals, [4, 2])
+
+ def test_interval_schedule(self):
+ @self.scheduler.interval_schedule(seconds=1)
+ def increment():
+ vals[0] += 1
+
+ vals = [0]
+ start = increment.job.next_run_time
+ self.scheduler._process_jobs(start)
+ self.scheduler._process_jobs(start + timedelta(seconds=1))
+ eq_(vals, [2])
+
+ def test_cron(self):
+ def increment(amount):
+ vals[0] += amount
+ vals[1] += 1
+
+ vals = [0, 0]
+ job = self.scheduler.add_cron_job(increment, args=[3])
+ start = job.next_run_time
+ self.scheduler._process_jobs(start)
+ eq_(vals, [3, 1])
+ self.scheduler._process_jobs(start + timedelta(seconds=1))
+ eq_(vals, [6, 2])
+ self.scheduler._process_jobs(start + timedelta(seconds=2))
+ eq_(vals, [9, 3])
+
+ def test_cron_schedule(self):
+ @self.scheduler.cron_schedule()
+ def increment():
+ vals[0] += 1
+
+ vals = [0]
+ start = increment.job.next_run_time
+ self.scheduler._process_jobs(start)
+ self.scheduler._process_jobs(start + timedelta(seconds=1))
+ eq_(vals[0], 2)
+
+ def test_date(self):
+ def append_val(value):
+ vals.append(value)
+
+ vals = []
+ date = datetime.now() + timedelta(seconds=1)
+ self.scheduler.add_date_job(append_val, date, kwargs={'value': 'test'})
+ self.scheduler._process_jobs(date)
+ eq_(vals, ['test'])
+
+ def test_print_jobs(self):
+ out = StringIO()
+ self.scheduler.print_jobs(out)
+ expected = 'Jobstore default:%s'\
+ ' No scheduled jobs' % os.linesep
+ eq_(out.getvalue(), expected)
+
+ self.scheduler.add_date_job(copy, datetime(2200, 5, 19))
+ out = StringIO()
+ self.scheduler.print_jobs(out)
+ expected = 'Jobstore default:%s '\
+ 'copy.copy (trigger: date[2200-05-19 00:00:00], '\
+ 'next run at: 2200-05-19 00:00:00)' % os.linesep
+ eq_(out.getvalue(), expected)
+
+ def test_jobstore(self):
+ self.scheduler.add_jobstore(RAMJobStore(), 'dummy')
+ job = self.scheduler.add_date_job(lambda: None, datetime(2200, 7, 24),
+ jobstore='dummy')
+ eq_(self.scheduler.get_jobs(), [job])
+ self.scheduler.remove_jobstore('dummy')
+ eq_(self.scheduler.get_jobs(), [])
+
+ def test_job_next_run_time(self):
+ # Tests against bug #5
+ def job_1():
+ vars[0] += 1
+
+ vars = [0]
+ job = self.scheduler.add_interval_job(job_1, seconds=1,
+ misfire_grace_time=3)
+ start = job.next_run_time
+ self.scheduler._process_jobs(start)
+ eq_(vars, [1])
+ self.scheduler._process_jobs(start)
+ eq_(vars, [1])
+ self.scheduler._process_jobs(start + timedelta(seconds=1))
+ eq_(vars, [2])
+
+
+class TestRunningScheduler(object):
+ def setup(self):
+ self.scheduler = Scheduler()
+ self.scheduler.start()
+
+ def teardown(self):
+ if self.scheduler.running:
+ self.scheduler.shutdown()
+
+ def test_shutdown_timeout(self):
+ self.scheduler.shutdown(3)
+
+ @raises(SchedulerAlreadyRunningError)
+ def test_scheduler_double_start(self):
+ self.scheduler.start()
+
+ def test_scheduler_double_shutdown(self):
+ self.scheduler.shutdown(1)
+ self.scheduler.shutdown()