summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/analyzer.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/analyzer.py')
-rw-r--r--taskflow/engines/action_engine/analyzer.py160
1 files changed, 97 insertions, 63 deletions
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py
index 77f7df3..bdde897 100644
--- a/taskflow/engines/action_engine/analyzer.py
+++ b/taskflow/engines/action_engine/analyzer.py
@@ -18,10 +18,31 @@ import abc
import itertools
import weakref
-from networkx.algorithms import traversal
import six
+from taskflow.engines.action_engine import compiler as co
from taskflow import states as st
+from taskflow.utils import iter_utils
+
+
+def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter):
+ """Iterates connected nodes in execution graph (from starting set).
+
+ Jumps over nodes with ``noop`` attribute (does not yield them back).
+ """
+ stack = list(initial_nodes_iter)
+ while stack:
+ node = stack.pop()
+ node_attrs = graph.node[node]
+ if not node_attrs.get('noop'):
+ yield node
+ try:
+ node_kind = node_attrs['kind']
+ connected_to_functor = connected_to_functors[node_kind]
+ except KeyError:
+ pass
+ else:
+ stack.extend(connected_to_functor(node))
@six.add_metaclass(abc.ABCMeta)
@@ -74,8 +95,8 @@ class IgnoreDecider(Decider):
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
- successors_iter = runtime.analyzer.iterate_subgraph(self._atom)
- runtime.reset_nodes(itertools.chain([self._atom], successors_iter),
+ successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom)
+ runtime.reset_atoms(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
@@ -105,66 +126,67 @@ class Analyzer(object):
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
- def get_next_nodes(self, node=None):
- """Get next nodes to run (originating from node or all nodes)."""
- if node is None:
- execute = self.browse_nodes_for_execute()
- revert = self.browse_nodes_for_revert()
- return execute + revert
- state = self.get_state(node)
- intention = self._storage.get_atom_intention(node.name)
+ def iter_next_atoms(self, atom=None):
+ """Iterate next atoms to run (originating from atom or all atoms)."""
+ if atom is None:
+ return iter_utils.unique_seen(self.browse_atoms_for_execute(),
+ self.browse_atoms_for_revert())
+ state = self.get_state(atom)
+ intention = self._storage.get_atom_intention(atom.name)
if state == st.SUCCESS:
if intention == st.REVERT:
- return [
- (node, NoOpDecider()),
- ]
+ return iter([
+ (atom, NoOpDecider()),
+ ])
elif intention == st.EXECUTE:
- return self.browse_nodes_for_execute(node)
+ return self.browse_atoms_for_execute(atom=atom)
else:
- return []
+ return iter([])
elif state == st.REVERTED:
- return self.browse_nodes_for_revert(node)
+ return self.browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
- return self.browse_nodes_for_revert()
+ return self.browse_atoms_for_revert()
else:
- return []
+ return iter([])
- def browse_nodes_for_execute(self, node=None):
- """Browse next nodes to execute.
+ def browse_atoms_for_execute(self, atom=None):
+ """Browse next atoms to execute.
- This returns a collection of nodes that *may* be ready to be
- executed, if given a specific node it will only examine the successors
- of that node, otherwise it will examine the whole graph.
+ This returns a iterator of atoms that *may* be ready to be
+ executed, if given a specific atom, it will only examine the successors
+ of that atom, otherwise it will examine the whole graph.
"""
- if node is not None:
- nodes = self._execution_graph.successors(node)
+ if atom is None:
+ atom_it = self.iterate_nodes(co.ATOMS)
else:
- nodes = self._execution_graph.nodes_iter()
- ready_nodes = []
- for node in nodes:
- is_ready, late_decider = self._get_maybe_ready_for_execute(node)
+ successors_iter = self._execution_graph.successors_iter
+ atom_it = _depth_first_iterate(self._execution_graph,
+ {co.FLOW: successors_iter},
+ successors_iter(atom))
+ for atom in atom_it:
+ is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
if is_ready:
- ready_nodes.append((node, late_decider))
- return ready_nodes
+ yield (atom, late_decider)
- def browse_nodes_for_revert(self, node=None):
- """Browse next nodes to revert.
+ def browse_atoms_for_revert(self, atom=None):
+ """Browse next atoms to revert.
- This returns a collection of nodes that *may* be ready to be be
- reverted, if given a specific node it will only examine the
- predecessors of that node, otherwise it will examine the whole
+ This returns a iterator of atoms that *may* be ready to be be
+ reverted, if given a specific atom it will only examine the
+ predecessors of that atom, otherwise it will examine the whole
graph.
"""
- if node is not None:
- nodes = self._execution_graph.predecessors(node)
+ if atom is None:
+ atom_it = self.iterate_nodes(co.ATOMS)
else:
- nodes = self._execution_graph.nodes_iter()
- ready_nodes = []
- for node in nodes:
- is_ready, late_decider = self._get_maybe_ready_for_revert(node)
+ predecessors_iter = self._execution_graph.predecessors_iter
+ atom_it = _depth_first_iterate(self._execution_graph,
+ {co.FLOW: predecessors_iter},
+ predecessors_iter(atom))
+ for atom in atom_it:
+ is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
if is_ready:
- ready_nodes.append((node, late_decider))
- return ready_nodes
+ yield (atom, late_decider)
def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
connected_fetcher, connected_checker,
@@ -187,59 +209,71 @@ class Analyzer(object):
def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed."""
-
def decider_fetcher(atom):
edge_deciders = self._runtime.fetch_edge_deciders(atom)
if edge_deciders:
return IgnoreDecider(atom, edge_deciders)
else:
return NoOpDecider()
-
+ predecessors_iter = self._execution_graph.predecessors_iter
+ connected_fetcher = lambda atom: \
+ _depth_first_iterate(self._execution_graph,
+ {co.FLOW: predecessors_iter},
+ predecessors_iter(atom))
connected_checker = lambda connected_iter: \
all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in connected_iter)
- connected_fetcher = self._execution_graph.predecessors_iter
return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
connected_fetcher, connected_checker,
decider_fetcher)
def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted."""
+ successors_iter = self._execution_graph.successors_iter
+ connected_fetcher = lambda atom: \
+ _depth_first_iterate(self._execution_graph,
+ {co.FLOW: successors_iter},
+ successors_iter(atom))
connected_checker = lambda connected_iter: \
all(state in (st.PENDING, st.REVERTED)
for state, _intention in connected_iter)
decider_fetcher = lambda atom: NoOpDecider()
- connected_fetcher = self._execution_graph.successors_iter
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, connected_checker,
decider_fetcher)
- def iterate_subgraph(self, atom):
- """Iterates a subgraph connected to given atom."""
- for _src, dst in traversal.dfs_edges(self._execution_graph, atom):
- yield dst
+ def iterate_connected_atoms(self, atom):
+ """Iterates **all** successor atoms connected to given atom."""
+ successors_iter = self._execution_graph.successors_iter
+ return _depth_first_iterate(
+ self._execution_graph, {
+ co.FLOW: successors_iter,
+ co.TASK: successors_iter,
+ co.RETRY: successors_iter,
+ }, successors_iter(atom))
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
If no state is provided it will yield back all retry atoms.
"""
- for atom in self._runtime.fetch_atoms_by_kind('retry'):
+ for atom in self.iterate_nodes((co.RETRY,)):
if not state or self.get_state(atom) == state:
yield atom
- def iterate_all_nodes(self):
- """Yields back all nodes in the execution graph."""
- for node in self._execution_graph.nodes_iter():
- yield node
+ def iterate_nodes(self, allowed_kinds):
+ """Yields back all nodes of specified kinds in the execution graph."""
+ for node, node_data in self._execution_graph.nodes_iter(data=True):
+ if node_data['kind'] in allowed_kinds:
+ yield node
- def find_atom_retry(self, atom):
- """Returns the retry atom associated to the given atom (or none)."""
- return self._execution_graph.node[atom].get('retry')
+ def find_retry(self, node):
+ """Returns the retry atom associated to the given node (or none)."""
+ return self._execution_graph.node[node].get(co.RETRY)
def is_success(self):
- """Checks if all nodes in the execution graph are in 'happy' state."""
- for atom in self.iterate_all_nodes():
+ """Checks if all atoms in the execution graph are in 'happy' state."""
+ for atom in self.iterate_nodes(co.ATOMS):
atom_state = self.get_state(atom)
if atom_state == st.IGNORE:
continue