summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2014-06-15 21:20:44 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2014-06-16 12:48:00 +0300
commit5203bbda036096f634ffa1e10c26a8df2d1f2de7 (patch)
tree974bfb1b05a3cda085ddff3185b750036f1c600d
parent89ac8d637367ee62094433b17694162ea5118b17 (diff)
downloadapscheduler-5203bbda036096f634ffa1e10c26a8df2d1f2de7.tar.gz
Fixed infinite loop and index corruption in MemoryJobStore
-rw-r--r--apscheduler/jobstores/memory.py79
-rw-r--r--tests/test_jobstores.py39
2 files changed, 80 insertions, 38 deletions
diff --git a/apscheduler/jobstores/memory.py b/apscheduler/jobstores/memory.py
index c551f12..031a105 100644
--- a/apscheduler/jobstores/memory.py
+++ b/apscheduler/jobstores/memory.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
+from apscheduler.util import datetime_to_utc_timestamp
class MemoryJobStore(BaseJobStore):
@@ -8,58 +9,61 @@ class MemoryJobStore(BaseJobStore):
def __init__(self):
super(MemoryJobStore, self).__init__()
- self._jobs = [] # sorted by next_run_time
- self._jobs_index = {} # id -> job lookup table
+ self._jobs = [] # list of (job, timestamp), sorted by next_run_time and job id (ascending)
+ self._jobs_index = {} # id -> (job, timestamp) lookup table
def lookup_job(self, job_id):
- return self._jobs_index.get(job_id)
+ return self._jobs_index.get(job_id, (None, None))[0]
def get_due_jobs(self, now):
+ now_timestamp = datetime_to_utc_timestamp(now)
pending = []
- for job in self._jobs:
- if not job.next_run_time or job.next_run_time > now:
+ for job, timestamp in self._jobs:
+ if timestamp is None or timestamp > now_timestamp:
break
pending.append(job)
return pending
def get_next_run_time(self):
- return self._jobs[0].next_run_time if self._jobs else None
+ return self._jobs[0][0].next_run_time if self._jobs else None
def get_all_jobs(self):
- return list(self._jobs)
+ return [j[0] for j in self._jobs]
def add_job(self, job):
if job.id in self._jobs_index:
raise ConflictingIdError(job.id)
- index = self._bisect_job(job.next_run_time)
- self._jobs.insert(index, job)
- self._jobs_index[job.id] = job
+ timestamp = datetime_to_utc_timestamp(job.next_run_time)
+ index = self._get_job_index(timestamp, job.id)
+ self._jobs.insert(index, (job, timestamp))
+ self._jobs_index[job.id] = (job, timestamp)
def update_job(self, job):
- old_job = self.lookup_job(job.id)
+ old_job, old_timestamp = self._jobs_index.get(job.id, (None, None))
if old_job is None:
raise JobLookupError(job.id)
- old_index = self._get_job_index(old_job)
- self._jobs_index[old_job.id] = job
-
# If the next run time has not changed, simply replace the job in its present index.
# Otherwise, reinsert the job to the list to preserve the ordering.
- if old_job.next_run_time == job.next_run_time:
- self._jobs[old_index] = job
+ old_index = self._get_job_index(old_timestamp, old_job.id)
+ new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
+ if old_timestamp == new_timestamp:
+ self._jobs[old_index] = (job, new_timestamp)
else:
del self._jobs[old_index]
- index = self._bisect_job(job.next_run_time)
- self._jobs.insert(index, job)
+ new_index = self._get_job_index(new_timestamp, job.id)
+ self._jobs.insert(new_index, (job, new_timestamp))
+
+ self._jobs_index[old_job.id] = (job, new_timestamp)
def remove_job(self, job_id):
- job = self.lookup_job(job_id)
+ job, timestamp = self._jobs_index.get(job_id, (None, None))
if job is None:
raise JobLookupError(job_id)
- index = self._get_job_index(job)
+ index = self._get_job_index(timestamp, job_id)
del self._jobs[index]
del self._jobs_index[job.id]
@@ -70,23 +74,30 @@ class MemoryJobStore(BaseJobStore):
def shutdown(self):
self.remove_all_jobs()
- def _get_job_index(self, job):
- jobs = self._jobs
- index = self._bisect_job(job.next_run_time)
- end = len(self._jobs)
- while index < end:
- if jobs[index].id == job.id:
- return index
-
- def _bisect_job(self, run_time):
- # Adapted from the bisect module
- jobs = self._jobs
- lo, hi = 0, len(jobs)
+ def _get_job_index(self, timestamp, job_id):
+ """
+ Returns the index of the given job, or if it's not found, the index where the job should be inserted based on
+ the given timestamp.
+
+ :type timestamp: int
+ :type job_id: str
+ """
+
+ lo, hi = 0, len(self._jobs)
+ timestamp = float('inf') if timestamp is None else timestamp
while lo < hi:
mid = (lo + hi) // 2
- if run_time is None or (jobs[mid].next_run_time is not None and jobs[mid].next_run_time < run_time):
+ mid_job, mid_timestamp = self._jobs[mid]
+ mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp
+ if mid_timestamp > timestamp:
+ hi = mid
+ elif mid_timestamp < timestamp:
lo = mid + 1
- else:
+ elif mid_job.id > job_id:
hi = mid
+ elif mid_job.id < job_id:
+ lo = mid + 1
+ else:
+ return mid
return lo
diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py
index 64a0ed7..191d4fd 100644
--- a/tests/test_jobstores.py
+++ b/tests/test_jobstores.py
@@ -146,16 +146,47 @@ def test_update_job_next_runtime(jobstore, create_add_job, next_run_time, timezo
job1 = create_add_job(jobstore, dummy_job, datetime(2016, 5, 3))
create_add_job(jobstore, dummy_job2, datetime(2014, 2, 26))
job3 = create_add_job(jobstore, dummy_job3, datetime(2013, 8, 14))
- replacement = create_add_job(None, dummy_job, datetime.now(), id=job1.id)
- replacement.next_run_time = timezone.localize(next_run_time) if next_run_time else None
- jobstore.update_job(replacement)
+ job1.next_run_time = timezone.localize(next_run_time) if next_run_time else None
+ jobstore.update_job(job1)
if next_run_time:
- assert jobstore.get_next_run_time() == replacement.next_run_time
+ assert jobstore.get_next_run_time() == job1.next_run_time
else:
assert jobstore.get_next_run_time() == job3.next_run_time
+@pytest.mark.parametrize('next_run_time', [datetime(2013, 8, 13), None], ids=['earlier', 'null'])
+@pytest.mark.parametrize('index', [0, 1, 2], ids=['first', 'middle', 'last'])
+def test_update_job_clear_next_runtime(jobstore, create_add_job, next_run_time, timezone, index):
+ """
+ Tests that update_job() maintains the proper ordering of the jobs, even when their next run times are initially the
+ same.
+ """
+
+ jobs = [create_add_job(jobstore, dummy_job, datetime(2014, 2, 26), 'job%d' % i) for i in range(3)]
+ jobs[index].next_run_time = timezone.localize(next_run_time) if next_run_time else None
+ jobstore.update_job(jobs[index])
+ due_date = timezone.localize(datetime(2014, 2, 27))
+ due_jobs = jobstore.get_due_jobs(due_date)
+
+ assert len(due_jobs) == (3 if next_run_time else 2)
+ due_job_ids = [job.id for job in due_jobs]
+ if next_run_time:
+ if index == 0:
+ assert due_job_ids == ['job0', 'job1', 'job2']
+ elif index == 1:
+ assert due_job_ids == ['job1', 'job0', 'job2']
+ else:
+ assert due_job_ids == ['job2', 'job0', 'job1']
+ else:
+ if index == 0:
+ assert due_job_ids == ['job1', 'job2']
+ elif index == 1:
+ assert due_job_ids == ['job0', 'job2']
+ else:
+ assert due_job_ids == ['job0', 'job1']
+
+
def test_update_job_nonexistent_job(jobstore, create_add_job):
job = create_add_job(None, dummy_job, datetime(2016, 5, 3))
pytest.raises(JobLookupError, jobstore.update_job, job)