summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2016-11-05 21:58:06 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2016-11-05 22:14:54 +0200
commit6a5f3e6786116202710fa98db1efa991c62de971 (patch)
tree0eec4a321766dc37fc7c464bd6f12376c76401f4
parentd4bce351a4dc15a5eb4be1887999426d005ba2ac (diff)
downloadapscheduler-6a5f3e6786116202710fa98db1efa991c62de971.tar.gz
Fixed scheduler loop breaking if the job store fails (fixes #109)
-rw-r--r--apscheduler/schedulers/base.py23
-rw-r--r--docs/versionhistory.rst3
2 files changed, 24 insertions, 2 deletions
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py
index 72e3c39..9326909 100644
--- a/apscheduler/schedulers/base.py
+++ b/apscheduler/schedulers/base.py
@@ -3,7 +3,7 @@ from __future__ import print_function
from abc import ABCMeta, abstractmethod
from collections import MutableMapping
from threading import RLock
-from datetime import datetime
+from datetime import datetime, timedelta
from logging import getLogger
import warnings
import sys
@@ -43,6 +43,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to
apscheduler.scheduler)
:param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone)
+ :param int|float jobstore_retry_interval: the minimum number of seconds to wait between
+ retries in the scheduler's main loop if the job store raises an exception when getting
+ the list of due jobs
:param dict job_defaults: default values for newly added jobs
:param dict jobstores: a dictionary of job store alias -> job store instance or configuration
dict
@@ -684,6 +687,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
# Set general options
self._logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler.scheduler')
self.timezone = astimezone(config.pop('timezone', None)) or get_localzone()
+ self.jobstore_retry_interval = float(config.pop('jobstore_retry_interval', 10))
# Set the job defaults
job_defaults = config.get('job_defaults', {})
@@ -909,6 +913,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
to wait for the next round.
+ If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
+ ``jobstore_retry_interval`` seconds.
+
"""
if self.state == STATE_PAUSED:
self._logger.debug('Scheduler is paused -- not processing jobs')
@@ -921,7 +928,19 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
with self._jobstores_lock:
for jobstore_alias, jobstore in six.iteritems(self._jobstores):
- for job in jobstore.get_due_jobs(now):
+ try:
+ due_jobs = jobstore.get_due_jobs(now)
+ except Exception as e:
+ # Schedule a wakeup at least in jobstore_retry_interval seconds
+ self._logger.warning('Error getting due jobs from job store %r: %s',
+ jobstore_alias, e)
+ retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
+ if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
+ next_wakeup_time = retry_wakeup_time
+
+ continue
+
+ for job in due_jobs:
# Look up the job's executor
try:
executor = self._lookup_executor(job.executor)
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 842495d..703198d 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -16,6 +16,9 @@ APScheduler, see the :doc:`migration section <migration>`.
* Added the ability to execute coroutine functions with asyncio/trollius and Twisted
+* Fixed job store failure (``get_due_jobs()``) causing the scheduler main loop to exit (it now
+ waits a configurable number of seconds before retrying)
+
* Improved import logic in ``ref_to_obj()`` to avoid errors in cases where traversing the path with
``getattr()`` would not work (thanks to Jarek Glowacki for the patch)