summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorAnastasia Karpinska <akarpinska@griddynamics.com>2013-09-18 18:21:15 +0300
committerAnastasia Karpinska <akarpinska@griddynamics.com>2013-09-18 19:04:18 +0300
commit1623dbb01ed5f1f9e6e6c595a8993f3776f285ef (patch)
treeae7054d4241841ae2747045991e099ea7cad9355 /taskflow/patterns
parentb07ee63f78e9c200c3a117f148b78599f8b71628 (diff)
downloadtaskflow-1623dbb01ed5f1f9e6e6c595a8993f3776f285ef.tar.gz
Graph flow, sequential graph action
Change-Id: I07cc820aa2f37d0f9599f34efab07b28cf47ca48
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/graph_flow.py206
1 files changed, 91 insertions, 115 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 0cdcf04..40b6e68 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -16,132 +16,108 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
+import collections
from networkx.algorithms import dag
from networkx.classes import digraph
-from networkx import exception as g_exc
-from taskflow import decorators
from taskflow import exceptions as exc
-from taskflow.patterns import linear_flow
-from taskflow.utils import graph_utils
-from taskflow.utils import misc
+from taskflow import flow
-LOG = logging.getLogger(__name__)
+class Flow(flow.Flow):
+ """Graph flow pattern
-class Flow(linear_flow.Flow):
- """A extension of the linear flow which will run the associated tasks in
- a linear topological ordering (and reverse using the same linear
- topological order).
+ Nested flows will be executed according to their dependencies
+ that will be resolved using data tasks provide and require.
+
+ Note: Cyclic dependencies are not allowed.
"""
- def __init__(self, name, parents=None, uuid=None):
- super(Flow, self).__init__(name, parents, uuid)
+ def __init__(self, name, uuid=None):
+ super(Flow, self).__init__(name, uuid)
self._graph = digraph.DiGraph()
- @decorators.locked
- def add(self, task, infer=True):
- # Only insert the node to start, connect all the edges
- # together later after all nodes have been added since if we try
- # to infer the edges at this stage we likely will fail finding
- # dependencies from nodes that don't exist.
- r = misc.AOTRunner(task)
- self._graph.add_node(r, uuid=r.uuid, infer=infer)
- self._reset_internals()
- return r.uuid
-
- def _find_uuid(self, uuid):
- runner = None
- for r in self._graph.nodes_iter():
- if r.uuid == uuid:
- runner = r
- break
- return runner
+ def link(self, u, v):
+ if not self._graph.has_node(u):
+ raise ValueError('Item %s not found to link from' % (u))
+ if not self._graph.has_node(v):
+ raise ValueError('Item %s not found to link to' % (v))
+ self._graph.add_edge(u, v)
+
+ # Ensure that there is a valid topological ordering.
+ if not dag.is_directed_acyclic_graph(self._graph):
+ self._graph.remove_edge(u, v)
+ raise exc.DependencyFailure("No path through the items in the"
+ " graph produces an ordering that"
+ " will allow for correct dependency"
+ " resolution")
+
+ def add(self, *items):
+ """Adds a given task/tasks/flow/flows to this flow."""
+ requirements = collections.defaultdict(list)
+ provided = {}
+
+ def update_requirements(node):
+ for value in node.requires:
+ requirements[value].append(node)
+
+ for node in self:
+ update_requirements(node)
+ for value in node.provides:
+ provided[value] = node
- def __len__(self):
- return len(self._graph)
-
- @decorators.locked
- def add_dependency(self, provider_uuid, requirer_uuid):
- """Connects provider to requirer where provider will now be required
- to run before requirer does.
- """
- if provider_uuid == requirer_uuid:
- raise ValueError("Unable to link %s to itself" % provider_uuid)
- provider = self._find_uuid(provider_uuid)
- if not provider:
- raise ValueError("No provider found with uuid %s" % provider_uuid)
- requirer = self._find_uuid(requirer_uuid)
- if not requirer:
- raise ValueError("No requirer found with uuid %s" % requirer_uuid)
- self._add_dependency(provider, requirer, reason='manual')
- self._reset_internals()
-
- def _add_dependency(self, provider, requirer, reason):
- self._graph.add_edge(provider, requirer, reason=reason)
-
- def __str__(self):
- lines = ["GraphFlow: %s" % (self.name)]
- lines.append("%s" % (self.uuid))
- lines.append("%s" % (self._graph.number_of_nodes()))
- lines.append("%s" % (self._graph.number_of_edges()))
- lines.append("%s" % (len(self.parents)))
- lines.append("%s" % (self.state))
- return "; ".join(lines)
-
- def _reset_internals(self):
- super(Flow, self)._reset_internals()
- self._runners = []
-
- @decorators.locked
- def remove(self, uuid):
- runner = self._find_uuid(uuid)
- if not runner:
- raise ValueError("No uuid %s found" % (uuid))
- else:
- self._graph.remove_node(runner)
- self._reset_internals()
-
- def _ordering(self):
try:
- return iter(self._connect())
- except g_exc.NetworkXUnfeasible:
- raise exc.InvalidStateException("Unable to correctly determine "
- "the path through the provided "
- "flow which will satisfy the "
- "tasks needed inputs and outputs.")
-
- def _connect(self):
- """Connects the nodes & edges of the graph together by examining who
- the requirements of each node and finding another node that will
- create said dependency.
- """
- if len(self._graph) == 0:
- return []
- if self._connected:
- return self._runners
-
- # Clear out all automatically added edges since we want to do a fresh
- # connections. Leave the manually connected ones intact so that users
- # still retain the dependencies they established themselves.
- def discard_edge_func(u, v, e_data):
- if e_data and e_data.get('reason') != 'manual':
- return True
- return False
-
- # Link providers to requirers.
- graph_utils.connect(self._graph, discard_func=discard_edge_func)
-
- # Now figure out the order so that we can give the runners there
- # optional item providers as well as figure out the topological run
- # order.
- run_order = dag.topological_sort(self._graph)
- run_stack = []
- for r in run_order:
- r.runs_before = list(reversed(run_stack))
- run_stack.append(r)
- self._runners = run_order
- self._connected = True
- return run_order
+ for item in items:
+ self._graph.add_node(item)
+ update_requirements(item)
+ for value in item.provides:
+ if value in provided:
+ raise exc.DependencyFailure(
+ "%(item)s provides %(value)s but is already being"
+ " provided by %(flow)s and duplicate producers"
+ " are disallowed"
+ % dict(item=item.name,
+ flow=provided[value].name,
+ value=value))
+ provided[value] = item
+
+ for value in item.requires:
+ if value in provided:
+ self.link(provided[value], item)
+
+ for value in item.provides:
+ if value in requirements:
+ for node in requirements[value]:
+ self.link(item, node)
+
+ except Exception:
+ self._graph.remove_nodes_from(items)
+ raise
+
+ return self
+
+ def __len__(self):
+ return self._graph.number_of_nodes()
+
+ def __iter__(self):
+ for child in self._graph.nodes_iter():
+ yield child
+
+ @property
+ def provides(self):
+ provides = set()
+ for subflow in self:
+ provides.update(subflow.provides)
+ return provides
+
+ @property
+ def requires(self):
+ requires = set()
+ for subflow in self:
+ requires.update(subflow.requires)
+ return requires - self.provides
+
+ @property
+ def graph(self):
+ return self._graph