diff options
| author | Anastasia Karpinska <akarpinska@griddynamics.com> | 2013-09-18 18:21:15 +0300 |
|---|---|---|
| committer | Anastasia Karpinska <akarpinska@griddynamics.com> | 2013-09-18 19:04:18 +0300 |
| commit | 1623dbb01ed5f1f9e6e6c595a8993f3776f285ef (patch) | |
| tree | ae7054d4241841ae2747045991e099ea7cad9355 /taskflow/patterns | |
| parent | b07ee63f78e9c200c3a117f148b78599f8b71628 (diff) | |
| download | taskflow-1623dbb01ed5f1f9e6e6c595a8993f3776f285ef.tar.gz | |
Graph flow, sequential graph action
Change-Id: I07cc820aa2f37d0f9599f34efab07b28cf47ca48
Diffstat (limited to 'taskflow/patterns')
| -rw-r--r-- | taskflow/patterns/graph_flow.py | 206 |
1 files changed, 91 insertions, 115 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 0cdcf04..40b6e68 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -16,132 +16,108 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import collections from networkx.algorithms import dag from networkx.classes import digraph -from networkx import exception as g_exc -from taskflow import decorators from taskflow import exceptions as exc -from taskflow.patterns import linear_flow -from taskflow.utils import graph_utils -from taskflow.utils import misc +from taskflow import flow -LOG = logging.getLogger(__name__) +class Flow(flow.Flow): + """Graph flow pattern -class Flow(linear_flow.Flow): - """A extension of the linear flow which will run the associated tasks in - a linear topological ordering (and reverse using the same linear - topological order). + Nested flows will be executed according to their dependencies + that will be resolved using data tasks provide and require. + + Note: Cyclic dependencies are not allowed. """ - def __init__(self, name, parents=None, uuid=None): - super(Flow, self).__init__(name, parents, uuid) + def __init__(self, name, uuid=None): + super(Flow, self).__init__(name, uuid) self._graph = digraph.DiGraph() - @decorators.locked - def add(self, task, infer=True): - # Only insert the node to start, connect all the edges - # together later after all nodes have been added since if we try - # to infer the edges at this stage we likely will fail finding - # dependencies from nodes that don't exist. - r = misc.AOTRunner(task) - self._graph.add_node(r, uuid=r.uuid, infer=infer) - self._reset_internals() - return r.uuid - - def _find_uuid(self, uuid): - runner = None - for r in self._graph.nodes_iter(): - if r.uuid == uuid: - runner = r - break - return runner + def link(self, u, v): + if not self._graph.has_node(u): + raise ValueError('Item %s not found to link from' % (u)) + if not self._graph.has_node(v): + raise ValueError('Item %s not found to link to' % (v)) + self._graph.add_edge(u, v) + + # Ensure that there is a valid topological ordering. + if not dag.is_directed_acyclic_graph(self._graph): + self._graph.remove_edge(u, v) + raise exc.DependencyFailure("No path through the items in the" + " graph produces an ordering that" + " will allow for correct dependency" + " resolution") + + def add(self, *items): + """Adds a given task/tasks/flow/flows to this flow.""" + requirements = collections.defaultdict(list) + provided = {} + + def update_requirements(node): + for value in node.requires: + requirements[value].append(node) + + for node in self: + update_requirements(node) + for value in node.provides: + provided[value] = node - def __len__(self): - return len(self._graph) - - @decorators.locked - def add_dependency(self, provider_uuid, requirer_uuid): - """Connects provider to requirer where provider will now be required - to run before requirer does. - """ - if provider_uuid == requirer_uuid: - raise ValueError("Unable to link %s to itself" % provider_uuid) - provider = self._find_uuid(provider_uuid) - if not provider: - raise ValueError("No provider found with uuid %s" % provider_uuid) - requirer = self._find_uuid(requirer_uuid) - if not requirer: - raise ValueError("No requirer found with uuid %s" % requirer_uuid) - self._add_dependency(provider, requirer, reason='manual') - self._reset_internals() - - def _add_dependency(self, provider, requirer, reason): - self._graph.add_edge(provider, requirer, reason=reason) - - def __str__(self): - lines = ["GraphFlow: %s" % (self.name)] - lines.append("%s" % (self.uuid)) - lines.append("%s" % (self._graph.number_of_nodes())) - lines.append("%s" % (self._graph.number_of_edges())) - lines.append("%s" % (len(self.parents))) - lines.append("%s" % (self.state)) - return "; ".join(lines) - - def _reset_internals(self): - super(Flow, self)._reset_internals() - self._runners = [] - - @decorators.locked - def remove(self, uuid): - runner = self._find_uuid(uuid) - if not runner: - raise ValueError("No uuid %s found" % (uuid)) - else: - self._graph.remove_node(runner) - self._reset_internals() - - def _ordering(self): try: - return iter(self._connect()) - except g_exc.NetworkXUnfeasible: - raise exc.InvalidStateException("Unable to correctly determine " - "the path through the provided " - "flow which will satisfy the " - "tasks needed inputs and outputs.") - - def _connect(self): - """Connects the nodes & edges of the graph together by examining who - the requirements of each node and finding another node that will - create said dependency. - """ - if len(self._graph) == 0: - return [] - if self._connected: - return self._runners - - # Clear out all automatically added edges since we want to do a fresh - # connections. Leave the manually connected ones intact so that users - # still retain the dependencies they established themselves. - def discard_edge_func(u, v, e_data): - if e_data and e_data.get('reason') != 'manual': - return True - return False - - # Link providers to requirers. - graph_utils.connect(self._graph, discard_func=discard_edge_func) - - # Now figure out the order so that we can give the runners there - # optional item providers as well as figure out the topological run - # order. - run_order = dag.topological_sort(self._graph) - run_stack = [] - for r in run_order: - r.runs_before = list(reversed(run_stack)) - run_stack.append(r) - self._runners = run_order - self._connected = True - return run_order + for item in items: + self._graph.add_node(item) + update_requirements(item) + for value in item.provides: + if value in provided: + raise exc.DependencyFailure( + "%(item)s provides %(value)s but is already being" + " provided by %(flow)s and duplicate producers" + " are disallowed" + % dict(item=item.name, + flow=provided[value].name, + value=value)) + provided[value] = item + + for value in item.requires: + if value in provided: + self.link(provided[value], item) + + for value in item.provides: + if value in requirements: + for node in requirements[value]: + self.link(item, node) + + except Exception: + self._graph.remove_nodes_from(items) + raise + + return self + + def __len__(self): + return self._graph.number_of_nodes() + + def __iter__(self): + for child in self._graph.nodes_iter(): + yield child + + @property + def provides(self): + provides = set() + for subflow in self: + provides.update(subflow.provides) + return provides + + @property + def requires(self): + requires = set() + for subflow in self: + requires.update(subflow.requires) + return requires - self.provides + + @property + def graph(self): + return self._graph |
