summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/completer.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/completer.py')
-rw-r--r--taskflow/engines/action_engine/completer.py38
1 files changed, 20 insertions, 18 deletions
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 0ab727a..e3ab54d 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -20,6 +20,7 @@ import weakref
from oslo_utils import reflection
import six
+from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
@@ -62,7 +63,7 @@ class RevertAndRetry(Strategy):
self._retry = retry
def apply(self):
- tweaked = self._runtime.reset_nodes([self._retry], state=None,
+ tweaked = self._runtime.reset_atoms([self._retry], state=None,
intention=st.RETRY)
tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None,
intention=st.REVERT))
@@ -79,8 +80,9 @@ class RevertAll(Strategy):
self._analyzer = runtime.analyzer
def apply(self):
- return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(),
- state=None, intention=st.REVERT)
+ return self._runtime.reset_atoms(
+ self._analyzer.iterate_nodes(co.ATOMS),
+ state=None, intention=st.REVERT)
class Revert(Strategy):
@@ -93,7 +95,7 @@ class Revert(Strategy):
self._atom = atom
def apply(self):
- tweaked = self._runtime.reset_nodes([self._atom], state=None,
+ tweaked = self._runtime.reset_atoms([self._atom], state=None,
intention=st.REVERT)
tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None,
intention=st.REVERT))
@@ -126,26 +128,26 @@ class Completer(object):
self._retry_action.complete_reversion(retry, result)
def resume(self):
- """Resumes nodes in the contained graph.
+ """Resumes atoms in the contained graph.
- This is done to allow any previously completed or failed nodes to
- be analyzed, there results processed and any potential nodes affected
+ This is done to allow any previously completed or failed atoms to
+ be analyzed, there results processed and any potential atoms affected
to be adjusted as needed.
- This should return a set of nodes which should be the initial set of
- nodes that were previously not finished (due to a RUNNING or REVERTING
+ This should return a set of atoms which should be the initial set of
+ atoms that were previously not finished (due to a RUNNING or REVERTING
attempt not previously finishing).
"""
- for node in self._analyzer.iterate_all_nodes():
- if self._analyzer.get_state(node) == st.FAILURE:
- self._process_atom_failure(node, self._storage.get(node.name))
+ for atom in self._analyzer.iterate_nodes(co.ATOMS):
+ if self._analyzer.get_state(atom) == st.FAILURE:
+ self._process_atom_failure(atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING):
self._runtime.retry_subflow(retry)
- unfinished_nodes = set()
- for node in self._analyzer.iterate_all_nodes():
- if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
- unfinished_nodes.add(node)
- return unfinished_nodes
+ unfinished_atoms = set()
+ for atom in self._analyzer.iterate_nodes(co.ATOMS):
+ if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING):
+ unfinished_atoms.add(atom)
+ return unfinished_atoms
def complete(self, node, event, result):
"""Performs post-execution completion of a node.
@@ -167,7 +169,7 @@ class Completer(object):
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""
- retry = self._analyzer.find_atom_retry(atom)
+ retry = self._analyzer.find_retry(atom)
if retry is not None:
# Ask retry controller what to do in case of failure.
strategy = self._retry_action.on_failure(retry, atom, failure)