diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-03-02 15:00:41 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-03-11 18:12:06 +0000 |
| commit | 67f0f5146431523ca65a28b3fb66477ff2f490d0 (patch) | |
| tree | 4d10bce3bc674a83ef22bfe546b9f396e900e0ad /taskflow/atom.py | |
| parent | 847d87db6a24641301884c4874bb31587f40dba5 (diff) | |
| download | taskflow-67f0f5146431523ca65a28b3fb66477ff2f490d0.tar.gz | |
Use ordered[set/dict] to retain ordering
Instead of using always using a set/dict which do not retain
use a ordered set and a ordered dict for requires, optional,
and provides and rebind mappings types so that the ordering
of these containers is maintained later when they are used.
These ordering can be useful depending on the atom type (such
as in a map and reduce tasks).
Partial-Bug: 1357117
Change-Id: I365d11bbba4aa221bc36ca15441acecf199b4d56
Diffstat (limited to 'taskflow/atom.py')
| -rw-r--r-- | taskflow/atom.py | 142 |
1 files changed, 82 insertions, 60 deletions
diff --git a/taskflow/atom.py b/taskflow/atom.py index 2ade20e..ebc5bad 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -16,14 +16,28 @@ # under the License. import abc +import collections +import itertools + +try: + from collections import OrderedDict # noqa +except ImportError: + from ordereddict import OrderedDict # noqa from oslo_utils import reflection import six +from six.moves import zip as compat_zip from taskflow import exceptions +from taskflow.types import sets from taskflow.utils import misc +# Helper types tuples... +_sequence_types = (list, tuple, collections.Sequence) +_set_types = (set, collections.Set) + + def _save_as_to_mapping(save_as): """Convert save_as to mapping name => index. @@ -33,25 +47,26 @@ def _save_as_to_mapping(save_as): # outside of code so that it's more easily understandable, since what an # atom returns is pretty crucial for other later operations. if save_as is None: - return {} + return OrderedDict() if isinstance(save_as, six.string_types): # NOTE(harlowja): this means that your atom will only return one item # instead of a dictionary-like object or a indexable object (like a # list or tuple). - return {save_as: None} - elif isinstance(save_as, (tuple, list)): + return OrderedDict([(save_as, None)]) + elif isinstance(save_as, _sequence_types): # NOTE(harlowja): this means that your atom will return a indexable # object, like a list or tuple and the results can be mapped by index # to that tuple/list that is returned for others to use. - return dict((key, num) for num, key in enumerate(save_as)) - elif isinstance(save_as, set): + return OrderedDict((key, num) for num, key in enumerate(save_as)) + elif isinstance(save_as, _set_types): # NOTE(harlowja): in the case where a set is given we will not be - # able to determine the numeric ordering in a reliable way (since it is - # a unordered set) so the only way for us to easily map the result of - # the atom will be via the key itself. - return dict((key, key) for key in save_as) - raise TypeError('Atom provides parameter ' - 'should be str, set or tuple/list, not %r' % save_as) + # able to determine the numeric ordering in a reliable way (since it + # may be an unordered set) so the only way for us to easily map the + # result of the atom will be via the key itself. + return OrderedDict((key, key) for key in save_as) + else: + raise TypeError('Atom provides parameter ' + 'should be str, set or tuple/list, not %r' % save_as) def _build_rebind_dict(args, rebind_args): @@ -62,9 +77,9 @@ def _build_rebind_dict(args, rebind_args): new name onto the required name). """ if rebind_args is None: - return {} + return OrderedDict() elif isinstance(rebind_args, (list, tuple)): - rebind = dict(zip(args, rebind_args)) + rebind = OrderedDict(compat_zip(args, rebind_args)) if len(args) < len(rebind_args): rebind.update((a, a) for a in rebind_args[len(args):]) return rebind @@ -85,11 +100,11 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, extra arguments (where applicable). """ - # build a list of required arguments based on function signature + # Build a list of required arguments based on function signature. req_args = reflection.get_callable_args(function, required_only=True) all_args = reflection.get_callable_args(function, required_only=False) - # remove arguments that are part of ignore_list + # Remove arguments that are part of ignore list. if ignore_list: for arg in ignore_list: if arg in req_args: @@ -97,39 +112,45 @@ def _build_arg_mapping(atom_name, reqs, rebind_args, function, do_infer, else: ignore_list = [] - required = {} - # add reqs to required mappings + # Build the required names. + required = OrderedDict() + + # Add required arguments to required mappings if inference is enabled. + if do_infer: + required.update((a, a) for a in req_args) + + # Add additional manually provided requirements to required mappings. if reqs: if isinstance(reqs, six.string_types): required.update({reqs: reqs}) else: required.update((a, a) for a in reqs) - # add req_args to required mappings if do_infer is set - if do_infer: - required.update((a, a) for a in req_args) - - # update required mappings based on rebind_args + # Update required mappings values based on rebinding of arguments names. required.update(_build_rebind_dict(req_args, rebind_args)) + # Determine if there are optional arguments that we may or may not take. if do_infer: - opt_args = set(all_args) - set(required) - set(ignore_list) - optional = dict((a, a) for a in opt_args) + opt_args = sets.OrderedSet(all_args) + opt_args = opt_args - set(itertools.chain(six.iterkeys(required), + iter(ignore_list))) + optional = OrderedDict((a, a) for a in opt_args) else: - optional = {} + optional = OrderedDict() + # Check if we are given some extra arguments that we aren't able to accept. if not reflection.accepts_kwargs(function): - extra_args = set(required) - set(all_args) + extra_args = sets.OrderedSet(six.iterkeys(required)) + extra_args -= all_args if extra_args: - extra_args_str = ', '.join(sorted(extra_args)) raise ValueError('Extra arguments given to atom %s: %s' - % (atom_name, extra_args_str)) + % (atom_name, list(extra_args))) # NOTE(imelnikov): don't use set to preserve order in error message missing_args = [arg for arg in req_args if arg not in required] if missing_args: raise ValueError('Missing arguments for atom %s: %s' - % (atom_name, ' ,'.join(missing_args))) + % (atom_name, missing_args)) return required, optional @@ -161,52 +182,53 @@ class Atom(object): with this atom. It can be useful in resuming older versions of atoms. Standard major, minor versioning concepts should apply. - :ivar save_as: An *immutable* output ``resource`` name dictionary this atom - produces that other atoms may depend on this atom providing. - The format is output index (or key when a dictionary - is returned from the execute method) to stored argument - name. - :ivar rebind: An *immutable* input ``resource`` mapping dictionary that - can be used to alter the inputs given to this atom. It is - typically used for mapping a prior atoms output into + :ivar save_as: An *immutable* output ``resource`` name + :py:class:`.OrderedDict` this atom produces that other + atoms may depend on this atom providing. The format is + output index (or key when a dictionary is returned from + the execute method) to stored argument name. + :ivar rebind: An *immutable* input ``resource`` :py:class:`.OrderedDict` + that can be used to alter the inputs given to this atom. It + is typically used for mapping a prior atoms output into the names that this atom expects (in a way this is like remapping a namespace of another atom into the namespace of this atom). :ivar inject: See parameter ``inject``. :ivar name: See parameter ``name``. - :ivar requires: An *immutable* set of inputs this atom requires to - function. - :ivar optional: An *immutable* set of inputs that are optional for this - atom to function. - :ivar provides: An *immutable* set of outputs this atom produces. + :ivar requires: A :py:class:`~taskflow.types.sets.OrderedSet` of inputs + this atom requires to function. + :ivar optional: A :py:class:`~taskflow.types.sets.OrderedSet` of inputs + that are optional for this atom to function. + :ivar provides: A :py:class:`~taskflow.types.sets.OrderedSet` of outputs + this atom produces. """ def __init__(self, name=None, provides=None, inject=None): self.name = name - self.save_as = _save_as_to_mapping(provides) self.version = (1, 0) self.inject = inject - self.requires = frozenset() - self.optional = frozenset() - self.provides = frozenset(self.save_as) - self.rebind = {} + self.save_as = _save_as_to_mapping(provides) + self.requires = sets.OrderedSet() + self.optional = sets.OrderedSet() + self.provides = sets.OrderedSet(self.save_as) + self.rebind = OrderedDict() def _build_arg_mapping(self, executor, requires=None, rebind=None, auto_extract=True, ignore_list=None): - req_arg, opt_arg = _build_arg_mapping(self.name, requires, rebind, - executor, auto_extract, - ignore_list) - self.rebind.clear() - if opt_arg: - self.rebind.update(opt_arg) - if req_arg: - self.rebind.update(req_arg) - self.requires = frozenset(req_arg.values()) - self.optional = frozenset(opt_arg.values()) + required, optional = _build_arg_mapping(self.name, requires, rebind, + executor, auto_extract, + ignore_list=ignore_list) + rebind = OrderedDict() + for (arg_name, bound_name) in itertools.chain(six.iteritems(required), + six.iteritems(optional)): + rebind.setdefault(arg_name, bound_name) + self.rebind = rebind + self.requires = sets.OrderedSet(six.itervalues(required)) + self.optional = sets.OrderedSet(six.itervalues(optional)) if self.inject: - inject_set = set(six.iterkeys(self.inject)) - self.requires -= inject_set - self.optional -= inject_set + inject_keys = frozenset(six.iterkeys(self.inject)) + self.requires -= inject_keys + self.optional -= inject_keys out_of_order = self.provides.intersection(self.requires) if out_of_order: raise exceptions.DependencyFailure( |
