diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-09-04 13:14:25 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2015-10-01 22:38:30 -0700 |
commit | 79d25e69e8300db5debdfd717ffd80f91c246c10 (patch) | |
tree | 134e9764a021dc190a1b158fb27f222d9304908e /taskflow/engines/action_engine/completer.py | |
parent | ba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff) | |
download | taskflow-1.22.0.tar.gz |
Simplify flow action engine compilation1.22.0
Instead of the added complexity of discarding flow nodes
we can simplify the compilation process by just retaining
them and jumping over them in further iteration and graph
and tree runtime usage.
This change moves toward a model that does just this, which
makes it also easier to in the future use the newly added
flow graph nodes to do meaningful things (like use them as
a point to change which flow_detail is used).
Change-Id: Icb1695f4b995a0392f940837514774768f222db4
Diffstat (limited to 'taskflow/engines/action_engine/completer.py')
-rw-r--r-- | taskflow/engines/action_engine/completer.py | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 0ab727a..e3ab54d 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -20,6 +20,7 @@ import weakref from oslo_utils import reflection import six +from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom @@ -62,7 +63,7 @@ class RevertAndRetry(Strategy): self._retry = retry def apply(self): - tweaked = self._runtime.reset_nodes([self._retry], state=None, + tweaked = self._runtime.reset_atoms([self._retry], state=None, intention=st.RETRY) tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None, intention=st.REVERT)) @@ -79,8 +80,9 @@ class RevertAll(Strategy): self._analyzer = runtime.analyzer def apply(self): - return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(), - state=None, intention=st.REVERT) + return self._runtime.reset_atoms( + self._analyzer.iterate_nodes(co.ATOMS), + state=None, intention=st.REVERT) class Revert(Strategy): @@ -93,7 +95,7 @@ class Revert(Strategy): self._atom = atom def apply(self): - tweaked = self._runtime.reset_nodes([self._atom], state=None, + tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.REVERT) tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None, intention=st.REVERT)) @@ -126,26 +128,26 @@ class Completer(object): self._retry_action.complete_reversion(retry, result) def resume(self): - """Resumes nodes in the contained graph. + """Resumes atoms in the contained graph. - This is done to allow any previously completed or failed nodes to - be analyzed, there results processed and any potential nodes affected + This is done to allow any previously completed or failed atoms to + be analyzed, there results processed and any potential atoms affected to be adjusted as needed. - This should return a set of nodes which should be the initial set of - nodes that were previously not finished (due to a RUNNING or REVERTING + This should return a set of atoms which should be the initial set of + atoms that were previously not finished (due to a RUNNING or REVERTING attempt not previously finishing). """ - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) == st.FAILURE: - self._process_atom_failure(node, self._storage.get(node.name)) + for atom in self._analyzer.iterate_nodes(co.ATOMS): + if self._analyzer.get_state(atom) == st.FAILURE: + self._process_atom_failure(atom, self._storage.get(atom.name)) for retry in self._analyzer.iterate_retries(st.RETRYING): self._runtime.retry_subflow(retry) - unfinished_nodes = set() - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING): - unfinished_nodes.add(node) - return unfinished_nodes + unfinished_atoms = set() + for atom in self._analyzer.iterate_nodes(co.ATOMS): + if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING): + unfinished_atoms.add(atom) + return unfinished_atoms def complete(self, node, event, result): """Performs post-execution completion of a node. @@ -167,7 +169,7 @@ class Completer(object): def _determine_resolution(self, atom, failure): """Determines which resolution strategy to activate/apply.""" - retry = self._analyzer.find_atom_retry(atom) + retry = self._analyzer.find_retry(atom) if retry is not None: # Ask retry controller what to do in case of failure. strategy = self._retry_action.on_failure(retry, atom, failure) |