diff options
Diffstat (limited to 'taskflow/engines/action_engine/completer.py')
-rw-r--r-- | taskflow/engines/action_engine/completer.py | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 0ab727a..e3ab54d 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -20,6 +20,7 @@ import weakref from oslo_utils import reflection import six +from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom @@ -62,7 +63,7 @@ class RevertAndRetry(Strategy): self._retry = retry def apply(self): - tweaked = self._runtime.reset_nodes([self._retry], state=None, + tweaked = self._runtime.reset_atoms([self._retry], state=None, intention=st.RETRY) tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None, intention=st.REVERT)) @@ -79,8 +80,9 @@ class RevertAll(Strategy): self._analyzer = runtime.analyzer def apply(self): - return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(), - state=None, intention=st.REVERT) + return self._runtime.reset_atoms( + self._analyzer.iterate_nodes(co.ATOMS), + state=None, intention=st.REVERT) class Revert(Strategy): @@ -93,7 +95,7 @@ class Revert(Strategy): self._atom = atom def apply(self): - tweaked = self._runtime.reset_nodes([self._atom], state=None, + tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.REVERT) tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None, intention=st.REVERT)) @@ -126,26 +128,26 @@ class Completer(object): self._retry_action.complete_reversion(retry, result) def resume(self): - """Resumes nodes in the contained graph. + """Resumes atoms in the contained graph. - This is done to allow any previously completed or failed nodes to - be analyzed, there results processed and any potential nodes affected + This is done to allow any previously completed or failed atoms to + be analyzed, there results processed and any potential atoms affected to be adjusted as needed. - This should return a set of nodes which should be the initial set of - nodes that were previously not finished (due to a RUNNING or REVERTING + This should return a set of atoms which should be the initial set of + atoms that were previously not finished (due to a RUNNING or REVERTING attempt not previously finishing). """ - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) == st.FAILURE: - self._process_atom_failure(node, self._storage.get(node.name)) + for atom in self._analyzer.iterate_nodes(co.ATOMS): + if self._analyzer.get_state(atom) == st.FAILURE: + self._process_atom_failure(atom, self._storage.get(atom.name)) for retry in self._analyzer.iterate_retries(st.RETRYING): self._runtime.retry_subflow(retry) - unfinished_nodes = set() - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING): - unfinished_nodes.add(node) - return unfinished_nodes + unfinished_atoms = set() + for atom in self._analyzer.iterate_nodes(co.ATOMS): + if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING): + unfinished_atoms.add(atom) + return unfinished_atoms def complete(self, node, event, result): """Performs post-execution completion of a node. @@ -167,7 +169,7 @@ class Completer(object): def _determine_resolution(self, atom, failure): """Determines which resolution strategy to activate/apply.""" - retry = self._analyzer.find_atom_retry(atom) + retry = self._analyzer.find_retry(atom) if retry is not None: # Ask retry controller what to do in case of failure. strategy = self._retry_action.on_failure(retry, atom, failure) |