diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-11-05 21:58:06 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-11-05 22:14:54 +0200 |
commit | 6a5f3e6786116202710fa98db1efa991c62de971 (patch) | |
tree | 0eec4a321766dc37fc7c464bd6f12376c76401f4 | |
parent | d4bce351a4dc15a5eb4be1887999426d005ba2ac (diff) | |
download | apscheduler-6a5f3e6786116202710fa98db1efa991c62de971.tar.gz |
Fixed scheduler loop breaking if the job store fails (fixes #109)
-rw-r--r-- | apscheduler/schedulers/base.py | 23 | ||||
-rw-r--r-- | docs/versionhistory.rst | 3 |
2 files changed, 24 insertions, 2 deletions
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py index 72e3c39..9326909 100644 --- a/apscheduler/schedulers/base.py +++ b/apscheduler/schedulers/base.py @@ -3,7 +3,7 @@ from __future__ import print_function from abc import ABCMeta, abstractmethod from collections import MutableMapping from threading import RLock -from datetime import datetime +from datetime import datetime, timedelta from logging import getLogger import warnings import sys @@ -43,6 +43,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to apscheduler.scheduler) :param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone) + :param int|float jobstore_retry_interval: the minimum number of seconds to wait between + retries in the scheduler's main loop if the job store raises an exception when getting + the list of due jobs :param dict job_defaults: default values for newly added jobs :param dict jobstores: a dictionary of job store alias -> job store instance or configuration dict @@ -684,6 +687,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): # Set general options self._logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler.scheduler') self.timezone = astimezone(config.pop('timezone', None)) or get_localzone() + self.jobstore_retry_interval = float(config.pop('jobstore_retry_interval', 10)) # Set the job defaults job_defaults = config.get('job_defaults', {}) @@ -909,6 +913,9 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for the next round. + If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least + ``jobstore_retry_interval`` seconds. + """ if self.state == STATE_PAUSED: self._logger.debug('Scheduler is paused -- not processing jobs') @@ -921,7 +928,19 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): with self._jobstores_lock: for jobstore_alias, jobstore in six.iteritems(self._jobstores): - for job in jobstore.get_due_jobs(now): + try: + due_jobs = jobstore.get_due_jobs(now) + except Exception as e: + # Schedule a wakeup at least in jobstore_retry_interval seconds + self._logger.warning('Error getting due jobs from job store %r: %s', + jobstore_alias, e) + retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval) + if not next_wakeup_time or next_wakeup_time > retry_wakeup_time: + next_wakeup_time = retry_wakeup_time + + continue + + for job in due_jobs: # Look up the job's executor try: executor = self._lookup_executor(job.executor) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 842495d..703198d 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -16,6 +16,9 @@ APScheduler, see the :doc:`migration section <migration>`. * Added the ability to execute coroutine functions with asyncio/trollius and Twisted +* Fixed job store failure (``get_due_jobs()``) causing the scheduler main loop to exit (it now + waits a configurable number of seconds before retrying) + * Improved import logic in ``ref_to_obj()`` to avoid errors in cases where traversing the path with ``getattr()`` would not work (thanks to Jarek Glowacki for the patch) |