summaryrefslogtreecommitdiff
path: root/taskflow/task.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2013-09-04 12:47:26 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2013-09-05 19:26:36 -0700
commit23dfff410587d0f137bc59e10d953396802455d7 (patch)
tree234b2ec65ff455a57ca43a144300a491b29ea9e2 /taskflow/task.py
parentd6d4a937199a3662465f52a8cc289f0077a74f88 (diff)
downloadtaskflow-23dfff410587d0f137bc59e10d953396802455d7.tar.gz
Engine, task, linear_flow unification
In order to move away from the existing flows having their own implementation of running, start moving the existing flows to be patterns that only structure tasks (and impose constraints about how the group of tasks can run) in useful ways. Let the concept of running those patterns be handled by an engine instead of being handled by the flow itself. This will allow for varying engines to be able to run flows in whichever way the engine chooses (as long as the constraints set up by the flow are observed). Currently threaded flow and graph flow are broken by this commit, since they have not been converted to being a structure of tasks + constraints. The existing engine has not yet been modified to run those structures either, work is underway to remediate this. Part of: blueprint patterns-and-engines Followup bugs that must be addressed: Bug: 1221448 Bug: 1221505 Change-Id: I3a8b96179f336d1defe269728ebae0caa3d832d7
Diffstat (limited to 'taskflow/task.py')
-rw-r--r--taskflow/task.py131
1 files changed, 82 insertions, 49 deletions
diff --git a/taskflow/task.py b/taskflow/task.py
index 34be356..2173183 100644
--- a/taskflow/task.py
+++ b/taskflow/task.py
@@ -23,24 +23,79 @@ from taskflow.utils import misc
from taskflow.utils import reflection
+def _save_as_to_mapping(save_as):
+ """Convert save_as to mapping name => index
+
+ Result should follow storage convention for mappings.
+ """
+ if save_as is None:
+ return {}
+ if isinstance(save_as, basestring):
+ return {save_as: None}
+ elif isinstance(save_as, (tuple, list)):
+ return dict((key, num) for num, key in enumerate(save_as))
+ raise TypeError('Task provides parameter '
+ 'should be str or tuple/list, not %r' % save_as)
+
+
+def _build_rebind_dict(args, rebind_args):
+ if rebind_args is None:
+ return {}
+ elif isinstance(rebind_args, (list, tuple)):
+ rebind = dict(zip(args, rebind_args))
+ if len(args) < len(rebind_args):
+ rebind.update((a, a) for a in rebind_args[len(args):])
+ return rebind
+ elif isinstance(rebind_args, dict):
+ return rebind_args
+ else:
+ raise TypeError('Invalid rebind value: %s' % rebind_args)
+
+
+def _check_args_mapping(task_name, rebind, args, accepts_kwargs):
+ args = set(args)
+ rebind = set(rebind.keys())
+ extra_args = rebind - args
+ missing_args = args - rebind
+ if not accepts_kwargs and extra_args:
+ raise ValueError('Extra arguments given to task %s: %s'
+ % (task_name, sorted(extra_args)))
+ if missing_args:
+ raise ValueError('Missing arguments for task %s: %s'
+ % (task_name, sorted(missing_args)))
+
+
def _build_arg_mapping(task_name, reqs, rebind_args, function, do_infer):
    """Build the argument name => source name mapping for a task.

    Combines explicit requirements, (optionally) the names inferred
    from the callable's signature, and any rebind overrides, then
    validates the result against the callable's actual arguments.

    :param task_name: task name used in validation error messages
    :param reqs: iterable of explicitly required input names (or None)
    :param rebind_args: rebind specification (see _build_rebind_dict)
    :param function: the callable whose signature is inspected
    :param do_infer: when true, add the callable's own argument names
        as identity mappings
    :returns: dict mapping each argument name to its storage source
    """
    task_args = reflection.get_required_callable_args(function)
    accepts_kwargs = reflection.accepts_kwargs(function)
    mapping = {}
    # Explicit requirements map to themselves by default.
    for arg in (reqs or ()):
        mapping[arg] = arg
    if do_infer:
        # Pull the callable's own argument names in as identity
        # mappings as well.
        for arg in task_args:
            mapping[arg] = arg
    # Rebind overrides win over anything added above.
    mapping.update(_build_rebind_dict(task_args, rebind_args))
    _check_args_mapping(task_name, mapping, task_args, accepts_kwargs)
    return mapping
