summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/analyzer.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-09-04 13:14:25 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-10-01 22:38:30 -0700
commit79d25e69e8300db5debdfd717ffd80f91c246c10 (patch)
tree134e9764a021dc190a1b158fb27f222d9304908e /taskflow/engines/action_engine/analyzer.py
parentba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff)
downloadtaskflow-1.22.0.tar.gz
Simplify flow action engine compilation1.22.0
Instead of the added complexity of discarding flow nodes we can simplify the compilation process by just retaining them and jumping over them in further iteration and graph and tree runtime usage. This change moves toward a model that does just this, which makes it also easier to in the future use the newly added flow graph nodes to do meaningful things (like use them as a point to change which flow_detail is used). Change-Id: Icb1695f4b995a0392f940837514774768f222db4
Diffstat (limited to 'taskflow/engines/action_engine/analyzer.py')
-rw-r--r--taskflow/engines/action_engine/analyzer.py160
1 files changed, 97 insertions, 63 deletions
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py
index 77f7df3..bdde897 100644
--- a/taskflow/engines/action_engine/analyzer.py
+++ b/taskflow/engines/action_engine/analyzer.py
@@ -18,10 +18,31 @@ import abc
import itertools
import weakref
-from networkx.algorithms import traversal
import six
+from taskflow.engines.action_engine import compiler as co
from taskflow import states as st
+from taskflow.utils import iter_utils
+
+
+def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
+ """Iterates connected nodes in execution graph (from starting set).
+
+ Jumps over nodes with ``noop`` attribute (does not yield them back).
+ """
+ stack = list(initial_nodes_iter)
+ while stack:
+ node = stack.pop()
+ node_attrs = graph.node[node]
+ if not node_attrs.get('noop'):
+ yield node
+ try:
+ node_kind = node_attrs['kind']
+ connected_to_functor = connected_to_functors[node_kind]
+ except KeyError:
+ pass
+ else:
+ stack.extend(connected_to_functor(node))
@six.add_metaclass(abc.ABCMeta)
@@ -74,8 +95,8 @@ class IgnoreDecider(Decider):
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
- successors_iter = runtime.analyzer.iterate_subgraph(self._atom)
- runtime.reset_nodes(itertools.chain([self._atom], successors_iter),
+ successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
+ runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
@@ -105,66 +126,67 @@ class Analyzer(object):
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
- def get_next_nodes(self, node=None):
- """Get next nodes to run (originating from node or all nodes)."""
- if node is None:
- execute = self.browse_nodes_for_execute()
- revert = self.browse_nodes_for_revert()
- return execute + revert
- state = self.get_state(node)
- intention = self._storage.get_atom_intention(node.name)
+ def iter_next_atoms(self, atom=None):
+ """Iterate next atoms to run (originating from atom or all atoms)."""
+ if atom is None:
+ return iter_utils.unique_seen(self.browse_atoms_for_execute(),
+ self.browse_atoms_for_revert())
+ state = self.get_state(atom)
+ intention = self._storage.get_atom_intention(atom.name)
if state == st.SUCCESS:
if intention == st.REVERT:
- return [
- (node, NoOpDecider()),
- ]
+ return iter([
+ (atom, NoOpDecider()),
+ ])
elif intention == st.EXECUTE:
- return self.browse_nodes_for_execute(node)
+ return self.browse_atoms_for_execute(atom=atom)
else:
- return []
+ return iter([])
elif state == st.REVERTED:
- return self.browse_nodes_for_revert(node)
+ return self.browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
- return self.browse_nodes_for_revert()
+ return self.browse_atoms_for_revert()
else:
- return []
+ return iter([])
- def browse_nodes_for_execute(self, node=None):
- """Browse next nodes to execute.
+ def browse_atoms_for_execute(self, atom=None):
+ """Browse next atoms to execute.
- This returns a collection of nodes that *may* be ready to be
- executed, if given a specific node it will only examine the successors
- of that node, otherwise it will examine the whole graph.
+ This returns a iterator of atoms that *may* be ready to be
+ executed, if given a specific atom, it will only examine the successors
+ of that atom, otherwise it will examine the whole graph.
"""
- if node is not None:
- nodes = self._execution_graph.successors(node)
+ if atom is None:
+ atom_it = self.iterate_nodes(co.ATOMS)
else:
- nodes = self._execution_graph.nodes_iter()
- ready_nodes = []
- for node in nodes:
- is_ready, late_decider = self._get_maybe_ready_for_execute(node)
+ successors_iter = self._execution_graph.successors_iter
+ atom_it = _depth_first_iterate(self._execution_graph,
+ {co.FLOW: successors_iter},
+ successors_iter(atom))
+ for atom in atom_it:
+ is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
if is_ready:
- ready_nodes.append((node, late_decider))
- return ready_nodes
+ yield (atom, late_decider)
- def browse_nodes_for_revert(self, node=None):
- """Browse next nodes to revert.
+ def browse_atoms_for_revert(self, atom=None):
+ """Browse next atoms to revert.
- This returns a collection of nodes that *may* be ready to be be
- reverted, if given a specific node it will only examine the
- predecessors of that node, otherwise it will examine the whole
+ This returns a iterator of atoms that *may* be ready to be be
+ reverted, if given a specific atom it will only examine the
+ predecessors of that atom, otherwise it will examine the whole
graph.
"""
- if node is not None:
- nodes = self._execution_graph.predecessors(node)
+ if atom is None:
+ atom_it = self.iterate_nodes(co.ATOMS)
else:
- nodes = self._execution_graph.nodes_iter()
- ready_nodes = []
- for node in nodes:
- is_ready, late_decider = self._get_maybe_ready_for_revert(node)
+ predecessors_iter = self._execution_graph.predecessors_iter
+ atom_it = _depth_first_iterate(self._execution_graph,
+ {co.FLOW: predecessors_iter},
+ predecessors_iter(atom))
+ for atom in atom_it:
+ is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
if is_ready:
- ready_nodes.append((node, late_decider))
- return ready_nodes
+ yield (atom, late_decider)
def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
connected_fetcher, connected_checker,
@@ -187,59 +209,71 @@ class Analyzer(object):
def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed."""
-
def decider_fetcher(atom):
edge_deciders = self._runtime.fetch_edge_deciders(atom)
if edge_deciders:
return IgnoreDecider(atom, edge_deciders)
else:
return NoOpDecider()
-
+ predecessors_iter = self._execution_graph.predecessors_iter
+ connected_fetcher = lambda atom: \
+ _depth_first_iterate(self._execution_graph,
+ {co.FLOW: predecessors_iter},
+ predecessors_iter(atom))
connected_checker = lambda connected_iter: \
all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in connected_iter)
- connected_fetcher = self._execution_graph.predecessors_iter
return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
connected_fetcher, connected_checker,
decider_fetcher)
def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted."""
+ successors_iter = self._execution_graph.successors_iter
+ connected_fetcher = lambda atom: \
+ _depth_first_iterate(self._execution_graph,
+ {co.FLOW: successors_iter},
+ successors_iter(atom))
connected_checker = lambda connected_iter: \
all(state in (st.PENDING, st.REVERTED)
for state, _intention in connected_iter)
decider_fetcher = lambda atom: NoOpDecider()
- connected_fetcher = self._execution_graph.successors_iter
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, connected_checker,
decider_fetcher)
- def iterate_subgraph(self, atom):
- """Iterates a subgraph connected to given atom."""
- for _src, dst in traversal.dfs_edges(self._execution_graph, atom):
- yield dst
+ def iterate_connected_atoms(self, atom):
+ """Iterates **all** successor atoms connected to given atom."""
+ successors_iter = self._execution_graph.successors_iter
+ return _depth_first_iterate(
+ self._execution_graph, {
+ co.FLOW: successors_iter,
+ co.TASK: successors_iter,
+ co.RETRY: successors_iter,
+ }, successors_iter(atom))
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
If no state is provided it will yield back all retry atoms.
"""
- for atom in self._runtime.fetch_atoms_by_kind('retry'):
+ for atom in self.iterate_nodes((co.RETRY,)):
if not state or self.get_state(atom) == state:
yield atom
- def iterate_all_nodes(self):
- """Yields back all nodes in the execution graph."""
- for node in self._execution_graph.nodes_iter():
- yield node
+ def iterate_nodes(self, allowed_kinds):
+ """Yields back all nodes of specified kinds in the execution graph."""
+ for node, node_data in self._execution_graph.nodes_iter(data=True):
+ if node_data['kind'] in allowed_kinds:
+ yield node
- def find_atom_retry(self, atom):
- """Returns the retry atom associated to the given atom (or none)."""
- return self._execution_graph.node[atom].get('retry')
+ def find_retry(self, node):
+ """Returns the retry atom associated to the given node (or none)."""
+ return self._execution_graph.node[node].get(co.RETRY)
def is_success(self):
- """Checks if all nodes in the execution graph are in 'happy' state."""
- for atom in self.iterate_all_nodes():
+ """Checks if all atoms in the execution graph are in 'happy' state."""
+ for atom in self.iterate_nodes(co.ATOMS):
atom_state = self.get_state(atom)
if atom_state == st.IGNORE:
continue