diff options
Diffstat (limited to 'taskflow/engines/action_engine/analyzer.py')
-rw-r--r-- | taskflow/engines/action_engine/analyzer.py | 160 |
1 files changed, 97 insertions, 63 deletions
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index 77f7df3..bdde897 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -18,10 +18,31 @@ import abc import itertools import weakref -from networkx.algorithms import traversal import six +from taskflow.engines.action_engine import compiler as co from taskflow import states as st +from taskflow.utils import iter_utils + + +def _depth_first_iterate(graph, connected_to_functors, initial_nodes_iter): + """Iterates connected nodes in execution graph (from starting set). + + Jumps over nodes with ``noop`` attribute (does not yield them back). + """ + stack = list(initial_nodes_iter) + while stack: + node = stack.pop() + node_attrs = graph.node[node] + if not node_attrs.get('noop'): + yield node + try: + node_kind = node_attrs['kind'] + connected_to_functor = connected_to_functors[node_kind] + except KeyError: + pass + else: + stack.extend(connected_to_functor(node)) @six.add_metaclass(abc.ABCMeta) @@ -74,8 +95,8 @@ class IgnoreDecider(Decider): state to ``IGNORE`` so that they are ignored in future runtime activities. """ - successors_iter = runtime.analyzer.iterate_subgraph(self._atom) - runtime.reset_nodes(itertools.chain([self._atom], successors_iter), + successors_iter = runtime.analyzer.iterate_connected_atoms(self._atom) + runtime.reset_atoms(itertools.chain([self._atom], successors_iter), state=st.IGNORE, intention=st.IGNORE) @@ -105,66 +126,67 @@ class Analyzer(object): self._storage = runtime.storage self._execution_graph = runtime.compilation.execution_graph - def get_next_nodes(self, node=None): - """Get next nodes to run (originating from node or all nodes).""" - if node is None: - execute = self.browse_nodes_for_execute() - revert = self.browse_nodes_for_revert() - return execute + revert - state = self.get_state(node) - intention = self._storage.get_atom_intention(node.name) + def iter_next_atoms(self, atom=None): + """Iterate next atoms to run (originating from atom or all atoms).""" + if atom is None: + return iter_utils.unique_seen(self.browse_atoms_for_execute(), + self.browse_atoms_for_revert()) + state = self.get_state(atom) + intention = self._storage.get_atom_intention(atom.name) if state == st.SUCCESS: if intention == st.REVERT: - return [ - (node, NoOpDecider()), - ] + return iter([ + (atom, NoOpDecider()), + ]) elif intention == st.EXECUTE: - return self.browse_nodes_for_execute(node) + return self.browse_atoms_for_execute(atom=atom) else: - return [] + return iter([]) elif state == st.REVERTED: - return self.browse_nodes_for_revert(node) + return self.browse_atoms_for_revert(atom=atom) elif state == st.FAILURE: - return self.browse_nodes_for_revert() + return self.browse_atoms_for_revert() else: - return [] + return iter([]) - def browse_nodes_for_execute(self, node=None): - """Browse next nodes to execute. + def browse_atoms_for_execute(self, atom=None): + """Browse next atoms to execute. - This returns a collection of nodes that *may* be ready to be - executed, if given a specific node it will only examine the successors - of that node, otherwise it will examine the whole graph. + This returns a iterator of atoms that *may* be ready to be + executed, if given a specific atom, it will only examine the successors + of that atom, otherwise it will examine the whole graph. """ - if node is not None: - nodes = self._execution_graph.successors(node) + if atom is None: + atom_it = self.iterate_nodes(co.ATOMS) else: - nodes = self._execution_graph.nodes_iter() - ready_nodes = [] - for node in nodes: - is_ready, late_decider = self._get_maybe_ready_for_execute(node) + successors_iter = self._execution_graph.successors_iter + atom_it = _depth_first_iterate(self._execution_graph, + {co.FLOW: successors_iter}, + successors_iter(atom)) + for atom in atom_it: + is_ready, late_decider = self._get_maybe_ready_for_execute(atom) if is_ready: - ready_nodes.append((node, late_decider)) - return ready_nodes + yield (atom, late_decider) - def browse_nodes_for_revert(self, node=None): - """Browse next nodes to revert. + def browse_atoms_for_revert(self, atom=None): + """Browse next atoms to revert. - This returns a collection of nodes that *may* be ready to be be - reverted, if given a specific node it will only examine the - predecessors of that node, otherwise it will examine the whole + This returns a iterator of atoms that *may* be ready to be be + reverted, if given a specific atom it will only examine the + predecessors of that atom, otherwise it will examine the whole graph. """ - if node is not None: - nodes = self._execution_graph.predecessors(node) + if atom is None: + atom_it = self.iterate_nodes(co.ATOMS) else: - nodes = self._execution_graph.nodes_iter() - ready_nodes = [] - for node in nodes: - is_ready, late_decider = self._get_maybe_ready_for_revert(node) + predecessors_iter = self._execution_graph.predecessors_iter + atom_it = _depth_first_iterate(self._execution_graph, + {co.FLOW: predecessors_iter}, + predecessors_iter(atom)) + for atom in atom_it: + is_ready, late_decider = self._get_maybe_ready_for_revert(atom) if is_ready: - ready_nodes.append((node, late_decider)) - return ready_nodes + yield (atom, late_decider) def _get_maybe_ready(self, atom, transition_to, allowed_intentions, connected_fetcher, connected_checker, @@ -187,59 +209,71 @@ class Analyzer(object): def _get_maybe_ready_for_execute(self, atom): """Returns if an atom is *likely* ready to be executed.""" - def decider_fetcher(atom): edge_deciders = self._runtime.fetch_edge_deciders(atom) if edge_deciders: return IgnoreDecider(atom, edge_deciders) else: return NoOpDecider() - + predecessors_iter = self._execution_graph.predecessors_iter + connected_fetcher = lambda atom: \ + _depth_first_iterate(self._execution_graph, + {co.FLOW: predecessors_iter}, + predecessors_iter(atom)) connected_checker = lambda connected_iter: \ all(state == st.SUCCESS and intention == st.EXECUTE for state, intention in connected_iter) - connected_fetcher = self._execution_graph.predecessors_iter return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE], connected_fetcher, connected_checker, decider_fetcher) def _get_maybe_ready_for_revert(self, atom): """Returns if an atom is *likely* ready to be reverted.""" + successors_iter = self._execution_graph.successors_iter + connected_fetcher = lambda atom: \ + _depth_first_iterate(self._execution_graph, + {co.FLOW: successors_iter}, + successors_iter(atom)) connected_checker = lambda connected_iter: \ all(state in (st.PENDING, st.REVERTED) for state, _intention in connected_iter) decider_fetcher = lambda atom: NoOpDecider() - connected_fetcher = self._execution_graph.successors_iter return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY], connected_fetcher, connected_checker, decider_fetcher) - def iterate_subgraph(self, atom): - """Iterates a subgraph connected to given atom.""" - for _src, dst in traversal.dfs_edges(self._execution_graph, atom): - yield dst + def iterate_connected_atoms(self, atom): + """Iterates **all** successor atoms connected to given atom.""" + successors_iter = self._execution_graph.successors_iter + return _depth_first_iterate( + self._execution_graph, { + co.FLOW: successors_iter, + co.TASK: successors_iter, + co.RETRY: successors_iter, + }, successors_iter(atom)) def iterate_retries(self, state=None): """Iterates retry atoms that match the provided state. If no state is provided it will yield back all retry atoms. """ - for atom in self._runtime.fetch_atoms_by_kind('retry'): + for atom in self.iterate_nodes((co.RETRY,)): if not state or self.get_state(atom) == state: yield atom - def iterate_all_nodes(self): - """Yields back all nodes in the execution graph.""" - for node in self._execution_graph.nodes_iter(): - yield node + def iterate_nodes(self, allowed_kinds): + """Yields back all nodes of specified kinds in the execution graph.""" + for node, node_data in self._execution_graph.nodes_iter(data=True): + if node_data['kind'] in allowed_kinds: + yield node - def find_atom_retry(self, atom): - """Returns the retry atom associated to the given atom (or none).""" - return self._execution_graph.node[atom].get('retry') + def find_retry(self, node): + """Returns the retry atom associated to the given node (or none).""" + return self._execution_graph.node[node].get(co.RETRY) def is_success(self): - """Checks if all nodes in the execution graph are in 'happy' state.""" - for atom in self.iterate_all_nodes(): + """Checks if all atoms in the execution graph are in 'happy' state.""" + for atom in self.iterate_nodes(co.ATOMS): atom_state = self.get_state(atom) if atom_state == st.IGNORE: continue |