summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/completer.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-09-04 13:14:25 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-10-01 22:38:30 -0700
commit79d25e69e8300db5debdfd717ffd80f91c246c10 (patch)
tree134e9764a021dc190a1b158fb27f222d9304908e /taskflow/engines/action_engine/completer.py
parentba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff)
downloadtaskflow-1.22.0.tar.gz
Simplify flow action engine compilation1.22.0
Instead of the added complexity of discarding flow nodes we can simplify the compilation process by just retaining them and jumping over them in further iteration and graph and tree runtime usage. This change moves toward a model that does just this, which makes it also easier to in the future use the newly added flow graph nodes to do meaningful things (like use them as a point to change which flow_detail is used). Change-Id: Icb1695f4b995a0392f940837514774768f222db4
Diffstat (limited to 'taskflow/engines/action_engine/completer.py')
-rw-r--r--taskflow/engines/action_engine/completer.py38
1 files changed, 20 insertions, 18 deletions
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 0ab727a..e3ab54d 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -20,6 +20,7 @@ import weakref
from oslo_utils import reflection
import six
+from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
@@ -62,7 +63,7 @@ class RevertAndRetry(Strategy):
self._retry = retry
def apply(self):
- tweaked = self._runtime.reset_nodes([self._retry], state=None,
+ tweaked = self._runtime.reset_atoms([self._retry], state=None,
intention=st.RETRY)
tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None,
intention=st.REVERT))
@@ -79,8 +80,9 @@ class RevertAll(Strategy):
self._analyzer = runtime.analyzer
def apply(self):
- return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(),
- state=None, intention=st.REVERT)
+ return self._runtime.reset_atoms(
+ self._analyzer.iterate_nodes(co.ATOMS),
+ state=None, intention=st.REVERT)
class Revert(Strategy):
@@ -93,7 +95,7 @@ class Revert(Strategy):
self._atom = atom
def apply(self):
- tweaked = self._runtime.reset_nodes([self._atom], state=None,
+ tweaked = self._runtime.reset_atoms([self._atom], state=None,
intention=st.REVERT)
tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None,
intention=st.REVERT))
@@ -126,26 +128,26 @@ class Completer(object):
self._retry_action.complete_reversion(retry, result)
def resume(self):
- """Resumes nodes in the contained graph.
+ """Resumes atoms in the contained graph.
- This is done to allow any previously completed or failed nodes to
- be analyzed, there results processed and any potential nodes affected
+ This is done to allow any previously completed or failed atoms to
+ be analyzed, there results processed and any potential atoms affected
to be adjusted as needed.
- This should return a set of nodes which should be the initial set of
- nodes that were previously not finished (due to a RUNNING or REVERTING
+ This should return a set of atoms which should be the initial set of
+ atoms that were previously not finished (due to a RUNNING or REVERTING
attempt not previously finishing).
"""
- for node in self._analyzer.iterate_all_nodes():
- if self._analyzer.get_state(node) == st.FAILURE:
- self._process_atom_failure(node, self._storage.get(node.name))
+ for atom in self._analyzer.iterate_nodes(co.ATOMS):
+ if self._analyzer.get_state(atom) == st.FAILURE:
+ self._process_atom_failure(atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING):
self._runtime.retry_subflow(retry)
- unfinished_nodes = set()
- for node in self._analyzer.iterate_all_nodes():
- if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING):
- unfinished_nodes.add(node)
- return unfinished_nodes
+ unfinished_atoms = set()
+ for atom in self._analyzer.iterate_nodes(co.ATOMS):
+ if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING):
+ unfinished_atoms.add(atom)
+ return unfinished_atoms
def complete(self, node, event, result):
"""Performs post-execution completion of a node.
@@ -167,7 +169,7 @@ class Completer(object):
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""
- retry = self._analyzer.find_atom_retry(atom)
+ retry = self._analyzer.find_retry(atom)
if retry is not None:
# Ask retry controller what to do in case of failure.
strategy = self._retry_action.on_failure(retry, atom, failure)