author     Joshua Harlow <harlowja@yahoo-inc.com>  2014-05-30 17:22:10 -0700
committer  Joshua Harlow <harlowja@yahoo-inc.com>  2014-05-30 19:06:11 -0700
commit     a32f9245f951bc39c4cb7d746b784a9eca88392b (patch)
tree       24c21870665937cfce4b5306a81db2a7cddf01b1
parent     86e651b9cec82020ad979c3ddb420787fa6df719 (diff)
download   taskflow-a32f9245f951bc39c4cb7d746b784a9eca88392b.tar.gz
Move flattening to the action engine compiler
Since flattening is only one way to compile a flow (and any nested flows and atoms) into a compilation unit, move this functionality into the engine module where it is used.

Change-Id: Ifea6b56cf5f2a9c1d16acabfaae6f28aeb6534a0
-rw-r--r--  doc/source/utils.rst                                |   5
-rw-r--r--  taskflow/engines/action_engine/compiler.py          | 160
-rw-r--r--  taskflow/tests/unit/test_action_engine_compile.py (renamed from taskflow/tests/unit/test_flattening.py) | 148
-rw-r--r--  taskflow/utils/flow_utils.py                        | 180
4 files changed, 247 insertions, 246 deletions
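
With this change the flattening entry point is the action engine compiler rather than the removed taskflow.utils.flow_utils.flatten helper. A minimal usage sketch, assuming the pattern module paths and a hypothetical demo task (the authoritative invocations are in the updated tests below):

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import linear_flow as lf
    from taskflow import task

    class DemoTask(task.Task):  # hypothetical task, for illustration only
        def execute(self):
            pass

    flo = lf.Flow("demo").add(DemoTask(name="a"), DemoTask(name="b"))
    compilation = compiler.PatternCompiler().compile(flo)
    graph = compilation.execution_graph  # frozen directed graph of atoms
    print(len(graph), graph.number_of_edges())  # expect: 2 nodes, 1 edge
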
diff --git a/doc/source/utils.rst b/doc/source/utils.rst
index 8b5d53a..87f3727 100644
--- a/doc/source/utils.rst
+++ b/doc/source/utils.rst
@@ -22,8 +22,3 @@ The following classes and modules are *recommended* for external usage:
.. autofunction:: taskflow.utils.persistence_utils.temporary_flow_detail
.. autofunction:: taskflow.utils.persistence_utils.pformat
-
-Internal usage
-==============
-
-.. automodule:: taskflow.utils.flow_utils
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index 446ded9..883e7f0 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -15,9 +15,17 @@
# under the License.
import collections
+import logging
from taskflow import exceptions as exc
-from taskflow.utils import flow_utils
+from taskflow import flow
+from taskflow import retry
+from taskflow import task
+from taskflow.types import graph as gr
+from taskflow.utils import misc
+
+LOG = logging.getLogger(__name__)
+
# The result of a compilers compile() is this tuple (for now it is just a
# execution graph but in the future it may grow to include more attributes
@@ -39,7 +47,7 @@ class PatternCompiler(object):
useful to retain part of this relationship).
"""
def compile(self, root):
- graph = flow_utils.flatten(root)
+ graph = _Flattener(root).flatten()
if graph.number_of_nodes() == 0:
# Try to get a name attribute, otherwise just use the object
# string representation directly if that attribute does not exist.
@@ -47,3 +55,151 @@ class PatternCompiler(object):
raise exc.Empty("Root container '%s' (%s) is empty."
% (name, type(root)))
return Compilation(graph)
+
+
+_RETRY_EDGE_DATA = {
+ 'retry': True,
+}
+
+
+class _Flattener(object):
+ """Flattens a root item (task/flow) into a execution graph."""
+
+ def __init__(self, root, freeze=True):
+ self._root = root
+ self._graph = None
+ self._history = set()
+ self._freeze = bool(freeze)
+
+ def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs):
+ """Adds new edges from nodes to other nodes in the specified graph,
+ with the following edge attributes (defaulting to the class provided
+ edge_data if None), if the edge does not already exist.
+ """
+ nodes_to = list(nodes_to)
+ for u in nodes_from:
+ for v in nodes_to:
+ if not graph.has_edge(u, v):
+ # NOTE(harlowja): give each edge its own attr copy so that
+ # later modifications of one edge don't affect the others.
+ graph.add_edge(u, v, attr_dict=edge_attrs.copy())
+
+ def _flatten(self, item):
+ functor = self._find_flattener(item)
+ if not functor:
+ raise TypeError("Unknown type requested to flatten: %s (%s)"
+ % (item, type(item)))
+ self._pre_item_flatten(item)
+ graph = functor(item)
+ self._post_item_flatten(item, graph)
+ return graph
+
+ def _find_flattener(self, item):
+ """Locates the flattening function to use to flatten the given item."""
+ if isinstance(item, flow.Flow):
+ return self._flatten_flow
+ elif isinstance(item, task.BaseTask):
+ return self._flatten_task
+ elif isinstance(item, retry.Retry):
+ raise TypeError("Retry controller %s (%s) is used not as a flow "
+ "parameter" % (item, type(item)))
+ else:
+ return None
+
+ def _connect_retry(self, retry, graph):
+ graph.add_node(retry)
+
+ # All graph nodes that have no predecessors should depend on this retry
+ nodes_to = [n for n in graph.no_predecessors_iter() if n != retry]
+ self._add_new_edges(graph, [retry], nodes_to, _RETRY_EDGE_DATA)
+
+ # Record the retry on each node of the subgraph that doesn't
+ # already have a parent retry
+ for n in graph.nodes_iter():
+ if n != retry and 'retry' not in graph.node[n]:
+ graph.node[n]['retry'] = retry
+
+ def _flatten_task(self, task):
+ """Flattens a individual task."""
+ graph = gr.DiGraph(name=task.name)
+ graph.add_node(task)
+ return graph
+
+ def _flatten_flow(self, flow):
+ """Flattens a graph flow."""
+ graph = gr.DiGraph(name=flow.name)
+
+ # Flatten all nodes into a single subgraph per node.
+ subgraph_map = {}
+ for item in flow:
+ subgraph = self._flatten(item)
+ subgraph_map[item] = subgraph
+ graph = gr.merge_graphs([graph, subgraph])
+
+ # Reconnect all node edges to their corresponding subgraphs.
+ for (u, v, attrs) in flow.iter_links():
+ u_g = subgraph_map[u]
+ v_g = subgraph_map[v]
+ if any(attrs.get(k) for k in ('invariant', 'manual', 'retry')):
+ # Connect nodes with no predecessors in v to nodes with
+ # no successors in u (thus maintaining the edge dependency).
+ self._add_new_edges(graph,
+ u_g.no_successors_iter(),
+ v_g.no_predecessors_iter(),
+ edge_attrs=attrs)
+ else:
+ # This is a dependency-only edge; connect the corresponding
+ # providers and consumers.
+ for provider in u_g:
+ for consumer in v_g:
+ reasons = provider.provides & consumer.requires
+ if reasons:
+ graph.add_edge(provider, consumer, reasons=reasons)
+
+ if flow.retry is not None:
+ self._connect_retry(flow.retry, graph)
+ return graph
+
+ def _pre_item_flatten(self, item):
+ """Called before a item is flattened; any pre-flattening actions."""
+ if id(item) in self._history:
+ raise ValueError("Already flattened item: %s (%s), recursive"
+ " flattening not supported" % (item, id(item)))
+ self._history.add(id(item))
+
+ def _post_item_flatten(self, item, graph):
+ """Called before a item is flattened; any post-flattening actions."""
+
+ def _pre_flatten(self):
+ """Called before the flattening of the item starts."""
+ self._history.clear()
+
+ def _post_flatten(self, graph):
+ """Called after the flattening of the item finishes successfully."""
+ dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
+ key=lambda node: node.name)
+ if dup_names:
+ dup_names = ', '.join(sorted(dup_names))
+ raise exc.Duplicate("Atoms with duplicate names "
+ "found: %s" % (dup_names))
+ self._history.clear()
+ # NOTE(harlowja): this one can be expensive to calculate (especially
+ # the cycle detection), so only do it if we know debugging is enabled
+ # and not under all cases.
+ if LOG.isEnabledFor(logging.DEBUG):
+ LOG.debug("Translated '%s' into a graph:", self._root)
+ for line in graph.pformat().splitlines():
+ # Indent it so that it's slightly offset from the above line.
+ LOG.debug(" %s", line)
+
+ def flatten(self):
+ """Flattens a item (a task or flow) into a single execution graph."""
+ if self._graph is not None:
+ return self._graph
+ self._pre_flatten()
+ graph = self._flatten(self._root)
+ self._post_flatten(graph)
+ self._graph = graph
+ if self._freeze:
+ self._graph.freeze()
+ return self._graph
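
For reference, the retry wiring that _connect_retry performs can be seen end to end on a small linear flow; a sketch mirroring test_retry_in_linear_flow_with_tasks below, with module paths assumed from this change:

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import linear_flow as lf
    from taskflow import retry
    from taskflow.tests import utils as t_utils  # DummyTask helper used by the tests

    c = retry.AlwaysRevert("c")
    a = t_utils.DummyTask(name="a")
    b = t_utils.DummyTask(name="b")
    flo = lf.Flow("demo", c).add(a, b)

    g = compiler.PatternCompiler().compile(flo).execution_graph
    for u, v, attrs in g.edges(data=True):
        print(u.name, "->", v.name, attrs)
    # Per the tests this should print:
    #   c -> a {'retry': True}
    #   a -> b {'invariant': True}
    # and each task node records its governing retry: g.node[a]['retry'] is c
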
diff --git a/taskflow/tests/unit/test_flattening.py b/taskflow/tests/unit/test_action_engine_compile.py
index 600a000..57c248e 100644
--- a/taskflow/tests/unit/test_flattening.py
+++ b/taskflow/tests/unit/test_action_engine_compile.py
@@ -24,7 +24,8 @@ from taskflow import retry
from taskflow import test
from taskflow.tests import utils as t_utils
-from taskflow.utils import flow_utils as f_utils
+
+from taskflow.engines.action_engine import compiler
def _make_many(amount):
@@ -35,24 +36,26 @@ def _make_many(amount):
return tasks
-class FlattenTest(test.TestCase):
- def test_flatten_task(self):
+class PatternCompileTest(test.TestCase):
+ def test_task(self):
task = t_utils.DummyTask(name='a')
- g = f_utils.flatten(task)
-
+ compilation = compiler.PatternCompiler().compile(task)
+ g = compilation.execution_graph
self.assertEqual(list(g.nodes()), [task])
self.assertEqual(list(g.edges()), [])
- def test_flatten_retry(self):
+ def test_retry(self):
r = retry.AlwaysRevert('r1')
msg_regex = "^Retry controller .* is used not as a flow parameter"
- self.assertRaisesRegexp(TypeError, msg_regex, f_utils.flatten, r)
+ self.assertRaisesRegexp(TypeError, msg_regex,
+ compiler.PatternCompiler().compile, r)
- def test_flatten_wrong_object(self):
+ def test_wrong_object(self):
msg_regex = '^Unknown type requested to flatten'
- self.assertRaisesRegexp(TypeError, msg_regex, f_utils.flatten, 42)
+ self.assertRaisesRegexp(TypeError, msg_regex,
+ compiler.PatternCompiler().compile, 42)
- def test_linear_flatten(self):
+ def test_linear(self):
a, b, c, d = _make_many(4)
flo = lf.Flow("test")
flo.add(a, b, c)
@@ -60,7 +63,8 @@ class FlattenTest(test.TestCase):
sflo.add(d)
flo.add(sflo)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
order = g.topological_sort()
@@ -71,18 +75,20 @@ class FlattenTest(test.TestCase):
self.assertEqual([d], list(g.no_successors_iter()))
self.assertEqual([a], list(g.no_predecessors_iter()))
- def test_invalid_flatten(self):
+ def test_invalid(self):
a, b, c = _make_many(3)
flo = lf.Flow("test")
flo.add(a, b, c)
flo.add(flo)
- self.assertRaises(ValueError, f_utils.flatten, flo)
+ self.assertRaises(ValueError,
+ compiler.PatternCompiler().compile, flo)
- def test_unordered_flatten(self):
+ def test_unordered(self):
a, b, c, d = _make_many(4)
flo = uf.Flow("test")
flo.add(a, b, c, d)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(0, g.number_of_edges())
self.assertEqual(set([a, b, c, d]),
@@ -90,14 +96,16 @@ class FlattenTest(test.TestCase):
self.assertEqual(set([a, b, c, d]),
set(g.no_predecessors_iter()))
- def test_linear_nested_flatten(self):
+ def test_linear_nested(self):
a, b, c, d = _make_many(4)
flo = lf.Flow("test")
flo.add(a, b)
flo2 = uf.Flow("test2")
flo2.add(c, d)
flo.add(flo2)
- g = f_utils.flatten(flo)
+
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
lb = g.subgraph([a, b])
@@ -112,7 +120,7 @@ class FlattenTest(test.TestCase):
self.assertTrue(g.has_edge(b, c))
self.assertTrue(g.has_edge(b, d))
- def test_unordered_nested_flatten(self):
+ def test_unordered_nested(self):
a, b, c, d = _make_many(4)
flo = uf.Flow("test")
flo.add(a, b)
@@ -120,7 +128,8 @@ class FlattenTest(test.TestCase):
flo2.add(c, d)
flo.add(flo2)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
for n in [a, b]:
self.assertFalse(g.has_edge(n, c))
@@ -134,14 +143,15 @@ class FlattenTest(test.TestCase):
lb = g.subgraph([c, d])
self.assertEqual(1, lb.number_of_edges())
- def test_unordered_nested_in_linear_flatten(self):
+ def test_unordered_nested_in_linear(self):
a, b, c, d = _make_many(4)
flo = lf.Flow('lt').add(
a,
uf.Flow('ut').add(b, c),
d)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(), [
(a, b),
@@ -150,16 +160,17 @@ class FlattenTest(test.TestCase):
(c, d)
])
- def test_graph_flatten(self):
+ def test_graph(self):
a, b, c, d = _make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertEqual(0, g.number_of_edges())
- def test_graph_flatten_nested(self):
+ def test_graph_nested(self):
a, b, c, d, e, f, g = _make_many(7)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -168,14 +179,15 @@ class FlattenTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
- graph = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ graph = compilation.execution_graph
self.assertEqual(7, len(graph))
self.assertItemsEqual(graph.edges(data=True), [
(e, f, {'invariant': True}),
(f, g, {'invariant': True})
])
- def test_graph_flatten_nested_graph(self):
+ def test_graph_nested_graph(self):
a, b, c, d, e, f, g = _make_many(7)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -184,11 +196,12 @@ class FlattenTest(test.TestCase):
flo2.add(e, f, g)
flo.add(flo2)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(7, len(g))
self.assertEqual(0, g.number_of_edges())
- def test_graph_flatten_links(self):
+ def test_graph_links(self):
a, b, c, d = _make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -196,7 +209,8 @@ class FlattenTest(test.TestCase):
flo.link(b, c)
flo.link(c, d)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'manual': True}),
@@ -206,12 +220,13 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([a], g.no_predecessors_iter())
self.assertItemsEqual([d], g.no_successors_iter())
- def test_graph_flatten_dependencies(self):
+ def test_graph_dependencies(self):
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
flo = gf.Flow("test").add(a, b)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(2, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'reasons': set(['x'])})
@@ -219,7 +234,7 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([a], g.no_predecessors_iter())
self.assertItemsEqual([b], g.no_successors_iter())
- def test_graph_flatten_nested_requires(self):
+ def test_graph_nested_requires(self):
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
@@ -228,7 +243,8 @@ class FlattenTest(test.TestCase):
lf.Flow("test2").add(b, c)
)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, c, {'reasons': set(['x'])}),
@@ -237,7 +253,7 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([a, b], g.no_predecessors_iter())
self.assertItemsEqual([c], g.no_successors_iter())
- def test_graph_flatten_nested_provides(self):
+ def test_graph_nested_provides(self):
a = t_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = t_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=[])
@@ -246,7 +262,8 @@ class FlattenTest(test.TestCase):
lf.Flow("test2").add(b, c)
)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(b, c, {'invariant': True}),
@@ -255,46 +272,50 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([b], g.no_predecessors_iter())
self.assertItemsEqual([a, c], g.no_successors_iter())
- def test_flatten_checks_for_dups(self):
+ def test_checks_for_dups(self):
flo = gf.Flow("test").add(
t_utils.DummyTask(name="a"),
t_utils.DummyTask(name="a")
)
self.assertRaisesRegexp(exc.Duplicate,
- '^Tasks with duplicate names',
- f_utils.flatten, flo)
+ '^Atoms with duplicate names',
+ compiler.PatternCompiler().compile, flo)
- def test_flatten_checks_for_dups_globally(self):
+ def test_checks_for_dups_globally(self):
flo = gf.Flow("test").add(
gf.Flow("int1").add(t_utils.DummyTask(name="a")),
gf.Flow("int2").add(t_utils.DummyTask(name="a")))
self.assertRaisesRegexp(exc.Duplicate,
- '^Tasks with duplicate names',
- f_utils.flatten, flo)
+ '^Atoms with duplicate names',
+ compiler.PatternCompiler().compile, flo)
- def test_flatten_retry_in_linear_flow(self):
+ def test_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
- def test_flatten_retry_in_unordered_flow(self):
+ def test_retry_in_unordered_flow(self):
flo = uf.Flow("test", retry.AlwaysRevert("c"))
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
- def test_flatten_retry_in_graph_flow(self):
+ def test_retry_in_graph_flow(self):
flo = gf.Flow("test", retry.AlwaysRevert("c"))
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(1, len(g))
self.assertEqual(0, g.number_of_edges())
- def test_flatten_retry_in_nested_flows(self):
+ def test_retry_in_nested_flows(self):
c1 = retry.AlwaysRevert("c1")
c2 = retry.AlwaysRevert("c2")
flo = lf.Flow("test", c1).add(lf.Flow("test2", c2))
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(2, len(g))
self.assertItemsEqual(g.edges(data=True), [
@@ -304,11 +325,13 @@ class FlattenTest(test.TestCase):
self.assertItemsEqual([c1], g.no_predecessors_iter())
self.assertItemsEqual([c2], g.no_successors_iter())
- def test_flatten_retry_in_linear_flow_with_tasks(self):
+ def test_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
flo = lf.Flow("test", c).add(a, b)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
+
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(a, b, {'invariant': True}),
@@ -320,11 +343,13 @@ class FlattenTest(test.TestCase):
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
- def test_flatten_retry_in_unordered_flow_with_tasks(self):
+ def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
flo = uf.Flow("test", c).add(a, b)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
+
self.assertEqual(3, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c, a, {'retry': True}),
@@ -336,11 +361,12 @@ class FlattenTest(test.TestCase):
self.assertIs(c, g.node[a]['retry'])
self.assertIs(c, g.node[b]['retry'])
- def test_flatten_retry_in_graph_flow_with_tasks(self):
+ def test_retry_in_graph_flow_with_tasks(self):
r = retry.AlwaysRevert("cp")
a, b, c = _make_many(3)
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
self.assertEqual(4, len(g))
self.assertItemsEqual(g.edges(data=True), [
@@ -355,7 +381,7 @@ class FlattenTest(test.TestCase):
self.assertIs(r, g.node[b]['retry'])
self.assertIs(r, g.node[c]['retry'])
- def test_flatten_retries_hierarchy(self):
+ def test_retries_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = _make_many(4)
@@ -363,7 +389,9 @@ class FlattenTest(test.TestCase):
a,
lf.Flow("test", c2).add(b, c),
d)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
+
self.assertEqual(6, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, a, {'retry': True}),
@@ -379,14 +407,16 @@ class FlattenTest(test.TestCase):
self.assertIs(c1, g.node[c2]['retry'])
self.assertIs(None, g.node[c1].get('retry'))
- def test_flatten_retry_subflows_hierarchy(self):
+ def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = _make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test").add(b, c),
d)
- g = f_utils.flatten(flo)
+ compilation = compiler.PatternCompiler().compile(flo)
+ g = compilation.execution_graph
+
self.assertEqual(5, len(g))
self.assertItemsEqual(g.edges(data=True), [
(c1, a, {'retry': True}),
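
Dependency-only edges (linked through matching provides/requires) carry a 'reasons' attribute instead of 'invariant' or 'retry'; a sketch mirroring test_graph_dependencies above, with module paths assumed from this change:

    from taskflow.engines.action_engine import compiler
    from taskflow.patterns import graph_flow as gf
    from taskflow.tests import utils as t_utils  # ProvidesRequiresTask helper used above

    a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
    b = t_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
    flo = gf.Flow("demo").add(a, b)

    g = compiler.PatternCompiler().compile(flo).execution_graph
    print(g.edges(data=True))
    # Expected (per test_graph_dependencies): [(a, b, {'reasons': set(['x'])})]
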
diff --git a/taskflow/utils/flow_utils.py b/taskflow/utils/flow_utils.py
deleted file mode 100644
index 6b54d56..0000000
--- a/taskflow/utils/flow_utils.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from taskflow import exceptions
-from taskflow import flow
-from taskflow import retry
-from taskflow import task
-from taskflow.types import graph as gr
-from taskflow.utils import misc
-
-
-LOG = logging.getLogger(__name__)
-
-
-RETRY_EDGE_DATA = {
- 'retry': True,
-}
-
-
-class Flattener(object):
- def __init__(self, root, freeze=True):
- self._root = root
- self._graph = None
- self._history = set()
- self._freeze = bool(freeze)
-
- def _add_new_edges(self, graph, nodes_from, nodes_to, edge_attrs):
- """Adds new edges from nodes to other nodes in the specified graph,
- with the following edge attributes (defaulting to the class provided
- edge_data if None), if the edge does not already exist.
- """
- nodes_to = list(nodes_to)
- for u in nodes_from:
- for v in nodes_to:
- if not graph.has_edge(u, v):
- # NOTE(harlowja): give each edge its own attr copy so that
- # if it's later modified that the same copy isn't modified.
- graph.add_edge(u, v, attr_dict=edge_attrs.copy())
-
- def _flatten(self, item):
- functor = self._find_flattener(item)
- if not functor:
- raise TypeError("Unknown type requested to flatten: %s (%s)"
- % (item, type(item)))
- self._pre_item_flatten(item)
- graph = functor(item)
- self._post_item_flatten(item, graph)
- return graph
-
- def _find_flattener(self, item):
- """Locates the flattening function to use to flatten the given item."""
- if isinstance(item, flow.Flow):
- return self._flatten_flow
- elif isinstance(item, task.BaseTask):
- return self._flatten_task
- elif isinstance(item, retry.Retry):
- raise TypeError("Retry controller %s (%s) is used not as a flow "
- "parameter" % (item, type(item)))
- else:
- return None
-
- def _connect_retry(self, retry, graph):
- graph.add_node(retry)
-
- # All graph nodes that have no predecessors should depend on its retry
- nodes_to = [n for n in graph.no_predecessors_iter() if n != retry]
- self._add_new_edges(graph, [retry], nodes_to, RETRY_EDGE_DATA)
-
- # Add link to retry for each node of subgraph that hasn't
- # a parent retry
- for n in graph.nodes_iter():
- if n != retry and 'retry' not in graph.node[n]:
- graph.node[n]['retry'] = retry
-
- def _flatten_task(self, task):
- """Flattens a individual task."""
- graph = gr.DiGraph(name=task.name)
- graph.add_node(task)
- return graph
-
- def _flatten_flow(self, flow):
- """Flattens a graph flow."""
- graph = gr.DiGraph(name=flow.name)
-
- # Flatten all nodes into a single subgraph per node.
- subgraph_map = {}
- for item in flow:
- subgraph = self._flatten(item)
- subgraph_map[item] = subgraph
- graph = gr.merge_graphs([graph, subgraph])
-
- # Reconnect all node edges to their corresponding subgraphs.
- for (u, v, attrs) in flow.iter_links():
- u_g = subgraph_map[u]
- v_g = subgraph_map[v]
- if any(attrs.get(k) for k in ('invariant', 'manual', 'retry')):
- # Connect nodes with no predecessors in v to nodes with
- # no successors in u (thus maintaining the edge dependency).
- self._add_new_edges(graph,
- u_g.no_successors_iter(),
- v_g.no_predecessors_iter(),
- edge_attrs=attrs)
- else:
- # This is dependency-only edge, connect corresponding
- # providers and consumers.
- for provider in u_g:
- for consumer in v_g:
- reasons = provider.provides & consumer.requires
- if reasons:
- graph.add_edge(provider, consumer, reasons=reasons)
-
- if flow.retry is not None:
- self._connect_retry(flow.retry, graph)
- return graph
-
- def _pre_item_flatten(self, item):
- """Called before a item is flattened; any pre-flattening actions."""
- if id(item) in self._history:
- raise ValueError("Already flattened item: %s (%s), recursive"
- " flattening not supported" % (item, id(item)))
- LOG.debug("Starting to flatten '%s'", item)
- self._history.add(id(item))
-
- def _post_item_flatten(self, item, graph):
- """Called before a item is flattened; any post-flattening actions."""
- LOG.debug("Finished flattening '%s'", item)
- # NOTE(harlowja): this one can be expensive to calculate (especially
- # the cycle detection), so only do it if we know debugging is enabled
- # and not under all cases.
- if LOG.isEnabledFor(logging.DEBUG):
- LOG.debug("Translated '%s' into a graph:", item)
- for line in graph.pformat().splitlines():
- # Indent it so that it's slightly offset from the above line.
- LOG.debug(" %s", line)
-
- def _pre_flatten(self):
- """Called before the flattening of the item starts."""
- self._history.clear()
-
- def _post_flatten(self, graph):
- """Called after the flattening of the item finishes successfully."""
- dup_names = misc.get_duplicate_keys(graph.nodes_iter(),
- key=lambda node: node.name)
- if dup_names:
- dup_names = ', '.join(sorted(dup_names))
- raise exceptions.Duplicate("Tasks with duplicate names "
- "found: %s" % (dup_names))
- self._history.clear()
-
- def flatten(self):
- """Flattens a item (a task or flow) into a single execution graph."""
- if self._graph is not None:
- return self._graph
- self._pre_flatten()
- graph = self._flatten(self._root)
- self._post_flatten(graph)
- self._graph = graph
- if self._freeze:
- self._graph.freeze()
- return self._graph
-
-
-def flatten(item, freeze=True):
- """Flattens a item (a task or flow) into a single execution graph."""
- return Flattener(item, freeze=freeze).flatten()