diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-05-30 17:22:10 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-05-30 19:06:11 -0700 |
commit | a32f9245f951bc39c4cb7d746b784a9eca88392b (patch) | |
tree | 24c21870665937cfce4b5306a81db2a7cddf01b1 | |
parent | 86e651b9cec82020ad979c3ddb420787fa6df719 (diff) | |
download | taskflow-a32f9245f951bc39c4cb7d746b784a9eca88392b.tar.gz |
Move flattening to the action engine compiler
Since flattening is only one way to compile a flow and
nested flows and atoms into a compilation unit move this
functionality into the engine module where it is used.
Change-Id: Ifea6b56cf5f2a9c1d16acabfaae6f28aeb6534a0
-rw-r--r-- | doc/source/utils.rst | 5 | ||||
-rw-r--r-- | taskflow/engines/action_engine/compiler.py | 160 | ||||
-rw-r--r-- | taskflow/tests/unit/test_action_engine_compile.py (renamed from taskflow/tests/unit/test_flattening.py) | 148 | ||||
-rw-r--r-- | taskflow/utils/flow_utils.py | 180 |
4 files changed, 247 insertions, 246 deletions
diff --git a/doc/source/utils.rst b/doc/source/utils.rst index 8b5d53a..87f3727 100644 --- a/doc/source/utils.rst +++ b/doc/source/utils.rst @@ -22,8 +22,3 @@ The following classes and modules are *recommended* for external usage: .. autofunction:: taskflow.utils.persistence_utils.temporary_flow_detail .. autofunction:: taskflow.utils.persistence_utils.pformat - -Internal usage -============== - -.. automodule:: taskflow.utils.flow_utils diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 446ded9..883e7f0 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -15,9 +15,17 @@ # under the License. import collections +import logging from taskflow import exceptions as exc -from taskflow.utils import flow_utils +from taskflow import flow +from taskflow import retry +from taskflow import task +from taskflow.types import graph as gr +from taskflow.utils import misc + +LOG = logging.getLogger(__name__) + # The result of a compilers compile() is this tuple (for now it is just a # execution graph but in the future it may grow to include more attributes @@ -39,7 +47,7 @@ class PatternCompiler(object): useful to retain part of this relationship). """ def compile(self, root): - graph = flow_utils.flatten(root) + graph = _Flattener(root).flatten() if graph.number_of_nodes() == 0: # Try to get a name attribute, otherwise just use the object # string representation directly if that attribute does not exist. @@ -47,3 +55,151 @@ class PatternCompiler(object): raise exc.Empty("Root container '%s' (%s) is empty." % (name, type(root))) return Compilation(graph) + + +_RETRY_EDGE_DATA = { + 'retry': True, +} + + +class _Flattener(object): + """Flattens a root item (task/flow) into a execution graph.""" + + def __init__(self, root, freeze=True): + self._root = root + self._graph = None + self._history = set() + self._freeze = bool(freeze) + + def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs): + """Adds new edges from nodes to other nodes in the specified graph, + with the following edge attributes (defaulting to the class provided + edge_data if None), if the edge does not already exist. + """ + nodes_to = list(nodes_to) + for u in nodes_from: + for v in nodes_to: + if not graph.has_edge(u, v): + # NOTE(harlowja): give each edge its own attr copy so that + # if it's later modified that the same copy isn't modified. + graph.add_edge(u, v, attr_dict=edge_attrs.copy()) + + def _flatten(self, item): + functor = self._find_flattener(item) + if not functor: + raise TypeError("Unknown type requested to flatten: %s (%s)" + % (item, type(item))) + self._pre_item_flatten(item) + graph = functor(item) + self._post_item_flatten(item, graph) + return graph + + def _find_flattener(self, item): + """Locates the flattening function to use to flatten the given item.""" + if isinstance(item, flow.Flow): + return self._flatten_flow + elif isinstance(item, task.BaseTask): + return self._flatten_task + elif isinstance(item, retry.Retry): + raise TypeError("Retry controller %s (%s) is used not as a flow " + "parameter" % (item, type(item))) + else: + return None + + def _connect_retry(self, retry, graph): + graph.add_node(retry) + + # All graph nodes that have no predecessors should depend on its retry + nodes_to = [n for n in graph.no_predecessors_iter() if n != retry] + self._add_new_edges(graph, [retry], nodes_to, _RETRY_EDGE_DATA) + + # Add link to retry for each node of subgraph that hasn't + # a parent retry + for n in graph.nodes_iter(): + if n != retry and 'retry' not in graph.node[n]: + graph.node[n]['retry'] = retry + + def _flatten_task(self, task): + """Flattens a individual task.""" + graph = gr.DiGraph(name=task.name) + graph.add_node(task) + return graph + + def _flatten_flow(self, flow): + """Flattens a graph flow.""" + graph = gr.DiGraph(name=flow.name) + + # Flatten all nodes into a single subgraph per node. + subgraph_map = {} + for item in flow: + subgraph = self._flatten(item) + subgraph_map[item] = subgraph + graph = gr.merge_graphs([graph, subgraph]) + + # Reconnect all node edges to their corresponding subgraphs. + for (u, v, attrs) in flow.iter_links(): + u_g = subgraph_map[u] + v_g = subgraph_map[v] + if any(attrs.get(k) for k in ('invariant', 'manual', 'retry')): + # Connect nodes with no predecessors in v to nodes with + # no successors in u (thus maintaining the edge dependency). + self._add_new_edges(graph, + u_g.no_successors_iter(), + v_g.no_predecessors_iter(), + edge_attrs=attrs) + else: + # This is dependency-only edge, connect corresponding + # providers and consumers. + for provider in u_g: + for consumer in v_g: + reasons = provider.provides & consumer.requires + if reasons: + graph.add_edge(provider, consumer, reasons=reasons) + + if flow.retry is not None: + self._connect_retry(flow.retry, graph) + return graph + + def _pre_item_flatten(self, item): + """Called before a item is flattened; any pre-flattening actions.""" + if id(item) in self._history: + raise ValueError("Already flattened item: %s (%s), recursive" + " flattening not supported" % (item, id(item))) + self._history.add(id(item)) + + def _post_item_flatten(self, item, graph): + """Called before a item is flattened; any post-flattening actions.""" + + def _pre_flatten(self): + """Called before the flattening of the item starts.""" + self._history.clear() + + def _post_flatten(self, graph): + """Called after the flattening of the item finishes successfully.""" + dup_names = misc.get_duplicate_keys(graph.nodes_iter(), + key=lambda node: node.name) + if dup_names: + dup_names = ', '.join(sorted(dup_names)) + raise exc.Duplicate("Atoms with duplicate names " + "found: %s" % (dup_names)) + self._history.clear() + # NOTE(harlowja): this one can be expensive to calculate (especially + # the cycle detection), so only do it if we know debugging is enabled + # and not under all cases. + if LOG.isEnabledFor(logging.DEBUG): + LOG.debug("Translated '%s' into a graph:", self._root) + for line in graph.pformat().splitlines(): + # Indent it so that it's slightly offset from the above line. + LOG.debug(" %s", line) + + def flatten(self): + """Flattens a item (a task or flow) into a single execution graph.""" + if self._graph is not None: + return self._graph + self._pre_flatten() + graph = self._flatten(self._root) + self._post_flatten(graph) + self._graph = graph + if self._freeze: + self._graph.freeze() + return self._graph diff --git a/taskflow/tests/unit/test_flattening.py b/taskflow/tests/unit/test_action_engine_compile.py index 600a000..57c248e 100644 --- a/taskflow/tests/unit/test_flattening.py +++ b/taskflow/tests/unit/test_action_engine_compile.py @@ -24,7 +24,8 @@ from taskflow import retry from taskflow import test from taskflow.tests import utils as t_utils -from taskflow.utils import flow_utils as f_utils + +from taskflow.engines.action_engine import compiler def _make_many(amount): @@ -35,24 +36,26 @@ def _make_many(amount): return tasks -class FlattenTest(test.TestCase): - def test_flatten_task(self): +class PatternCompileTest(test.TestCase): + def test_task(self): task = t_utils.DummyTask(name='a') - g = f_utils.flatten(task) - + compilation = compiler.PatternCompiler().compile(task) + g = compilation.execution_graph self.assertEqual(list(g.nodes()), [task]) self.assertEqual(list(g.edges()), []) - def test_flatten_retry(self): + def test_retry(self): r = retry.AlwaysRevert('r1') msg_regex = "^Retry controller .* is used not as a flow parameter" - self.assertRaisesRegexp(TypeError, msg_regex, f_utils.flatten, r) + self.assertRaisesRegexp(TypeError, msg_regex, + compiler.PatternCompiler().compile, r) - def test_flatten_wrong_object(self): + def test_wrong_object(self): msg_regex = '^Unknown type requested to flatten' - self.assertRaisesRegexp(TypeError, msg_regex, f_utils.flatten, 42) + self.assertRaisesRegexp(TypeError, msg_regex, + compiler.PatternCompiler().compile, 42) - def test_linear_flatten(self): + def test_linear(self): a, b, c, d = _make_many(4) flo = lf.Flow("test") flo.add(a, b, c) @@ -60,7 +63,8 @@ class FlattenTest(test.TestCase): sflo.add(d) flo.add(sflo) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) order = g.topological_sort() @@ -71,18 +75,20 @@ class FlattenTest(test.TestCase): self.assertEqual([d], list(g.no_successors_iter())) self.assertEqual([a], list(g.no_predecessors_iter())) - def test_invalid_flatten(self): + def test_invalid(self): a, b, c = _make_many(3) flo = lf.Flow("test") flo.add(a, b, c) flo.add(flo) - self.assertRaises(ValueError, f_utils.flatten, flo) + self.assertRaises(ValueError, + compiler.PatternCompiler().compile, flo) - def test_unordered_flatten(self): + def test_unordered(self): a, b, c, d = _make_many(4) flo = uf.Flow("test") flo.add(a, b, c, d) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) self.assertEqual(0, g.number_of_edges()) self.assertEqual(set([a, b, c, d]), @@ -90,14 +96,16 @@ class FlattenTest(test.TestCase): self.assertEqual(set([a, b, c, d]), set(g.no_predecessors_iter())) - def test_linear_nested_flatten(self): + def test_linear_nested(self): a, b, c, d = _make_many(4) flo = lf.Flow("test") flo.add(a, b) flo2 = uf.Flow("test2") flo2.add(c, d) flo.add(flo2) - g = f_utils.flatten(flo) + + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) lb = g.subgraph([a, b]) @@ -112,7 +120,7 @@ class FlattenTest(test.TestCase): self.assertTrue(g.has_edge(b, c)) self.assertTrue(g.has_edge(b, d)) - def test_unordered_nested_flatten(self): + def test_unordered_nested(self): a, b, c, d = _make_many(4) flo = uf.Flow("test") flo.add(a, b) @@ -120,7 +128,8 @@ class FlattenTest(test.TestCase): flo2.add(c, d) flo.add(flo2) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) for n in [a, b]: self.assertFalse(g.has_edge(n, c)) @@ -134,14 +143,15 @@ class FlattenTest(test.TestCase): lb = g.subgraph([c, d]) self.assertEqual(1, lb.number_of_edges()) - def test_unordered_nested_in_linear_flatten(self): + def test_unordered_nested_in_linear(self): a, b, c, d = _make_many(4) flo = lf.Flow('lt').add( a, uf.Flow('ut').add(b, c), d) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) self.assertItemsEqual(g.edges(), [ (a, b), @@ -150,16 +160,17 @@ class FlattenTest(test.TestCase): (c, d) ]) - def test_graph_flatten(self): + def test_graph(self): a, b, c, d = _make_many(4) flo = gf.Flow("test") flo.add(a, b, c, d) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) self.assertEqual(0, g.number_of_edges()) - def test_graph_flatten_nested(self): + def test_graph_nested(self): a, b, c, d, e, f, g = _make_many(7) flo = gf.Flow("test") flo.add(a, b, c, d) @@ -168,14 +179,15 @@ class FlattenTest(test.TestCase): flo2.add(e, f, g) flo.add(flo2) - graph = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + graph = compilation.execution_graph self.assertEqual(7, len(graph)) self.assertItemsEqual(graph.edges(data=True), [ (e, f, {'invariant': True}), (f, g, {'invariant': True}) ]) - def test_graph_flatten_nested_graph(self): + def test_graph_nested_graph(self): a, b, c, d, e, f, g = _make_many(7) flo = gf.Flow("test") flo.add(a, b, c, d) @@ -184,11 +196,12 @@ class FlattenTest(test.TestCase): flo2.add(e, f, g) flo.add(flo2) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(7, len(g)) self.assertEqual(0, g.number_of_edges()) - def test_graph_flatten_links(self): + def test_graph_links(self): a, b, c, d = _make_many(4) flo = gf.Flow("test") flo.add(a, b, c, d) @@ -196,7 +209,8 @@ class FlattenTest(test.TestCase): flo.link(b, c) flo.link(c, d) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) self.assertItemsEqual(g.edges(data=True), [ (a, b, {'manual': True}), @@ -206,12 +220,13 @@ class FlattenTest(test.TestCase): self.assertItemsEqual([a], g.no_predecessors_iter()) self.assertItemsEqual([d], g.no_successors_iter()) - def test_graph_flatten_dependencies(self): + def test_graph_dependencies(self): a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) b = t_utils.ProvidesRequiresTask('b', provides=[], requires=['x']) flo = gf.Flow("test").add(a, b) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(2, len(g)) self.assertItemsEqual(g.edges(data=True), [ (a, b, {'reasons': set(['x'])}) @@ -219,7 +234,7 @@ class FlattenTest(test.TestCase): self.assertItemsEqual([a], g.no_predecessors_iter()) self.assertItemsEqual([b], g.no_successors_iter()) - def test_graph_flatten_nested_requires(self): + def test_graph_nested_requires(self): a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[]) b = t_utils.ProvidesRequiresTask('b', provides=[], requires=[]) c = t_utils.ProvidesRequiresTask('c', provides=[], requires=['x']) @@ -228,7 +243,8 @@ class FlattenTest(test.TestCase): lf.Flow("test2").add(b, c) ) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(3, len(g)) self.assertItemsEqual(g.edges(data=True), [ (a, c, {'reasons': set(['x'])}), @@ -237,7 +253,7 @@ class FlattenTest(test.TestCase): self.assertItemsEqual([a, b], g.no_predecessors_iter()) self.assertItemsEqual([c], g.no_successors_iter()) - def test_graph_flatten_nested_provides(self): + def test_graph_nested_provides(self): a = t_utils.ProvidesRequiresTask('a', provides=[], requires=['x']) b = t_utils.ProvidesRequiresTask('b', provides=['x'], requires=[]) c = t_utils.ProvidesRequiresTask('c', provides=[], requires=[]) @@ -246,7 +262,8 @@ class FlattenTest(test.TestCase): lf.Flow("test2").add(b, c) ) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(3, len(g)) self.assertItemsEqual(g.edges(data=True), [ (b, c, {'invariant': True}), @@ -255,46 +272,50 @@ class FlattenTest(test.TestCase): self.assertItemsEqual([b], g.no_predecessors_iter()) self.assertItemsEqual([a, c], g.no_successors_iter()) - def test_flatten_checks_for_dups(self): + def test_checks_for_dups(self): flo = gf.Flow("test").add( t_utils.DummyTask(name="a"), t_utils.DummyTask(name="a") ) self.assertRaisesRegexp(exc.Duplicate, - '^Tasks with duplicate names', - f_utils.flatten, flo) + '^Atoms with duplicate names', + compiler.PatternCompiler().compile, flo) - def test_flatten_checks_for_dups_globally(self): + def test_checks_for_dups_globally(self): flo = gf.Flow("test").add( gf.Flow("int1").add(t_utils.DummyTask(name="a")), gf.Flow("int2").add(t_utils.DummyTask(name="a"))) self.assertRaisesRegexp(exc.Duplicate, - '^Tasks with duplicate names', - f_utils.flatten, flo) + '^Atoms with duplicate names', + compiler.PatternCompiler().compile, flo) - def test_flatten_retry_in_linear_flow(self): + def test_retry_in_linear_flow(self): flo = lf.Flow("test", retry.AlwaysRevert("c")) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(1, len(g)) self.assertEqual(0, g.number_of_edges()) - def test_flatten_retry_in_unordered_flow(self): + def test_retry_in_unordered_flow(self): flo = uf.Flow("test", retry.AlwaysRevert("c")) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(1, len(g)) self.assertEqual(0, g.number_of_edges()) - def test_flatten_retry_in_graph_flow(self): + def test_retry_in_graph_flow(self): flo = gf.Flow("test", retry.AlwaysRevert("c")) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(1, len(g)) self.assertEqual(0, g.number_of_edges()) - def test_flatten_retry_in_nested_flows(self): + def test_retry_in_nested_flows(self): c1 = retry.AlwaysRevert("c1") c2 = retry.AlwaysRevert("c2") flo = lf.Flow("test", c1).add(lf.Flow("test2", c2)) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(2, len(g)) self.assertItemsEqual(g.edges(data=True), [ @@ -304,11 +325,13 @@ class FlattenTest(test.TestCase): self.assertItemsEqual([c1], g.no_predecessors_iter()) self.assertItemsEqual([c2], g.no_successors_iter()) - def test_flatten_retry_in_linear_flow_with_tasks(self): + def test_retry_in_linear_flow_with_tasks(self): c = retry.AlwaysRevert("c") a, b = _make_many(2) flo = lf.Flow("test", c).add(a, b) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph + self.assertEqual(3, len(g)) self.assertItemsEqual(g.edges(data=True), [ (a, b, {'invariant': True}), @@ -320,11 +343,13 @@ class FlattenTest(test.TestCase): self.assertIs(c, g.node[a]['retry']) self.assertIs(c, g.node[b]['retry']) - def test_flatten_retry_in_unordered_flow_with_tasks(self): + def test_retry_in_unordered_flow_with_tasks(self): c = retry.AlwaysRevert("c") a, b = _make_many(2) flo = uf.Flow("test", c).add(a, b) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph + self.assertEqual(3, len(g)) self.assertItemsEqual(g.edges(data=True), [ (c, a, {'retry': True}), @@ -336,11 +361,12 @@ class FlattenTest(test.TestCase): self.assertIs(c, g.node[a]['retry']) self.assertIs(c, g.node[b]['retry']) - def test_flatten_retry_in_graph_flow_with_tasks(self): + def test_retry_in_graph_flow_with_tasks(self): r = retry.AlwaysRevert("cp") a, b, c = _make_many(3) flo = gf.Flow("test", r).add(a, b, c).link(b, c) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph self.assertEqual(4, len(g)) self.assertItemsEqual(g.edges(data=True), [ @@ -355,7 +381,7 @@ class FlattenTest(test.TestCase): self.assertIs(r, g.node[b]['retry']) self.assertIs(r, g.node[c]['retry']) - def test_flatten_retries_hierarchy(self): + def test_retries_hierarchy(self): c1 = retry.AlwaysRevert("cp1") c2 = retry.AlwaysRevert("cp2") a, b, c, d = _make_many(4) @@ -363,7 +389,9 @@ class FlattenTest(test.TestCase): a, lf.Flow("test", c2).add(b, c), d) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph + self.assertEqual(6, len(g)) self.assertItemsEqual(g.edges(data=True), [ (c1, a, {'retry': True}), @@ -379,14 +407,16 @@ class FlattenTest(test.TestCase): self.assertIs(c1, g.node[c2]['retry']) self.assertIs(None, g.node[c1].get('retry')) - def test_flatten_retry_subflows_hierarchy(self): + def test_retry_subflows_hierarchy(self): c1 = retry.AlwaysRevert("cp1") a, b, c, d = _make_many(4) flo = lf.Flow("test", c1).add( a, lf.Flow("test").add(b, c), d) - g = f_utils.flatten(flo) + compilation = compiler.PatternCompiler().compile(flo) + g = compilation.execution_graph + self.assertEqual(5, len(g)) self.assertItemsEqual(g.edges(data=True), [ (c1, a, {'retry': True}), diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py deleted file mode 100644 index 6b54d56..0000000 --- a/taskflow/utils/flow_utils.py +++ /dev/null @@ -1,180 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from taskflow import exceptions -from taskflow import flow -from taskflow import retry -from taskflow import task -from taskflow.types import graph as gr -from taskflow.utils import misc - - -LOG = logging.getLogger(__name__) - - -RETRY_EDGE_DATA = { - 'retry': True, -} - - -class Flattener(object): - def __init__(self, root, freeze=True): - self._root = root - self._graph = None - self._history = set() - self._freeze = bool(freeze) - - def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs): - """Adds new edges from nodes to other nodes in the specified graph, - with the following edge attributes (defaulting to the class provided - edge_data if None), if the edge does not already exist. - """ - nodes_to = list(nodes_to) - for u in nodes_from: - for v in nodes_to: - if not graph.has_edge(u, v): - # NOTE(harlowja): give each edge its own attr copy so that - # if it's later modified that the same copy isn't modified. - graph.add_edge(u, v, attr_dict=edge_attrs.copy()) - - def _flatten(self, item): - functor = self._find_flattener(item) - if not functor: - raise TypeError("Unknown type requested to flatten: %s (%s)" - % (item, type(item))) - self._pre_item_flatten(item) - graph = functor(item) - self._post_item_flatten(item, graph) - return graph - - def _find_flattener(self, item): - """Locates the flattening function to use to flatten the given item.""" - if isinstance(item, flow.Flow): - return self._flatten_flow - elif isinstance(item, task.BaseTask): - return self._flatten_task - elif isinstance(item, retry.Retry): - raise TypeError("Retry controller %s (%s) is used not as a flow " - "parameter" % (item, type(item))) - else: - return None - - def _connect_retry(self, retry, graph): - graph.add_node(retry) - - # All graph nodes that have no predecessors should depend on its retry - nodes_to = [n for n in graph.no_predecessors_iter() if n != retry] - self._add_new_edges(graph, [retry], nodes_to, RETRY_EDGE_DATA) - - # Add link to retry for each node of subgraph that hasn't - # a parent retry - for n in graph.nodes_iter(): - if n != retry and 'retry' not in graph.node[n]: - graph.node[n]['retry'] = retry - - def _flatten_task(self, task): - """Flattens a individual task.""" - graph = gr.DiGraph(name=task.name) - graph.add_node(task) - return graph - - def _flatten_flow(self, flow): - """Flattens a graph flow.""" - graph = gr.DiGraph(name=flow.name) - - # Flatten all nodes into a single subgraph per node. - subgraph_map = {} - for item in flow: - subgraph = self._flatten(item) - subgraph_map[item] = subgraph - graph = gr.merge_graphs([graph, subgraph]) - - # Reconnect all node edges to their corresponding subgraphs. - for (u, v, attrs) in flow.iter_links(): - u_g = subgraph_map[u] - v_g = subgraph_map[v] - if any(attrs.get(k) for k in ('invariant', 'manual', 'retry')): - # Connect nodes with no predecessors in v to nodes with - # no successors in u (thus maintaining the edge dependency). - self._add_new_edges(graph, - u_g.no_successors_iter(), - v_g.no_predecessors_iter(), - edge_attrs=attrs) - else: - # This is dependency-only edge, connect corresponding - # providers and consumers. - for provider in u_g: - for consumer in v_g: - reasons = provider.provides & consumer.requires - if reasons: - graph.add_edge(provider, consumer, reasons=reasons) - - if flow.retry is not None: - self._connect_retry(flow.retry, graph) - return graph - - def _pre_item_flatten(self, item): - """Called before a item is flattened; any pre-flattening actions.""" - if id(item) in self._history: - raise ValueError("Already flattened item: %s (%s), recursive" - " flattening not supported" % (item, id(item))) - LOG.debug("Starting to flatten '%s'", item) - self._history.add(id(item)) - - def _post_item_flatten(self, item, graph): - """Called before a item is flattened; any post-flattening actions.""" - LOG.debug("Finished flattening '%s'", item) - # NOTE(harlowja): this one can be expensive to calculate (especially - # the cycle detection), so only do it if we know debugging is enabled - # and not under all cases. - if LOG.isEnabledFor(logging.DEBUG): - LOG.debug("Translated '%s' into a graph:", item) - for line in graph.pformat().splitlines(): - # Indent it so that it's slightly offset from the above line. - LOG.debug(" %s", line) - - def _pre_flatten(self): - """Called before the flattening of the item starts.""" - self._history.clear() - - def _post_flatten(self, graph): - """Called after the flattening of the item finishes successfully.""" - dup_names = misc.get_duplicate_keys(graph.nodes_iter(), - key=lambda node: node.name) - if dup_names: - dup_names = ', '.join(sorted(dup_names)) - raise exceptions.Duplicate("Tasks with duplicate names " - "found: %s" % (dup_names)) - self._history.clear() - - def flatten(self): - """Flattens a item (a task or flow) into a single execution graph.""" - if self._graph is not None: - return self._graph - self._pre_flatten() - graph = self._flatten(self._root) - self._post_flatten(graph) - self._graph = graph - if self._freeze: - self._graph.freeze() - return self._graph - - -def flatten(item, freeze=True): - """Flattens a item (a task or flow) into a single execution graph.""" - return Flattener(item, freeze=freeze).flatten() |