+
+
class BaseTask(object):
"""An abstraction that defines a potential piece of work that can be
applied and can be reverted to undo the work as a single unit.
"""
__metaclass__ = abc.ABCMeta
- def __init__(self, name):
+ def __init__(self, name, provides=None):
self._name = name
- # An *immutable* input 'resource' name set this task depends
+ # An *immutable* input 'resource' name mapping this task depends
# on existing before this task can be applied.
- self.requires = set()
- # An *immutable* input 'resource' name set this task would like to
- # depends on existing before this task can be applied (but does not
- # strongly depend on existing).
- self.optional = set()
- # An *immutable* output 'resource' name set this task
+ #
+ # Format is input_name:arg_name
+ self.requires = {}
+ # An *immutable* output 'resource' name dict this task
# produces that other tasks may depend on this task providing.
- self.provides = set()
+ #
+ # Format is output index:arg_name
+ self.provides = _save_as_to_mapping(provides)
# This identifies the version of the task to be ran which
# can be useful in resuming older versions of tasks. Standard
# major, minor version semantics apply.
@@ -75,23 +130,18 @@ class Task(BaseTask):
Adds following features to Task:
- auto-generates name from type of self
- - adds all execute argument names to task requiremets
+ - adds all execute argument names to task requirements
"""
- def __init__(self, name=None, requires_from_args=True):
- """Initialize task instance
-
- :param name: task name, if None (the default) name will
- be autogenerated
- :param requires_from_args: if True (the default) execute
- arguments names will be added to task requirements
- """
+ def __init__(self, name=None, provides=None, requires=None,
+ auto_extract=True, rebind=None):
+ """Initialize task instance"""
if name is None:
name = reflection.get_callable_name(self)
- super(Task, self).__init__(name)
- if requires_from_args:
- f_args = reflection.get_required_callable_args(self.execute)
- self.requires.update(a for a in f_args if a != 'context')
+ super(Task, self).__init__(name,
+ provides=provides)
+ self.requires = _build_arg_mapping(self.name, requires, rebind,
+ self.execute, auto_extract)
class FunctorTask(BaseTask):
@@ -100,36 +150,19 @@ class FunctorTask(BaseTask):
Take any callable and make a task from it.
"""
- def __init__(self, execute, **kwargs):
- """Initialize FunctorTask instance with given callable and kwargs
-
- :param execute: the callable
- :param kwargs: reserved keywords (all optional) are
- name: name of the task, default None (auto generate)
- revert: the callable to revert, default None
- version: version of the task, default Task's version 1.0
- optionals: optionals of the task, default ()
- provides: provides of the task, default ()
- requires: requires of the task, default ()
- auto_extract: auto extract execute's args and put it into
- requires, default True
- """
- name = kwargs.pop('name', None)
+ def __init__(self, execute, name=None, provides=None,
+ requires=None, auto_extract=True, rebind=None, revert=None,
+ version=None):
+ """Initialize FunctorTask instance with given callable and kwargs"""
if name is None:
name = reflection.get_callable_name(execute)
- super(FunctorTask, self).__init__(name)
+ super(FunctorTask, self).__init__(name, provides=provides)
self._execute = execute
- self._revert = kwargs.pop('revert', None)
- self.version = kwargs.pop('version', self.version)
- self.optional.update(kwargs.pop('optional', ()))
- self.provides.update(kwargs.pop('provides', ()))
- self.requires.update(kwargs.pop('requires', ()))
- if kwargs.pop('auto_extract', True):
- f_args = reflection.get_required_callable_args(execute)
- self.requires.update(a for a in f_args if a != 'context')
- if kwargs:
- raise TypeError('__init__() got an unexpected keyword argument %r'
- % kwargs.keys[0])
+ self._revert = revert
+ if version is not None:
+ self.version = version
+ self.requires = _build_arg_mapping(self.name, requires, rebind,
+ execute, auto_extract)
def execute(self, *args, **kwargs):
return self._execute(*args, **kwargs)