diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-07-18 12:58:18 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-09-08 20:55:45 +0000 |
| commit | d6ef68762e847373be0584820fa0557fcbd5003f (patch) | |
| tree | 4473103af0af35fb38017603b28e00316b484ce3 /taskflow/patterns/graph_flow.py | |
| parent | 76641d86b89cdba23ac49d8c65011467a098f6dc (diff) | |
| download | taskflow-d6ef68762e847373be0584820fa0557fcbd5003f.tar.gz | |
Relax the graph flow symbol constraints
In order to make it possible to have a symbol
tree we need to relax the constraints that are
being imposed by the graph flow.
Part of blueprint taskflow-improved-scoping
Change-Id: I2e14de2131c3ba4e3e4eb3108477583d0f02dae2
Diffstat (limited to 'taskflow/patterns/graph_flow.py')
| -rw-r--r-- | taskflow/patterns/graph_flow.py | 156 |
1 files changed, 97 insertions, 59 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 658e051..f07e743 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -21,6 +21,22 @@ from taskflow import flow from taskflow.types import graph as gr +def _unsatisfied_requires(node, graph, *additional_provided): + """Extracts the unsatisified symbol requirements of a single node.""" + requires = set(node.requires) + if not requires: + return requires + for provided in additional_provided: + requires = requires - provided + if not requires: + return requires + for pred in graph.bfs_predecessors_iter(node): + requires = requires - pred.provides + if not requires: + return requires + return requires + + class Flow(flow.Flow): """Graph flow pattern. @@ -80,33 +96,59 @@ class Flow(flow.Flow): if not graph.is_directed_acyclic(): raise exc.DependencyFailure("No path through the items in the" " graph produces an ordering that" - " will allow for correct dependency" - " resolution") - self._graph = graph - self._graph.freeze() - - def add(self, *items): - """Adds a given task/tasks/flow/flows to this flow.""" + " will allow for logical" + " edge traversal") + self._graph = graph.freeze() + + def add(self, *items, **kwargs): + """Adds a given task/tasks/flow/flows to this flow. + + :param items: items to add to the flow + :param kwargs: keyword arguments, the two keyword arguments + currently processed are: + + * ``resolve_requires`` a boolean that when true (the + default) implies that when items are added their + symbol requirements will be matched to existing items + and links will be automatically made to those + providers. If multiple possible providers exist + then a AmbiguousDependency exception will be raised. + * ``resolve_existing``, a boolean that when true (the + default) implies that on addition of a new item that + existing items will have their requirements scanned + for symbols that this newly added item can provide. + If a match is found a link is automatically created + from the newly added item to the requiree. + """ items = [i for i in items if not self._graph.has_node(i)] if not items: return self - requirements = collections.defaultdict(list) - provided = {} + # This syntax will *hopefully* be better in future versions of python. + # + # See: http://legacy.python.org/dev/peps/pep-3102/ (python 3.0+) + resolve_requires = bool(kwargs.get('resolve_requires', True)) + resolve_existing = bool(kwargs.get('resolve_existing', True)) - def update_requirements(node): - for value in node.requires: - requirements[value].append(node) + # Figure out what the existing nodes *still* require and what they + # provide so we can do this lookup later when inferring. + required = collections.defaultdict(list) + provided = collections.defaultdict(list) - for node in self: - update_requirements(node) - for value in node.provides: - provided[value] = node - - if self.retry: - update_requirements(self.retry) - provided.update(dict((k, self.retry) - for k in self.retry.provides)) + retry_provides = set() + if self._retry is not None: + for value in self._retry.requires: + required[value].append(self._retry) + for value in self._retry.provides: + retry_provides.add(value) + provided[value].append(self._retry) + + for item in self._graph.nodes_iter(): + for value in _unsatisfied_requires(item, self._graph, + retry_provides): + required[value].append(item) + for value in item.provides: + provided[value].append(item) # NOTE(harlowja): Add items and edges to a temporary copy of the # underlying graph and only if that is successful added to do we then @@ -114,37 +156,41 @@ class Flow(flow.Flow): tmp_graph = gr.DiGraph(self._graph) for item in items: tmp_graph.add_node(item) - update_requirements(item) - for value in item.provides: - if value in provided: - raise exc.DependencyFailure( - "%(item)s provides %(value)s but is already being" - " provided by %(flow)s and duplicate producers" - " are disallowed" - % dict(item=item.name, - flow=provided[value].name, - value=value)) - if self.retry and value in self.retry.requires: - raise exc.DependencyFailure( - "Flows retry controller %(retry)s requires %(value)s " - "but item %(item)s being added to the flow produces " - "that item, this creates a cyclic dependency and is " - "disallowed" - % dict(item=item.name, - retry=self.retry.name, - value=value)) - provided[value] = item - - for value in item.requires: - if value in provided: - self._link(provided[value], item, - graph=tmp_graph, reason=value) + + # Try to find a valid provider. + if resolve_requires: + for value in _unsatisfied_requires(item, tmp_graph, + retry_provides): + if value in provided: + providers = provided[value] + if len(providers) > 1: + provider_names = [n.name for n in providers] + raise exc.AmbiguousDependency( + "Resolution error detected when" + " adding %(item)s, multiple" + " providers %(providers)s found for" + " required symbol '%(value)s'" + % dict(item=item.name, + providers=sorted(provider_names), + value=value)) + else: + self._link(providers[0], item, + graph=tmp_graph, reason=value) + else: + required[value].append(item) for value in item.provides: - if value in requirements: - for node in requirements[value]: - self._link(item, node, - graph=tmp_graph, reason=value) + provided[value].append(item) + + # See if what we provide fulfills any existing requiree. + if resolve_existing: + for value in item.provides: + if value in required: + for requiree in list(required[value]): + if requiree is not item: + self._link(item, requiree, + graph=tmp_graph, reason=value) + required[value].remove(requiree) self._swap(tmp_graph) return self @@ -177,15 +223,7 @@ class Flow(flow.Flow): retry_provides.update(self._retry.provides) g = self._get_subgraph() for item in g.nodes_iter(): - item_requires = item.requires - retry_provides - # Now scan predecessors to see if they provide what we want. - if item_requires: - for pred_item in g.bfs_predecessors_iter(item): - item_requires = item_requires - pred_item.provides - if not item_requires: - break - if item_requires: - requires.update(item_requires) + requires.update(_unsatisfied_requires(item, g, retry_provides)) return frozenset(requires) |
