summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-01 08:47:42 +0000
committerGerrit Code Review <review@openstack.org>2016-02-01 08:47:42 +0000
commit6a48709d76aa8523b1b0d99e6d4b6e267d7e5883 (patch)
treebdca51385a7a81b5f51d47f515f655df5502d689
parentf45a76941e1d681e1cdae2fb6b97a20e374595a9 (diff)
parent3243bd2d9b97dceb530744631d96fb3d47fdc1d8 (diff)
downloadtaskflow-6a48709d76aa8523b1b0d99e6d4b6e267d7e5883.tar.gz
Merge "Use helper function for post-atom-completion work"
-rw-r--r--taskflow/engines/action_engine/builder.py67
-rw-r--r--taskflow/engines/action_engine/completer.py22
2 files changed, 51 insertions, 38 deletions
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 7ce9b30..5b7c934 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -109,6 +109,8 @@ class MachineBuilder(object):
# Cache some local functions/methods...
do_complete = self._completer.complete
+ do_complete_failure = self._completer.complete_failure
+ get_atom_intention = self._storage.get_atom_intention
def do_schedule(next_nodes):
return self._scheduler.schedule(
@@ -180,6 +182,36 @@ class MachineBuilder(object):
memory.next_up.intersection_update(not_done)
return WAIT
+ def complete_an_atom(fut):
+ # This completes a single atom saving its result in
+ # storage and preparing whatever predecessors or successors will
+ # now be ready to execute (or revert or retry...); it also
+ # handles failures that occur during this process safely...
+ atom = fut.atom
+ try:
+ outcome, result = fut.result()
+ do_complete(atom, outcome, result)
+ if isinstance(result, failure.Failure):
+ retain = do_complete_failure(atom, outcome, result)
+ if retain:
+ memory.failures.append(result)
+ else:
+ # NOTE(harlowja): avoid making any intention request
+ # to storage unless we are sure we are in DEBUG
+ # enabled logging (otherwise we will call this all
+ # the time even when DEBUG is not enabled, which
+ # would suck...)
+ if LOG.isEnabledFor(logging.DEBUG):
+ intention = get_atom_intention(atom.name)
+ LOG.debug("Discarding failure '%s' (in response"
+ " to outcome '%s') under completion"
+ " units request during completion of"
+ " atom '%s' (intention is to %s)",
+ result, outcome, atom, intention)
+ except Exception:
+ memory.failures.append(failure.Failure())
+ LOG.exception("Engine '%s' atom post-completion failed", atom)
+
def wait(old_state, new_state, event):
# TODO(harlowja): maybe we should start doing 'yield from' this
# call sometime in the future, or equivalent that will work in
@@ -192,40 +224,19 @@ class MachineBuilder(object):
def analyze(old_state, new_state, event):
# This reaction function is responsible for analyzing all nodes
- # that have finished executing and completing them and figuring
+ # that have finished executing/reverting and figuring
# out what nodes are now ready to be ran (and then triggering those
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
next_up = set()
while memory.done:
fut = memory.done.pop()
- atom = fut.atom
- try:
- outcome, result = fut.result()
- retain = do_complete(atom, outcome, result)
- if isinstance(result, failure.Failure):
- if retain:
- memory.failures.append(result)
- else:
- # NOTE(harlowja): avoid making any
- # intention request to storage unless we are
- # sure we are in DEBUG enabled logging (otherwise
- # we will call this all the time even when DEBUG
- # is not enabled, which would suck...)
- if LOG.isEnabledFor(logging.DEBUG):
- intention = self._storage.get_atom_intention(
- atom.name)
- LOG.debug("Discarding failure '%s' (in"
- " response to outcome '%s') under"
- " completion units request during"
- " completion of atom '%s' (intention"
- " is to %s)", result, outcome,
- atom, intention)
- except Exception:
- memory.failures.append(failure.Failure())
- LOG.exception("Engine '%s' atom post-completion"
- " failed", atom)
- else:
+ # Force it to be completed so that we can ensure that
+ # before we iterate over any successors or predecessors
+ # that we know it has been completed and saved and so on...
+ complete_an_atom(fut)
+ if not memory.failures:
+ atom = fut.atom
try:
more_work = set(iter_next_atoms(atom=atom))
except Exception:
diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py
index 2c964ee..5cfeed8 100644
--- a/taskflow/engines/action_engine/completer.py
+++ b/taskflow/engines/action_engine/completer.py
@@ -26,7 +26,6 @@ from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
from taskflow import states as st
-from taskflow.types import failure
LOG = logging.getLogger(__name__)
@@ -144,24 +143,27 @@ class Completer(object):
" state %s", atom, atom_state)
return unfinished_atoms
- def complete(self, node, outcome, result):
- """Performs post-execution completion of a node.
+ def complete_failure(self, node, outcome, failure):
+ """Performs post-execution completion of a nodes failure.
Returns whether the result should be saved into an accumulator of
failures or whether this should not be done.
"""
+ if outcome == ex.EXECUTED:
+ self._process_atom_failure(node, failure)
+ # We resolved something, carry on...
+ return False
+ else:
+ # Reverting failed, always retain the failure...
+ return True
+
+ def complete(self, node, outcome, result):
+ """Performs post-execution completion of a node result."""
handler = self._runtime.fetch_action(node)
if outcome == ex.EXECUTED:
handler.complete_execution(node, result)
else:
handler.complete_reversion(node, result)
- if isinstance(result, failure.Failure):
- if outcome == ex.EXECUTED:
- self._process_atom_failure(node, result)
- else:
- # Reverting failed, always retain the failure...
- return True
- return False
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""