diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2014-06-15 21:20:44 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2014-06-16 12:48:00 +0300 |
commit | 5203bbda036096f634ffa1e10c26a8df2d1f2de7 (patch) | |
tree | 974bfb1b05a3cda085ddff3185b750036f1c600d | |
parent | 89ac8d637367ee62094433b17694162ea5118b17 (diff) | |
download | apscheduler-5203bbda036096f634ffa1e10c26a8df2d1f2de7.tar.gz |
Fixed infinite loop and index corruption in MemoryJobStore
-rw-r--r-- | apscheduler/jobstores/memory.py | 79 | ||||
-rw-r--r-- | tests/test_jobstores.py | 39 |
2 files changed, 80 insertions, 38 deletions
diff --git a/apscheduler/jobstores/memory.py b/apscheduler/jobstores/memory.py index c551f12..031a105 100644 --- a/apscheduler/jobstores/memory.py +++ b/apscheduler/jobstores/memory.py @@ -1,6 +1,7 @@ from __future__ import absolute_import from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError +from apscheduler.util import datetime_to_utc_timestamp class MemoryJobStore(BaseJobStore): @@ -8,58 +9,61 @@ class MemoryJobStore(BaseJobStore): def __init__(self): super(MemoryJobStore, self).__init__() - self._jobs = [] # sorted by next_run_time - self._jobs_index = {} # id -> job lookup table + self._jobs = [] # list of (job, timestamp), sorted by next_run_time and job id (ascending) + self._jobs_index = {} # id -> (job, timestamp) lookup table def lookup_job(self, job_id): - return self._jobs_index.get(job_id) + return self._jobs_index.get(job_id, (None, None))[0] def get_due_jobs(self, now): + now_timestamp = datetime_to_utc_timestamp(now) pending = [] - for job in self._jobs: - if not job.next_run_time or job.next_run_time > now: + for job, timestamp in self._jobs: + if timestamp is None or timestamp > now_timestamp: break pending.append(job) return pending def get_next_run_time(self): - return self._jobs[0].next_run_time if self._jobs else None + return self._jobs[0][0].next_run_time if self._jobs else None def get_all_jobs(self): - return list(self._jobs) + return [j[0] for j in self._jobs] def add_job(self, job): if job.id in self._jobs_index: raise ConflictingIdError(job.id) - index = self._bisect_job(job.next_run_time) - self._jobs.insert(index, job) - self._jobs_index[job.id] = job + timestamp = datetime_to_utc_timestamp(job.next_run_time) + index = self._get_job_index(timestamp, job.id) + self._jobs.insert(index, (job, timestamp)) + self._jobs_index[job.id] = (job, timestamp) def update_job(self, job): - old_job = self.lookup_job(job.id) + old_job, old_timestamp = self._jobs_index.get(job.id, (None, None)) if old_job is None: raise JobLookupError(job.id) - old_index = self._get_job_index(old_job) - self._jobs_index[old_job.id] = job - # If the next run time has not changed, simply replace the job in its present index. # Otherwise, reinsert the job to the list to preserve the ordering. - if old_job.next_run_time == job.next_run_time: - self._jobs[old_index] = job + old_index = self._get_job_index(old_timestamp, old_job.id) + new_timestamp = datetime_to_utc_timestamp(job.next_run_time) + if old_timestamp == new_timestamp: + self._jobs[old_index] = (job, new_timestamp) else: del self._jobs[old_index] - index = self._bisect_job(job.next_run_time) - self._jobs.insert(index, job) + new_index = self._get_job_index(new_timestamp, job.id) + self._jobs.insert(new_index, (job, new_timestamp)) + + self._jobs_index[old_job.id] = (job, new_timestamp) def remove_job(self, job_id): - job = self.lookup_job(job_id) + job, timestamp = self._jobs_index.get(job_id, (None, None)) if job is None: raise JobLookupError(job_id) - index = self._get_job_index(job) + index = self._get_job_index(timestamp, job_id) del self._jobs[index] del self._jobs_index[job.id] @@ -70,23 +74,30 @@ class MemoryJobStore(BaseJobStore): def shutdown(self): self.remove_all_jobs() - def _get_job_index(self, job): - jobs = self._jobs - index = self._bisect_job(job.next_run_time) - end = len(self._jobs) - while index < end: - if jobs[index].id == job.id: - return index - - def _bisect_job(self, run_time): - # Adapted from the bisect module - jobs = self._jobs - lo, hi = 0, len(jobs) + def _get_job_index(self, timestamp, job_id): + """ + Returns the index of the given job, or if it's not found, the index where the job should be inserted based on + the given timestamp. + + :type timestamp: int + :type job_id: str + """ + + lo, hi = 0, len(self._jobs) + timestamp = float('inf') if timestamp is None else timestamp while lo < hi: mid = (lo + hi) // 2 - if run_time is None or (jobs[mid].next_run_time is not None and jobs[mid].next_run_time < run_time): + mid_job, mid_timestamp = self._jobs[mid] + mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp + if mid_timestamp > timestamp: + hi = mid + elif mid_timestamp < timestamp: lo = mid + 1 - else: + elif mid_job.id > job_id: hi = mid + elif mid_job.id < job_id: + lo = mid + 1 + else: + return mid return lo diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py index 64a0ed7..191d4fd 100644 --- a/tests/test_jobstores.py +++ b/tests/test_jobstores.py @@ -146,16 +146,47 @@ def test_update_job_next_runtime(jobstore, create_add_job, next_run_time, timezo job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3)) create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26)) job3 = create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14)) - replacement = create_add_job(None, dummy_job, datetime.now(), id=job1.id) - replacement.next_run_time = timezone.localize(next_run_time) if next_run_time else None - jobstore.update_job(replacement) + job1.next_run_time = timezone.localize(next_run_time) if next_run_time else None + jobstore.update_job(job1) if next_run_time: - assert jobstore.get_next_run_time() == replacement.next_run_time + assert jobstore.get_next_run_time() == job1.next_run_time else: assert jobstore.get_next_run_time() == job3.next_run_time +@pytest.mark.parametrize('next_run_time', [datetime(2013, 8, 13), None], ids=['earlier', 'null']) +@pytest.mark.parametrize('index', [0, 1, 2], ids=['first', 'middle', 'last']) +def test_update_job_clear_next_runtime(jobstore, create_add_job, next_run_time, timezone, index): + """ + Tests that update_job() maintains the proper ordering of the jobs, even when their next run times are initially the + same. + """ + + jobs = [create_add_job(jobstore, dummy_job, datetime(2014, 2, 26), 'job%d' % i) for i in range(3)] + jobs[index].next_run_time = timezone.localize(next_run_time) if next_run_time else None + jobstore.update_job(jobs[index]) + due_date = timezone.localize(datetime(2014, 2, 27)) + due_jobs = jobstore.get_due_jobs(due_date) + + assert len(due_jobs) == (3 if next_run_time else 2) + due_job_ids = [job.id for job in due_jobs] + if next_run_time: + if index == 0: + assert due_job_ids == ['job0', 'job1', 'job2'] + elif index == 1: + assert due_job_ids == ['job1', 'job0', 'job2'] + else: + assert due_job_ids == ['job2', 'job0', 'job1'] + else: + if index == 0: + assert due_job_ids == ['job1', 'job2'] + elif index == 1: + assert due_job_ids == ['job0', 'job2'] + else: + assert due_job_ids == ['job0', 'job1'] + + def test_update_job_nonexistent_job(jobstore, create_add_job): job = create_add_job(None, dummy_job, datetime(2016, 5, 3)) pytest.raises(JobLookupError, jobstore.update_job, job) |