diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2014-04-01 05:12:12 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2014-04-01 06:11:00 +0300 |
commit | 3831f00b700fa312964a59ffafe7359f13e57185 (patch) | |
tree | c8f0f3aa57e5590a858c215ccdb38c8f850e2717 /apscheduler/schedulers | |
parent | d564658d2df03628999e8e26871e6fbf4a588b0b (diff) | |
download | apscheduler-3831f00b700fa312964a59ffafe7359f13e57185.tar.gz |
Changed the syntax for adding a new job
Added extensive validation for Job attributes
Diffstat (limited to 'apscheduler/schedulers')
-rw-r--r-- | apscheduler/schedulers/base.py | 151 |
1 files changed, 33 insertions, 118 deletions
diff --git a/apscheduler/schedulers/base.py b/apscheduler/schedulers/base.py index 3bc5ea8..2ddd9cc 100644 --- a/apscheduler/schedulers/base.py +++ b/apscheduler/schedulers/base.py @@ -3,11 +3,9 @@ from abc import ABCMeta, abstractmethod from threading import RLock from datetime import datetime, timedelta from logging import getLogger -from collections import Mapping, Iterable -from inspect import ismethod, isfunction +from uuid import uuid4 import sys -from pkg_resources import iter_entry_points from dateutil.tz import tzlocal import six @@ -28,7 +26,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): """Base class for all schedulers.""" _stopped = True - _plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers')) # # Public API @@ -164,8 +161,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): if callback == cb: del self._listeners[i] - def add_job(self, func, trigger, trigger_args=(), args=None, kwargs=None, id=None, name=None, - misfire_grace_time=None, coalesce=None, max_runs=None, max_instances=1, jobstore='default'): + def add_job(self, trigger, func, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None, + max_runs=None, max_instances=1, jobstore='default', **trigger_args): """ Adds the given job to the job list and notifies the scheduler thread. @@ -175,22 +172,22 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): The ``trigger`` argument can either be: - # the plugin name of the trigger (e.g. "cron"), in which case you should provide ``trigger_args`` as well - # an instance of the trigger + # the plugin name of the trigger (e.g. "cron"), in which case any extra keyword arguments to this method are + passed on to the trigger's constructor + # an instance of a trigger class :param trigger: trigger that determines when ``func`` is called - :param trigger_args: arguments given to the constructor of the trigger class :param func: callable (or a textual reference to one) to run at the given time :param args: list of positional arguments to call func with :param kwargs: dict of keyword arguments to call func with :param id: explicit identifier for the job (for modifying it later) :param name: textual description of the job - :param jobstore: alias of the job store to store the job in :param misfire_grace_time: seconds after the designated run time that the job is still allowed to be run :param coalesce: run once instead of many times if the scheduler determines that the job should be run more than once in succession :param max_runs: maximum number of times this job is allowed to be triggered :param max_instances: maximum number of concurrently running instances allowed for this job + :param jobstore: alias of the job store to store the job in :type id: str/unicode :type args: list/tuple :type jobstore: str/unicode @@ -201,51 +198,23 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :type max_instances: int :rtype: :class:`~apscheduler.job.JobHandle` """ - # Argument sanity checking - if args is not None and (not isinstance(args, Iterable) and not isinstance(args, str)): - raise TypeError('args must be an iterable') - if kwargs is not None and not isinstance(kwargs, Mapping): - raise TypeError('kwargs must be a dict-like object') - if misfire_grace_time is not None and misfire_grace_time <= 0: - raise ValueError('misfire_grace_time must be a positive value') - if max_runs is not None and max_runs <= 0: - raise ValueError('max_runs must be a positive value') - if max_instances <= 0: - raise ValueError('max_instances must be a positive value') - - # If trigger is a string, resolve it to a class, possibly by loading an entry point if necessary - if isinstance(trigger, str): - try: - trigger_cls = self._triggers[trigger] - except KeyError: - if trigger in self._plugins: - trigger_cls = self._triggers[trigger] = self._plugins[trigger].load() - if not callable(getattr(trigger_cls, 'get_next_fire_time')): - raise TypeError('The trigger entry point does not point to a trigger class') - else: - raise KeyError('No trigger by the name "%s" was found' % trigger) - - if isinstance(trigger_args, Mapping): - trigger = trigger_cls(self.trigger_defaults, **trigger_args) - elif isinstance(trigger_args, Iterable): - trigger = trigger_cls(self.trigger_defaults, *trigger_args) - else: - raise ValueError('trigger_args must either be a dict-like object or an iterable') - elif not callable(getattr(trigger, 'get_next_fire_time')): - raise TypeError('Expected a trigger instance, got %s instead' % trigger.__class__.__name__) - - # Replace with scheduler level defaults if values are missing - if misfire_grace_time is None: - misfire_grace_time = self.misfire_grace_time - if coalesce is None: - coalesce = self.coalesce - - args = tuple(args) if args is not None else () - kwargs = dict(kwargs) if kwargs is not None else {} - job = Job(trigger, func, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs, max_instances) - # Make sure the callable can handle the given arguments - self._check_callable_args(job.func, args, kwargs) + trigger_args.setdefault('timezone', self.timezone) + + job_kwargs = { + 'trigger': trigger, + 'trigger_args': trigger_args, + 'func': func, + 'args': tuple(args) if args is not None else (), + 'kwargs': dict(kwargs) if kwargs is not None else {}, + 'id': id, + 'name': name, + 'misfire_grace_time': misfire_grace_time if misfire_grace_time is not None else self.misfire_grace_time, + 'coalesce': coalesce if coalesce is not None else self.coalesce, + 'max_runs': max_runs, + 'max_instances': max_instances + } + job = Job(**job_kwargs) # Don't really add jobs to job stores before the scheduler is up and running if not self.running: @@ -256,13 +225,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): return JobHandle(self, jobstore, job) - def scheduled_job(self, trigger, trigger_args=(), args=None, kwargs=None, id=None, name=None, - misfire_grace_time=None, coalesce=None, max_runs=None, max_instances=1, jobstore='default'): + def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None, misfire_grace_time=None, coalesce=None, + max_runs=None, max_instances=1, jobstore='default', **trigger_args): """A decorator version of :meth:`add_job`.""" def inner(func): - self.add_job(func, trigger, trigger_args, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs, - max_instances, jobstore) + self.add_job(trigger, func, args, kwargs, id, misfire_grace_time, coalesce, name, max_runs, + max_instances, jobstore, **trigger_args) return func return inner @@ -274,28 +243,22 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param jobstore: alias of the job store """ - # Sanity check for the changes - for attr, value in six.iteritems(changes): - if attr not in Job.modifiable_attributes: - raise ValueError('Cannot modify Job attribute "%s"' % attr) - with self._jobstores_lock: # Check if the job is among the pending jobs for job, store in self._pending_jobs: if job.id == job_id: - for attr, value in six.iteritems(changes): - setattr(job, attr, value) - break + job.modify(changes) + return else: store = self._jobstores[jobstore] + job = store.lookup_job(changes.get('id', job_id)) + changes = job.validate_changes(changes) store.modify_job(job_id, changes) - store.lookup_job(changes.get('id', job_id)) self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_JOB_MODIFIED, jobstore, job_id)) - # Wake up the scheduler if the job's next run time has been changed - if changes.get('next_run_time') is not None: - self._wakeup() + # Wake up the scheduler since the job's next run time may have been changed + self._wakeup() def get_jobs(self, jobstore=None, pending=None): """ @@ -403,9 +366,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self.coalesce = asbool(config.pop('coalesce', True)) self.timezone = astimezone(config.pop('timezone', None)) or tzlocal() - # Set trigger defaults - self.trigger_defaults = {'timezone': self.timezone} - # Configure the thread pool if 'threadpool' in config: self._threadpool = maybe_ref(config['threadpool']) @@ -458,51 +418,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): if wakeup: self._wakeup() - @staticmethod - def _check_callable_args(func, args, kwargs): - """Ensures that the given callable can be called with the given arguments.""" - - if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'): - func = func.__call__ - argspec = getargspec(func) - argspec_args = argspec.args[1:] if ismethod(func) else argspec.args - varkw = getattr(argspec, 'varkw', None) or getattr(argspec, 'keywords', None) - kwargs_set = frozenset(kwargs) - mandatory_args = frozenset(argspec_args[:-len(argspec.defaults)] if argspec.defaults else argspec_args) - mandatory_args_matches = frozenset(argspec_args[:len(args)]) - mandatory_kwargs_matches = set(kwargs).intersection(mandatory_args) - kwonly_args = frozenset(getattr(argspec, 'kwonlyargs', [])) - kwonly_defaults = frozenset(getattr(argspec, 'kwonlydefaults', None) or ()) - - # Make sure there are no conflicts between args and kwargs - pos_kwargs_conflicts = mandatory_args_matches.intersection(mandatory_kwargs_matches) - if pos_kwargs_conflicts: - raise ValueError('The following arguments are supplied in both args and kwargs: %s' % - ', '.join(pos_kwargs_conflicts)) - - # Check that the number of positional arguments minus the number of matched kwargs matches the argspec - missing_args = mandatory_args - mandatory_args_matches.union(mandatory_kwargs_matches) - if missing_args: - raise ValueError('The following arguments are not supplied: %s' % ', '.join(missing_args)) - - # Check that the callable can accept the given number of positional arguments - if not argspec.varargs and len(args) > len(argspec_args): - raise ValueError('The list of positional arguments is longer than the target callable can handle ' - '(allowed: %d, given in args: %d)' % (len(argspec_args), len(args))) - - # Check that the callable can accept the given keyword arguments - if not varkw: - unmatched_kwargs = kwargs_set - frozenset(argspec_args).union(kwonly_args) - if unmatched_kwargs: - raise ValueError('The target callable does not accept the following keyword arguments: %s' % - ', '.join(unmatched_kwargs)) - - # Check that all keyword-only arguments have been supplied - unmatched_kwargs = kwonly_args - kwargs_set - kwonly_defaults - if unmatched_kwargs: - raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' % - ', '.join(unmatched_kwargs)) - @abstractmethod def _wakeup(self): """Triggers :meth:`_process_jobs` to be run in an implementation specific manner.""" |