From 79d25e69e8300db5debdfd717ffd80f91c246c10 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 4 Sep 2015 13:14:25 -0700 Subject: Simplify flow action engine compilation Instead of the added complexity of discarding flow nodes we can simplify the compilation process by just retaining them and jumping over them in further iteration and graph and tree runtime usage. This change moves toward a model that does just this, which makes it also easier to in the future use the newly added flow graph nodes to do meaningful things (like use them as a point to change which flow_detail is used). Change-Id: Icb1695f4b995a0392f940837514774768f222db4 --- taskflow/engines/action_engine/completer.py | 38 +++++++++++++++-------------- 1 file changed, 20 insertions(+), 18 deletions(-) (limited to 'taskflow/engines/action_engine/completer.py') diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 0ab727a..e3ab54d 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -20,6 +20,7 @@ import weakref from oslo_utils import reflection import six +from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom @@ -62,7 +63,7 @@ class RevertAndRetry(Strategy): self._retry = retry def apply(self): - tweaked = self._runtime.reset_nodes([self._retry], state=None, + tweaked = self._runtime.reset_atoms([self._retry], state=None, intention=st.RETRY) tweaked.extend(self._runtime.reset_subgraph(self._retry, state=None, intention=st.REVERT)) @@ -79,8 +80,9 @@ class RevertAll(Strategy): self._analyzer = runtime.analyzer def apply(self): - return self._runtime.reset_nodes(self._analyzer.iterate_all_nodes(), - state=None, intention=st.REVERT) + return self._runtime.reset_atoms( + self._analyzer.iterate_nodes(co.ATOMS), + state=None, intention=st.REVERT) class Revert(Strategy): @@ -93,7 +95,7 @@ class Revert(Strategy): self._atom = atom def apply(self): - tweaked = self._runtime.reset_nodes([self._atom], state=None, + tweaked = self._runtime.reset_atoms([self._atom], state=None, intention=st.REVERT) tweaked.extend(self._runtime.reset_subgraph(self._atom, state=None, intention=st.REVERT)) @@ -126,26 +128,26 @@ class Completer(object): self._retry_action.complete_reversion(retry, result) def resume(self): - """Resumes nodes in the contained graph. + """Resumes atoms in the contained graph. - This is done to allow any previously completed or failed nodes to - be analyzed, there results processed and any potential nodes affected + This is done to allow any previously completed or failed atoms to + be analyzed, there results processed and any potential atoms affected to be adjusted as needed. - This should return a set of nodes which should be the initial set of - nodes that were previously not finished (due to a RUNNING or REVERTING + This should return a set of atoms which should be the initial set of + atoms that were previously not finished (due to a RUNNING or REVERTING attempt not previously finishing). """ - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) == st.FAILURE: - self._process_atom_failure(node, self._storage.get(node.name)) + for atom in self._analyzer.iterate_nodes(co.ATOMS): + if self._analyzer.get_state(atom) == st.FAILURE: + self._process_atom_failure(atom, self._storage.get(atom.name)) for retry in self._analyzer.iterate_retries(st.RETRYING): self._runtime.retry_subflow(retry) - unfinished_nodes = set() - for node in self._analyzer.iterate_all_nodes(): - if self._analyzer.get_state(node) in (st.RUNNING, st.REVERTING): - unfinished_nodes.add(node) - return unfinished_nodes + unfinished_atoms = set() + for atom in self._analyzer.iterate_nodes(co.ATOMS): + if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING): + unfinished_atoms.add(atom) + return unfinished_atoms def complete(self, node, event, result): """Performs post-execution completion of a node. @@ -167,7 +169,7 @@ class Completer(object): def _determine_resolution(self, atom, failure): """Determines which resolution strategy to activate/apply.""" - retry = self._analyzer.find_atom_retry(atom) + retry = self._analyzer.find_retry(atom) if retry is not None: # Ask retry controller what to do in case of failure. strategy = self._retry_action.on_failure(retry, atom, failure) -- cgit v1.2.1