summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit/action_engine/test_compile.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/tests/unit/action_engine/test_compile.py')
-rw-r--r--taskflow/tests/unit/action_engine/test_compile.py344
1 files changed, 190 insertions, 154 deletions
diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py
index 884cd8d..b676e0e 100644
--- a/taskflow/tests/unit/action_engine/test_compile.py
+++ b/taskflow/tests/unit/action_engine/test_compile.py
@@ -49,21 +49,22 @@ class PatternCompileTest(test.TestCase):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b, c)
- sflo = lf.Flow("sub-test")
- sflo.add(d)
- flo.add(sflo)
+ inner_flo = lf.Flow("sub-test")
+ inner_flo.add(d)
+ flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(6, len(g))
order = g.topological_sort()
- self.assertEqual([a, b, c, d], order)
- self.assertTrue(g.has_edge(c, d))
- self.assertEqual(g.get_edge_data(c, d), {'invariant': True})
+ self.assertEqual([flo, a, b, c, inner_flo, d], order)
+ self.assertTrue(g.has_edge(c, inner_flo))
+ self.assertTrue(g.has_edge(inner_flo, d))
+ self.assertEqual(g.get_edge_data(inner_flo, d), {'invariant': True})
self.assertEqual([d], list(g.no_successors_iter()))
- self.assertEqual([a], list(g.no_predecessors_iter()))
+ self.assertEqual([flo], list(g.no_predecessors_iter()))
def test_invalid(self):
a, b, c = test_utils.make_many(3)
@@ -79,36 +80,42 @@ class PatternCompileTest(test.TestCase):
flo.add(a, b, c, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(5, len(g))
+ self.assertItemsEqual(g.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, c),
+ (flo, d),
+ ])
self.assertEqual(set([a, b, c, d]),
set(g.no_successors_iter()))
- self.assertEqual(set([a, b, c, d]),
+ self.assertEqual(set([flo]),
set(g.no_predecessors_iter()))
def test_linear_nested(self):
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b)
- flo2 = uf.Flow("test2")
- flo2.add(c, d)
- flo.add(flo2)
+ inner_flo = uf.Flow("test2")
+ inner_flo.add(c, d)
+ flo.add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ graph = compilation.execution_graph
+ self.assertEqual(6, len(graph))
- lb = g.subgraph([a, b])
+ lb = graph.subgraph([a, b])
self.assertFalse(lb.has_edge(b, a))
self.assertTrue(lb.has_edge(a, b))
- self.assertEqual(g.get_edge_data(a, b), {'invariant': True})
+ self.assertEqual(graph.get_edge_data(a, b), {'invariant': True})
- ub = g.subgraph([c, d])
+ ub = graph.subgraph([c, d])
self.assertEqual(0, ub.number_of_edges())
# This ensures that c and d do not start executing until after b.
- self.assertTrue(g.has_edge(b, c))
- self.assertTrue(g.has_edge(b, d))
+ self.assertTrue(graph.has_edge(b, inner_flo))
+ self.assertTrue(graph.has_edge(inner_flo, c))
+ self.assertTrue(graph.has_edge(inner_flo, d))
def test_unordered_nested(self):
a, b, c, d = test_utils.make_many(4)
@@ -120,34 +127,30 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
- for n in [a, b]:
- self.assertFalse(g.has_edge(n, c))
- self.assertFalse(g.has_edge(n, d))
- self.assertFalse(g.has_edge(d, c))
- self.assertTrue(g.has_edge(c, d))
- self.assertEqual(g.get_edge_data(c, d), {'invariant': True})
-
- ub = g.subgraph([a, b])
- self.assertEqual(0, ub.number_of_edges())
- lb = g.subgraph([c, d])
- self.assertEqual(1, lb.number_of_edges())
+ self.assertEqual(6, len(g))
+ self.assertItemsEqual(g.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, flo2),
+ (flo2, c),
+ (c, d)
+ ])
def test_unordered_nested_in_linear(self):
a, b, c, d = test_utils.make_many(4)
- flo = lf.Flow('lt').add(
- a,
- uf.Flow('ut').add(b, c),
- d)
+ inner_flo = uf.Flow('ut').add(b, c)
+ flo = lf.Flow('lt').add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(), [
- (a, b),
- (a, c),
+ (flo, a),
+ (a, inner_flo),
+ (inner_flo, b),
+ (inner_flo, c),
(b, d),
- (c, d)
+ (c, d),
])
def test_graph(self):
@@ -157,8 +160,8 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(5, len(g))
+ self.assertEqual(4, g.number_of_edges())
def test_graph_nested(self):
a, b, c, d, e, f, g = test_utils.make_many(7)
@@ -171,10 +174,17 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
graph = compilation.execution_graph
- self.assertEqual(7, len(graph))
- self.assertItemsEqual(graph.edges(data=True), [
- (e, f, {'invariant': True}),
- (f, g, {'invariant': True})
+ self.assertEqual(9, len(graph))
+ self.assertItemsEqual(graph.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, c),
+ (flo, d),
+ (flo, flo2),
+
+ (flo2, e),
+ (e, f),
+ (f, g),
])
def test_graph_nested_graph(self):
@@ -187,9 +197,19 @@ class PatternCompileTest(test.TestCase):
flo.add(flo2)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(7, len(g))
- self.assertEqual(0, g.number_of_edges())
+ graph = compilation.execution_graph
+ self.assertEqual(9, len(graph))
+ self.assertItemsEqual(graph.edges(), [
+ (flo, a),
+ (flo, b),
+ (flo, c),
+ (flo, d),
+ (flo, flo2),
+
+ (flo2, e),
+ (flo2, f),
+ (flo2, g),
+ ])
def test_graph_links(self):
a, b, c, d = test_utils.make_many(4)
@@ -201,13 +221,15 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
+ (flo, a, {'invariant': True}),
+
(a, b, {'manual': True}),
(b, c, {'manual': True}),
(c, d, {'manual': True}),
])
- self.assertItemsEqual([a], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([d], g.no_successors_iter())
def test_graph_dependencies(self):
@@ -217,96 +239,112 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(2, len(g))
+ self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
+ (flo, a, {'invariant': True}),
(a, b, {'reasons': set(['x'])})
])
- self.assertItemsEqual([a], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
def test_graph_nested_requires(self):
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
- flo = gf.Flow("test").add(
- a,
- lf.Flow("test2").add(b, c)
- )
+ inner_flo = lf.Flow("test2").add(b, c)
+ flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(3, len(g))
- self.assertItemsEqual(g.edges(data=True), [
- (a, c, {'reasons': set(['x'])}),
- (b, c, {'invariant': True})
+ graph = compilation.execution_graph
+ self.assertEqual(5, len(graph))
+ self.assertItemsEqual(graph.edges(data=True), [
+ (flo, a, {'invariant': True}),
+ (inner_flo, b, {'invariant': True}),
+ (a, inner_flo, {'reasons': set(['x'])}),
+ (b, c, {'invariant': True}),
])
- self.assertItemsEqual([a, b], g.no_predecessors_iter())
- self.assertItemsEqual([c], g.no_successors_iter())
+ self.assertItemsEqual([flo], graph.no_predecessors_iter())
+ self.assertItemsEqual([c], graph.no_successors_iter())
def test_graph_nested_provides(self):
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = test_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[])
- flo = gf.Flow("test").add(
- a,
- lf.Flow("test2").add(b, c)
- )
+ inner_flo = lf.Flow("test2").add(b, c)
+ flo = gf.Flow("test").add(a, inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
- g = compilation.execution_graph
- self.assertEqual(3, len(g))
- self.assertItemsEqual(g.edges(data=True), [
+ graph = compilation.execution_graph
+ self.assertEqual(5, len(graph))
+ self.assertItemsEqual(graph.edges(data=True), [
+ (flo, inner_flo, {'invariant': True}),
+
+ (inner_flo, b, {'invariant': True}),
(b, c, {'invariant': True}),
- (b, a, {'reasons': set(['x'])})
+ (c, a, {'reasons': set(['x'])}),
])
- self.assertItemsEqual([b], g.no_predecessors_iter())
- self.assertItemsEqual([a, c], g.no_successors_iter())
+ self.assertItemsEqual([flo], graph.no_predecessors_iter())
+ self.assertItemsEqual([a], graph.no_successors_iter())
def test_empty_flow_in_linear_flow(self):
- flow = lf.Flow('lf')
+ flo = lf.Flow('lf')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
- empty_flow = gf.Flow("empty")
- flow.add(a, empty_flow, b)
+ empty_flo = gf.Flow("empty")
+ flo.add(a, empty_flo, b)
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
- self.assertItemsEqual(g.edges(data=True), [
- (a, b, {'invariant': True}),
+ compilation = compiler.PatternCompiler(flo).compile()
+ graph = compilation.execution_graph
+ self.assertItemsEqual(graph.edges(), [
+ (flo, a),
+ (a, empty_flo),
+ (empty_flo, b),
])
def test_many_empty_in_graph_flow(self):
- flow = gf.Flow('root')
+ flo = gf.Flow('root')
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
- flow.add(a)
+ flo.add(a)
b = lf.Flow('b')
b_0 = test_utils.ProvidesRequiresTask('b.0', provides=[], requires=[])
+ b_1 = lf.Flow('b.1')
+ b_2 = lf.Flow('b.2')
b_3 = test_utils.ProvidesRequiresTask('b.3', provides=[], requires=[])
- b.add(
- b_0,
- lf.Flow('b.1'), lf.Flow('b.2'),
- b_3,
- )
- flow.add(b)
+ b.add(b_0, b_1, b_2, b_3)
+ flo.add(b)
c = lf.Flow('c')
- c.add(lf.Flow('c.0'), lf.Flow('c.1'), lf.Flow('c.2'))
- flow.add(c)
+ c_0 = lf.Flow('c.0')
+ c_1 = lf.Flow('c.1')
+ c_2 = lf.Flow('c.2')
+ c.add(c_0, c_1, c_2)
+ flo.add(c)
d = test_utils.ProvidesRequiresTask('d', provides=[], requires=[])
- flow.add(d)
+ flo.add(d)
- flow.link(b, d)
- flow.link(a, d)
- flow.link(c, d)
+ flo.link(b, d)
+ flo.link(a, d)
+ flo.link(c, d)
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
- self.assertTrue(g.has_edge(b_0, b_3))
- self.assertTrue(g.has_edge(b_3, d))
- self.assertEqual(4, len(g))
+ compilation = compiler.PatternCompiler(flo).compile()
+ graph = compilation.execution_graph
+
+ self.assertTrue(graph.has_edge(flo, a))
+
+ self.assertTrue(graph.has_edge(flo, b))
+ self.assertTrue(graph.has_edge(b_0, b_1))
+ self.assertTrue(graph.has_edge(b_1, b_2))
+ self.assertTrue(graph.has_edge(b_2, b_3))
+
+ self.assertTrue(graph.has_edge(flo, c))
+ self.assertTrue(graph.has_edge(c_0, c_1))
+ self.assertTrue(graph.has_edge(c_1, c_2))
+
+ self.assertTrue(graph.has_edge(b_3, d))
+ self.assertEqual(12, len(graph))
def test_empty_flow_in_nested_flow(self):
flow = lf.Flow('lf')
@@ -323,9 +361,10 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
- self.assertTrue(g.has_edge(a, c))
- self.assertTrue(g.has_edge(c, d))
- self.assertTrue(g.has_edge(d, b))
+ for source, target in [(flow, a), (a, flow2),
+ (flow2, c), (c, empty_flow),
+ (empty_flow, d), (d, b)]:
+ self.assertTrue(g.has_edge(source, target))
def test_empty_flow_in_graph_flow(self):
flow = lf.Flow('lf')
@@ -336,19 +375,9 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
- self.assertTrue(g.has_edge(a, b))
-
- def test_empty_flow_in_graph_flow_empty_linkage(self):
- flow = gf.Flow('lf')
- a = test_utils.ProvidesRequiresTask('a', provides=[], requires=[])
- b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
- empty_flow = lf.Flow("empty")
- flow.add(a, empty_flow, b)
- flow.link(empty_flow, b)
-
- compilation = compiler.PatternCompiler(flow).compile()
- g = compilation.execution_graph
- self.assertEqual(0, len(g.edges()))
+ self.assertTrue(g.has_edge(flow, a))
+ self.assertTrue(g.has_edge(a, empty_flow))
+ self.assertTrue(g.has_edge(empty_flow, b))
def test_empty_flow_in_graph_flow_linkage(self):
flow = gf.Flow('lf')
@@ -360,8 +389,9 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flow).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g.edges()))
self.assertTrue(g.has_edge(a, b))
+ self.assertTrue(g.has_edge(flow, a))
+ self.assertTrue(g.has_edge(flow, empty_flow))
def test_checks_for_dups(self):
flo = gf.Flow("test").add(
@@ -384,36 +414,39 @@ class PatternCompileTest(test.TestCase):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(2, len(g))
+ self.assertEqual(1, g.number_of_edges())
def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(2, len(g))
+ self.assertEqual(1, g.number_of_edges())
def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(1, len(g))
- self.assertEqual(0, g.number_of_edges())
+ self.assertEqual(2, len(g))
+ self.assertEqual(1, g.number_of_edges())
def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
- flo = lf.Flow("test", c1).add(lf.Flow("test2", c2))
+ inner_flo = lf.Flow("test2", c2)
+ flo = lf.Flow("test", c1).add(inner_flo)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(2, len(g))
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c1, c2, {'retry': True})
+ (flo, c1, {'invariant': True}),
+ (c1, inner_flo, {'invariant': True, 'retry': True}),
+ (inner_flo, c2, {'invariant': True}),
])
self.assertIs(c1, g.node[c2]['retry'])
- self.assertItemsEqual([c1], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([c2], g.no_successors_iter())
def test_retry_in_linear_flow_with_tasks(self):
@@ -423,13 +456,14 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(3, len(g))
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
+ (flo, c, {'invariant': True}),
(a, b, {'invariant': True}),
- (c, a, {'retry': True})
+ (c, a, {'invariant': True, 'retry': True})
])
- self.assertItemsEqual([c], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
@@ -441,13 +475,14 @@ class PatternCompileTest(test.TestCase):
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(3, len(g))
+ self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c, a, {'retry': True}),
- (c, b, {'retry': True})
+ (flo, c, {'invariant': True}),
+ (c, a, {'invariant': True, 'retry': True}),
+ (c, b, {'invariant': True, 'retry': True}),
])
- self.assertItemsEqual([c], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([a, b], g.no_successors_iter())
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
@@ -458,15 +493,16 @@ class PatternCompileTest(test.TestCase):
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(4, len(g))
+ self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (r, a, {'retry': True}),
- (r, b, {'retry': True}),
+ (flo, r, {'invariant': True}),
+ (r, a, {'invariant': True, 'retry': True}),
+ (r, b, {'invariant': True, 'retry': True}),
(b, c, {'manual': True})
])
- self.assertItemsEqual([r], g.no_predecessors_iter())
+ self.assertItemsEqual([flo], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter())
self.assertIs(r, g.node[a]['retry'])
self.assertIs(r, g.node[b]['retry'])
@@ -476,18 +512,18 @@ class PatternCompileTest(test.TestCase):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = test_utils.make_many(4)
- flo = lf.Flow("test", c1).add(
- a,
- lf.Flow("test", c2).add(b, c),
- d)
+ inner_flo = lf.Flow("test", c2).add(b, c)
+ flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(6, len(g))
+ self.assertEqual(8, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c1, a, {'retry': True}),
- (a, c2, {'invariant': True}),
- (c2, b, {'retry': True}),
+ (flo, c1, {'invariant': True}),
+ (c1, a, {'invariant': True, 'retry': True}),
+ (a, inner_flo, {'invariant': True}),
+ (inner_flo, c2, {'invariant': True}),
+ (c2, b, {'invariant': True, 'retry': True}),
(b, c, {'invariant': True}),
(c, d, {'invariant': True}),
])
@@ -501,17 +537,17 @@ class PatternCompileTest(test.TestCase):
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = test_utils.make_many(4)
- flo = lf.Flow("test", c1).add(
- a,
- lf.Flow("test").add(b, c),
- d)
+ inner_flo = lf.Flow("test").add(b, c)
+ flo = lf.Flow("test", c1).add(a, inner_flo, d)
compilation = compiler.PatternCompiler(flo).compile()
g = compilation.execution_graph
- self.assertEqual(5, len(g))
+ self.assertEqual(7, len(g))
self.assertItemsEqual(g.edges(data=True), [
- (c1, a, {'retry': True}),
- (a, b, {'invariant': True}),
+ (flo, c1, {'invariant': True}),
+ (c1, a, {'invariant': True, 'retry': True}),
+ (a, inner_flo, {'invariant': True}),
+ (inner_flo, b, {'invariant': True}),
(b, c, {'invariant': True}),
(c, d, {'invariant': True}),
])