diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-09-04 12:47:26 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-09-05 19:26:36 -0700 |
| commit | 23dfff410587d0f137bc59e10d953396802455d7 (patch) | |
| tree | 234b2ec65ff455a57ca43a144300a491b29ea9e2 /taskflow/task.py | |
| parent | d6d4a937199a3662465f52a8cc289f0077a74f88 (diff) | |
| download | taskflow-23dfff410587d0f137bc59e10d953396802455d7.tar.gz | |
Engine, task, linear_flow unification
In order to move away from the existing flows having their
own implementation of running, start moving the existing
flows to be patterns that only structure tasks (and impose
constraints about how the group of tasks can run) in useful
ways.
Let the concept of running those patterns be handled by an
engine instead of being handled by the flow itself. This
will allow for varying engines to be able to run flows in
whichever way the engine chooses (as long as the constraints
set up by the flow are observed).
Currently threaded flow and graph flow are broken by this
commit, since they have not been converted to being a
structure of tasks + constraints. The existing engine has
not yet been modified to run those structures either, work
is underway to remediate this.
Part of: blueprint patterns-and-engines
Followup bugs that must be addressed:
Bug: 1221448
Bug: 1221505
Change-Id: I3a8b96179f336d1defe269728ebae0caa3d832d7
Diffstat (limited to 'taskflow/task.py')
| -rw-r--r-- | taskflow/task.py | 131 |
1 files changed, 82 insertions, 49 deletions
diff --git a/taskflow/task.py b/taskflow/task.py index 34be356..2173183 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -23,24 +23,79 @@ from taskflow.utils import misc from taskflow.utils import reflection +def _save_as_to_mapping(save_as): + """Convert save_as to mapping name => index + + Result should follow storage convention for mappings. + """ + if save_as is None: + return {} + if isinstance(save_as, basestring): + return {save_as: None} + elif isinstance(save_as, (tuple, list)): + return dict((key, num) for num, key in enumerate(save_as)) + raise TypeError('Task provides parameter ' + 'should be str or tuple/list, not %r' % save_as) + + +def _build_rebind_dict(args, rebind_args): + if rebind_args is None: + return {} + elif isinstance(rebind_args, (list, tuple)): + rebind = dict(zip(args, rebind_args)) + if len(args) < len(rebind_args): + rebind.update((a, a) for a in rebind_args[len(args):]) + return rebind + elif isinstance(rebind_args, dict): + return rebind_args + else: + raise TypeError('Invalid rebind value: %s' % rebind_args) + + +def _check_args_mapping(task_name, rebind, args, accepts_kwargs): + args = set(args) + rebind = set(rebind.keys()) + extra_args = rebind - args + missing_args = args - rebind + if not accepts_kwargs and extra_args: + raise ValueError('Extra arguments given to task %s: %s' + % (task_name, sorted(extra_args))) + if missing_args: + raise ValueError('Missing arguments for task %s: %s' + % (task_name, sorted(missing_args))) + + +def _build_arg_mapping(task_name, reqs, rebind_args, function, do_infer): + task_args = reflection.get_required_callable_args(function) + accepts_kwargs = reflection.accepts_kwargs(function) + result = {} + if reqs: + result.update((a, a) for a in reqs) + if do_infer: + result.update((a, a) for a in task_args) + result.update(_build_rebind_dict(task_args, rebind_args)) + _check_args_mapping(task_name, result, task_args, accepts_kwargs) + return result + + class BaseTask(object): """An abstraction that defines a potential piece of work that can be applied and can be reverted to undo the work as a single unit. """ __metaclass__ = abc.ABCMeta - def __init__(self, name): + def __init__(self, name, provides=None): self._name = name - # An *immutable* input 'resource' name set this task depends + # An *immutable* input 'resource' name mapping this task depends # on existing before this task can be applied. - self.requires = set() - # An *immutable* input 'resource' name set this task would like to - # depends on existing before this task can be applied (but does not - # strongly depend on existing). - self.optional = set() - # An *immutable* output 'resource' name set this task + # + # Format is input_name:arg_name + self.requires = {} + # An *immutable* output 'resource' name dict this task # produces that other tasks may depend on this task providing. - self.provides = set() + # + # Format is output index:arg_name + self.provides = _save_as_to_mapping(provides) # This identifies the version of the task to be ran which # can be useful in resuming older versions of tasks. Standard # major, minor version semantics apply. @@ -75,23 +130,18 @@ class Task(BaseTask): Adds following features to Task: - auto-generates name from type of self - - adds all execute argument names to task requiremets + - adds all execute argument names to task requirements """ - def __init__(self, name=None, requires_from_args=True): - """Initialize task instance - - :param name: task name, if None (the default) name will - be autogenerated - :param requires_from_args: if True (the default) execute - arguments names will be added to task requirements - """ + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None): + """Initialize task instance""" if name is None: name = reflection.get_callable_name(self) - super(Task, self).__init__(name) - if requires_from_args: - f_args = reflection.get_required_callable_args(self.execute) - self.requires.update(a for a in f_args if a != 'context') + super(Task, self).__init__(name, + provides=provides) + self.requires = _build_arg_mapping(self.name, requires, rebind, + self.execute, auto_extract) class FunctorTask(BaseTask): @@ -100,36 +150,19 @@ class FunctorTask(BaseTask): Take any callable and make a task from it. """ - def __init__(self, execute, **kwargs): - """Initialize FunctorTask instance with given callable and kwargs - - :param execute: the callable - :param kwargs: reserved keywords (all optional) are - name: name of the task, default None (auto generate) - revert: the callable to revert, default None - version: version of the task, default Task's version 1.0 - optionals: optionals of the task, default () - provides: provides of the task, default () - requires: requires of the task, default () - auto_extract: auto extract execute's args and put it into - requires, default True - """ - name = kwargs.pop('name', None) + def __init__(self, execute, name=None, provides=None, + requires=None, auto_extract=True, rebind=None, revert=None, + version=None): + """Initialize FunctorTask instance with given callable and kwargs""" if name is None: name = reflection.get_callable_name(execute) - super(FunctorTask, self).__init__(name) + super(FunctorTask, self).__init__(name, provides=provides) self._execute = execute - self._revert = kwargs.pop('revert', None) - self.version = kwargs.pop('version', self.version) - self.optional.update(kwargs.pop('optional', ())) - self.provides.update(kwargs.pop('provides', ())) - self.requires.update(kwargs.pop('requires', ())) - if kwargs.pop('auto_extract', True): - f_args = reflection.get_required_callable_args(execute) - self.requires.update(a for a in f_args if a != 'context') - if kwargs: - raise TypeError('__init__() got an unexpected keyword argument %r' - % kwargs.keys[0]) + self._revert = revert + if version is not None: + self.version = version + self.requires = _build_arg_mapping(self.name, requires, rebind, + execute, auto_extract) def execute(self, *args, **kwargs): return self._execute(*args, **kwargs) |
