summaryrefslogtreecommitdiff
path: root/taskflow/patterns/graph_flow.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-07-18 12:58:18 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-08 20:55:45 +0000
commitd6ef68762e847373be0584820fa0557fcbd5003f (patch)
tree4473103af0af35fb38017603b28e00316b484ce3 /taskflow/patterns/graph_flow.py
parent76641d86b89cdba23ac49d8c65011467a098f6dc (diff)
downloadtaskflow-d6ef68762e847373be0584820fa0557fcbd5003f.tar.gz
Relax the graph flow symbol constraints
In order to make it possible to have a symbol tree we need to relax the constraints that are being imposed by the graph flow. Part of blueprint taskflow-improved-scoping Change-Id: I2e14de2131c3ba4e3e4eb3108477583d0f02dae2
Diffstat (limited to 'taskflow/patterns/graph_flow.py')
-rw-r--r--taskflow/patterns/graph_flow.py156
1 files changed, 97 insertions, 59 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 658e051..f07e743 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -21,6 +21,22 @@ from taskflow import flow
from taskflow.types import graph as gr
+def _unsatisfied_requires(node, graph, *additional_provided):
+ """Extracts the unsatisified symbol requirements of a single node."""
+ requires = set(node.requires)
+ if not requires:
+ return requires
+ for provided in additional_provided:
+ requires = requires - provided
+ if not requires:
+ return requires
+ for pred in graph.bfs_predecessors_iter(node):
+ requires = requires - pred.provides
+ if not requires:
+ return requires
+ return requires
+
+
class Flow(flow.Flow):
"""Graph flow pattern.
@@ -80,33 +96,59 @@ class Flow(flow.Flow):
if not graph.is_directed_acyclic():
raise exc.DependencyFailure("No path through the items in the"
" graph produces an ordering that"
- " will allow for correct dependency"
- " resolution")
- self._graph = graph
- self._graph.freeze()
-
- def add(self, *items):
- """Adds a given task/tasks/flow/flows to this flow."""
+ " will allow for logical"
+ " edge traversal")
+ self._graph = graph.freeze()
+
+ def add(self, *items, **kwargs):
+ """Adds a given task/tasks/flow/flows to this flow.
+
+ :param items: items to add to the flow
+ :param kwargs: keyword arguments, the two keyword arguments
+ currently processed are:
+
+ * ``resolve_requires`` a boolean that when true (the
+ default) implies that when items are added their
+ symbol requirements will be matched to existing items
+ and links will be automatically made to those
+ providers. If multiple possible providers exist
+ then a AmbiguousDependency exception will be raised.
+ * ``resolve_existing``, a boolean that when true (the
+ default) implies that on addition of a new item that
+ existing items will have their requirements scanned
+ for symbols that this newly added item can provide.
+ If a match is found a link is automatically created
+ from the newly added item to the requiree.
+ """
items = [i for i in items if not self._graph.has_node(i)]
if not items:
return self
- requirements = collections.defaultdict(list)
- provided = {}
+ # This syntax will *hopefully* be better in future versions of python.
+ #
+ # See: http://legacy.python.org/dev/peps/pep-3102/ (python 3.0+)
+ resolve_requires = bool(kwargs.get('resolve_requires', True))
+ resolve_existing = bool(kwargs.get('resolve_existing', True))
- def update_requirements(node):
- for value in node.requires:
- requirements[value].append(node)
+ # Figure out what the existing nodes *still* require and what they
+ # provide so we can do this lookup later when inferring.
+ required = collections.defaultdict(list)
+ provided = collections.defaultdict(list)
- for node in self:
- update_requirements(node)
- for value in node.provides:
- provided[value] = node
-
- if self.retry:
- update_requirements(self.retry)
- provided.update(dict((k, self.retry)
- for k in self.retry.provides))
+ retry_provides = set()
+ if self._retry is not None:
+ for value in self._retry.requires:
+ required[value].append(self._retry)
+ for value in self._retry.provides:
+ retry_provides.add(value)
+ provided[value].append(self._retry)
+
+ for item in self._graph.nodes_iter():
+ for value in _unsatisfied_requires(item, self._graph,
+ retry_provides):
+ required[value].append(item)
+ for value in item.provides:
+ provided[value].append(item)
# NOTE(harlowja): Add items and edges to a temporary copy of the
# underlying graph and only if that is successful added to do we then
@@ -114,37 +156,41 @@ class Flow(flow.Flow):
tmp_graph = gr.DiGraph(self._graph)
for item in items:
tmp_graph.add_node(item)
- update_requirements(item)
- for value in item.provides:
- if value in provided:
- raise exc.DependencyFailure(
- "%(item)s provides %(value)s but is already being"
- " provided by %(flow)s and duplicate producers"
- " are disallowed"
- % dict(item=item.name,
- flow=provided[value].name,
- value=value))
- if self.retry and value in self.retry.requires:
- raise exc.DependencyFailure(
- "Flows retry controller %(retry)s requires %(value)s "
- "but item %(item)s being added to the flow produces "
- "that item, this creates a cyclic dependency and is "
- "disallowed"
- % dict(item=item.name,
- retry=self.retry.name,
- value=value))
- provided[value] = item
-
- for value in item.requires:
- if value in provided:
- self._link(provided[value], item,
- graph=tmp_graph, reason=value)
+
+ # Try to find a valid provider.
+ if resolve_requires:
+ for value in _unsatisfied_requires(item, tmp_graph,
+ retry_provides):
+ if value in provided:
+ providers = provided[value]
+ if len(providers) > 1:
+ provider_names = [n.name for n in providers]
+ raise exc.AmbiguousDependency(
+ "Resolution error detected when"
+ " adding %(item)s, multiple"
+ " providers %(providers)s found for"
+ " required symbol '%(value)s'"
+ % dict(item=item.name,
+ providers=sorted(provider_names),
+ value=value))
+ else:
+ self._link(providers[0], item,
+ graph=tmp_graph, reason=value)
+ else:
+ required[value].append(item)
for value in item.provides:
- if value in requirements:
- for node in requirements[value]:
- self._link(item, node,
- graph=tmp_graph, reason=value)
+ provided[value].append(item)
+
+ # See if what we provide fulfills any existing requiree.
+ if resolve_existing:
+ for value in item.provides:
+ if value in required:
+ for requiree in list(required[value]):
+ if requiree is not item:
+ self._link(item, requiree,
+ graph=tmp_graph, reason=value)
+ required[value].remove(requiree)
self._swap(tmp_graph)
return self
@@ -177,15 +223,7 @@ class Flow(flow.Flow):
retry_provides.update(self._retry.provides)
g = self._get_subgraph()
for item in g.nodes_iter():
- item_requires = item.requires - retry_provides
- # Now scan predecessors to see if they provide what we want.
- if item_requires:
- for pred_item in g.bfs_predecessors_iter(item):
- item_requires = item_requires - pred_item.provides
- if not item_requires:
- break
- if item_requires:
- requires.update(item_requires)
+ requires.update(_unsatisfied_requires(item, g, retry_provides))
return frozenset(requires)