summaryrefslogtreecommitdiff
path: root/apscheduler/schedulers
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2014-04-01 05:12:12 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2014-04-01 06:11:00 +0300
commit3831f00b700fa312964a59ffafe7359f13e57185 (patch)
treec8f0f3aa57e5590a858c215ccdb38c8f850e2717 /apscheduler/schedulers
parentd564658d2df03628999e8e26871e6fbf4a588b0b (diff)
downloadapscheduler-3831f00b700fa312964a59ffafe7359f13e57185.tar.gz
Changed the syntax for adding a new job
Added extensive validation for Job attributes
Diffstat (limited to 'apscheduler/schedulers')
-rw-r--r--apscheduler/schedulers/base.py151
1 files changed, 33 insertions, 118 deletions
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py
index 3bc5ea8..2ddd9cc 100644
--- a/apscheduler/schedulers/base.py
+++ b/apscheduler/schedulers/base.py
@@ -3,11 +3,9 @@ from abc import ABCMeta, abstractmethod
from threading import RLock
from datetime import datetime, timedelta
from logging import getLogger
-from collections import Mapping, Iterable
-from inspect import ismethod, isfunction
+from uuid import uuid4
import sys
-from pkg_resources import iter_entry_points
from dateutil.tz import tzlocal
import six
@@ -28,7 +26,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
"""Base class for all schedulers."""
_stopped = True
- _plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
#
# Public API
@@ -164,8 +161,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if callback == cb:
del self._listeners[i]
- def add_job(self, func, trigger, trigger_args=(), args=None, kwargs=None, id=None, name=None,
- misfire_grace_time=None, coalesce=None, max_runs=None, max_instances=1, jobstore='default'):
+ def add_job(self, trigger, func, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None,
+ max_runs=None, max_instances=1, jobstore='default', **trigger_args):
"""
Adds the given job to the job list and notifies the scheduler thread.
@@ -175,22 +172,22 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
The ``trigger`` argument can either be:
- # the plugin name of the trigger (e.g. "cron"), in which case you should provide ``trigger_args`` as well
- # an instance of the trigger
+ # the plugin name of the trigger (e.g. "cron"), in which case any extra keyword arguments to this method are
+ passed on to the trigger's constructor
+ # an instance of a trigger class
:param trigger: trigger that determines when ``func`` is called
- :param trigger_args: arguments given to the constructor of the trigger class
:param func: callable (or a textual reference to one) to run at the given time
:param args: list of positional arguments to call func with
:param kwargs: dict of keyword arguments to call func with
:param id: explicit identifier for the job (for modifying it later)
:param name: textual description of the job
- :param jobstore: alias of the job store to store the job in
:param misfire_grace_time: seconds after the designated run time that the job is still allowed to be run
:param coalesce: run once instead of many times if the scheduler determines that the job should be run more than
once in succession
:param max_runs: maximum number of times this job is allowed to be triggered
:param max_instances: maximum number of concurrently running instances allowed for this job
+ :param jobstore: alias of the job store to store the job in
:type id: str/unicode
:type args: list/tuple
:type jobstore: str/unicode
@@ -201,51 +198,23 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:type max_instances: int
:rtype: :class:`~apscheduler.job.JobHandle`
"""
- # Argument sanity checking
- if args is not None and (not isinstance(args, Iterable) and not isinstance(args, str)):
- raise TypeError('args must be an iterable')
- if kwargs is not None and not isinstance(kwargs, Mapping):
- raise TypeError('kwargs must be a dict-like object')
- if misfire_grace_time is not None and misfire_grace_time <= 0:
- raise ValueError('misfire_grace_time must be a positive value')
- if max_runs is not None and max_runs <= 0:
- raise ValueError('max_runs must be a positive value')
- if max_instances <= 0:
- raise ValueError('max_instances must be a positive value')
-
- # If trigger is a string, resolve it to a class, possibly by loading an entry point if necessary
- if isinstance(trigger, str):
- try:
- trigger_cls = self._triggers[trigger]
- except KeyError:
- if trigger in self._plugins:
- trigger_cls = self._triggers[trigger] = self._plugins[trigger].load()
- if not callable(getattr(trigger_cls, 'get_next_fire_time')):
- raise TypeError('The trigger entry point does not point to a trigger class')
- else:
- raise KeyError('No trigger by the name "%s" was found' % trigger)
-
- if isinstance(trigger_args, Mapping):
- trigger = trigger_cls(self.trigger_defaults, **trigger_args)
- elif isinstance(trigger_args, Iterable):
- trigger = trigger_cls(self.trigger_defaults, *trigger_args)
- else:
- raise ValueError('trigger_args must either be a dict-like object or an iterable')
- elif not callable(getattr(trigger, 'get_next_fire_time')):
- raise TypeError('Expected a trigger instance, got %s instead' % trigger.__class__.__name__)
-
- # Replace with scheduler level defaults if values are missing
- if misfire_grace_time is None:
- misfire_grace_time = self.misfire_grace_time
- if coalesce is None:
- coalesce = self.coalesce
-
- args = tuple(args) if args is not None else ()
- kwargs = dict(kwargs) if kwargs is not None else {}
- job = Job(trigger, func, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs, max_instances)
- # Make sure the callable can handle the given arguments
- self._check_callable_args(job.func, args, kwargs)
+ trigger_args.setdefault('timezone', self.timezone)
+
+ job_kwargs = {
+ 'trigger': trigger,
+ 'trigger_args': trigger_args,
+ 'func': func,
+ 'args': tuple(args) if args is not None else (),
+ 'kwargs': dict(kwargs) if kwargs is not None else {},
+ 'id': id,
+ 'name': name,
+ 'misfire_grace_time': misfire_grace_time if misfire_grace_time is not None else self.misfire_grace_time,
+ 'coalesce': coalesce if coalesce is not None else self.coalesce,
+ 'max_runs': max_runs,
+ 'max_instances': max_instances
+ }
+ job = Job(**job_kwargs)
# Don't really add jobs to job stores before the scheduler is up and running
if not self.running:
@@ -256,13 +225,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
return JobHandle(self, jobstore, job)
- def scheduled_job(self, trigger, trigger_args=(), args=None, kwargs=None, id=None, name=None,
- misfire_grace_time=None, coalesce=None, max_runs=None, max_instances=1, jobstore='default'):
+ def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None,
+ max_runs=None, max_instances=1, jobstore='default', **trigger_args):
"""A decorator version of :meth:`add_job`."""
def inner(func):
- self.add_job(func, trigger, trigger_args, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs,
- max_instances, jobstore)
+ self.add_job(trigger, func, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs,
+ max_instances, jobstore, **trigger_args)
return func
return inner
@@ -274,28 +243,22 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:param jobstore: alias of the job store
"""
- # Sanity check for the changes
- for attr, value in six.iteritems(changes):
- if attr not in Job.modifiable_attributes:
- raise ValueError('Cannot modify Job attribute "%s"' % attr)
-
with self._jobstores_lock:
# Check if the job is among the pending jobs
for job, store in self._pending_jobs:
if job.id == job_id:
- for attr, value in six.iteritems(changes):
- setattr(job, attr, value)
- break
+ job.modify(changes)
+ return
else:
store = self._jobstores[jobstore]
+ job = store.lookup_job(changes.get('id', job_id))
+ changes = job.validate_changes(changes)
store.modify_job(job_id, changes)
- store.lookup_job(changes.get('id', job_id))
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_JOB_MODIFIED, jobstore, job_id))
- # Wake up the scheduler if the job's next run time has been changed
- if changes.get('next_run_time') is not None:
- self._wakeup()
+ # Wake up the scheduler since the job's next run time may have been changed
+ self._wakeup()
def get_jobs(self, jobstore=None, pending=None):
"""
@@ -403,9 +366,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self.coalesce = asbool(config.pop('coalesce', True))
self.timezone = astimezone(config.pop('timezone', None)) or tzlocal()
- # Set trigger defaults
- self.trigger_defaults = {'timezone': self.timezone}
-
# Configure the thread pool
if 'threadpool' in config:
self._threadpool = maybe_ref(config['threadpool'])
@@ -458,51 +418,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
if wakeup:
self._wakeup()
- @staticmethod
- def _check_callable_args(func, args, kwargs):
- """Ensures that the given callable can be called with the given arguments."""
-
- if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'):
- func = func.__call__
- argspec = getargspec(func)
- argspec_args = argspec.args[1:] if ismethod(func) else argspec.args
- varkw = getattr(argspec, 'varkw', None) or getattr(argspec, 'keywords', None)
- kwargs_set = frozenset(kwargs)
- mandatory_args = frozenset(argspec_args[:-len(argspec.defaults)] if argspec.defaults else argspec_args)
- mandatory_args_matches = frozenset(argspec_args[:len(args)])
- mandatory_kwargs_matches = set(kwargs).intersection(mandatory_args)
- kwonly_args = frozenset(getattr(argspec, 'kwonlyargs', []))
- kwonly_defaults = frozenset(getattr(argspec, 'kwonlydefaults', None) or ())
-
- # Make sure there are no conflicts between args and kwargs
- pos_kwargs_conflicts = mandatory_args_matches.intersection(mandatory_kwargs_matches)
- if pos_kwargs_conflicts:
- raise ValueError('The following arguments are supplied in both args and kwargs: %s' %
- ', '.join(pos_kwargs_conflicts))
-
- # Check that the number of positional arguments minus the number of matched kwargs matches the argspec
- missing_args = mandatory_args - mandatory_args_matches.union(mandatory_kwargs_matches)
- if missing_args:
- raise ValueError('The following arguments are not supplied: %s' % ', '.join(missing_args))
-
- # Check that the callable can accept the given number of positional arguments
- if not argspec.varargs and len(args) > len(argspec_args):
- raise ValueError('The list of positional arguments is longer than the target callable can handle '
- '(allowed: %d, given in args: %d)' % (len(argspec_args), len(args)))
-
- # Check that the callable can accept the given keyword arguments
- if not varkw:
- unmatched_kwargs = kwargs_set - frozenset(argspec_args).union(kwonly_args)
- if unmatched_kwargs:
- raise ValueError('The target callable does not accept the following keyword arguments: %s' %
- ', '.join(unmatched_kwargs))
-
- # Check that all keyword-only arguments have been supplied
- unmatched_kwargs = kwonly_args - kwargs_set - kwonly_defaults
- if unmatched_kwargs:
- raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' %
- ', '.join(unmatched_kwargs))
-
@abstractmethod
def _wakeup(self):
"""Triggers :meth:`_process_jobs` to be run in an implementation specific manner."""