diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-02-01 08:47:42 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-02-01 08:47:42 +0000 |
commit | 6a48709d76aa8523b1b0d99e6d4b6e267d7e5883 (patch) | |
tree | bdca51385a7a81b5f51d47f515f655df5502d689 | |
parent | f45a76941e1d681e1cdae2fb6b97a20e374595a9 (diff) | |
parent | 3243bd2d9b97dceb530744631d96fb3d47fdc1d8 (diff) | |
download | taskflow-6a48709d76aa8523b1b0d99e6d4b6e267d7e5883.tar.gz |
Merge "Use helper function for post-atom-completion work"
-rw-r--r-- | taskflow/engines/action_engine/builder.py | 67 | ||||
-rw-r--r-- | taskflow/engines/action_engine/completer.py | 22 |
2 files changed, 51 insertions, 38 deletions
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 7ce9b30..5b7c934 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -109,6 +109,8 @@ class MachineBuilder(object): # Cache some local functions/methods... do_complete = self._completer.complete + do_complete_failure = self._completer.complete_failure + get_atom_intention = self._storage.get_atom_intention def do_schedule(next_nodes): return self._scheduler.schedule( @@ -180,6 +182,36 @@ class MachineBuilder(object): memory.next_up.intersection_update(not_done) return WAIT + def complete_an_atom(fut): + # This completes a single atom saving its result in + # storage and preparing whatever predecessors or successors will + # now be ready to execute (or revert or retry...); it also + # handles failures that occur during this process safely... + atom = fut.atom + try: + outcome, result = fut.result() + do_complete(atom, outcome, result) + if isinstance(result, failure.Failure): + retain = do_complete_failure(atom, outcome, result) + if retain: + memory.failures.append(result) + else: + # NOTE(harlowja): avoid making any intention request + # to storage unless we are sure we are in DEBUG + # enabled logging (otherwise we will call this all + # the time even when DEBUG is not enabled, which + # would suck...) + if LOG.isEnabledFor(logging.DEBUG): + intention = get_atom_intention(atom.name) + LOG.debug("Discarding failure '%s' (in response" + " to outcome '%s') under completion" + " units request during completion of" + " atom '%s' (intention is to %s)", + result, outcome, atom, intention) + except Exception: + memory.failures.append(failure.Failure()) + LOG.exception("Engine '%s' atom post-completion failed", atom) + def wait(old_state, new_state, event): # TODO(harlowja): maybe we should start doing 'yield from' this # call sometime in the future, or equivalent that will work in @@ -192,40 +224,19 @@ class MachineBuilder(object): def analyze(old_state, new_state, event): # This reaction function is responsible for analyzing all nodes - # that have finished executing and completing them and figuring + # that have finished executing/reverting and figuring # out what nodes are now ready to be ran (and then triggering those # nodes to be scheduled in the future); handles failures that # occur during this process safely... next_up = set() while memory.done: fut = memory.done.pop() - atom = fut.atom - try: - outcome, result = fut.result() - retain = do_complete(atom, outcome, result) - if isinstance(result, failure.Failure): - if retain: - memory.failures.append(result) - else: - # NOTE(harlowja): avoid making any - # intention request to storage unless we are - # sure we are in DEBUG enabled logging (otherwise - # we will call this all the time even when DEBUG - # is not enabled, which would suck...) - if LOG.isEnabledFor(logging.DEBUG): - intention = self._storage.get_atom_intention( - atom.name) - LOG.debug("Discarding failure '%s' (in" - " response to outcome '%s') under" - " completion units request during" - " completion of atom '%s' (intention" - " is to %s)", result, outcome, - atom, intention) - except Exception: - memory.failures.append(failure.Failure()) - LOG.exception("Engine '%s' atom post-completion" - " failed", atom) - else: + # Force it to be completed so that we can ensure that + # before we iterate over any successors or predecessors + # that we know it has been completed and saved and so on... + complete_an_atom(fut) + if not memory.failures: + atom = fut.atom try: more_work = set(iter_next_atoms(atom=atom)) except Exception: diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 2c964ee..5cfeed8 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -26,7 +26,6 @@ from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom from taskflow import states as st -from taskflow.types import failure LOG = logging.getLogger(__name__) @@ -144,24 +143,27 @@ class Completer(object): " state %s", atom, atom_state) return unfinished_atoms - def complete(self, node, outcome, result): - """Performs post-execution completion of a node. + def complete_failure(self, node, outcome, failure): + """Performs post-execution completion of a nodes failure. Returns whether the result should be saved into an accumulator of failures or whether this should not be done. """ + if outcome == ex.EXECUTED: + self._process_atom_failure(node, failure) + # We resolved something, carry on... + return False + else: + # Reverting failed, always retain the failure... + return True + + def complete(self, node, outcome, result): + """Performs post-execution completion of a node result.""" handler = self._runtime.fetch_action(node) if outcome == ex.EXECUTED: handler.complete_execution(node, result) else: handler.complete_reversion(node, result) - if isinstance(result, failure.Failure): - if outcome == ex.EXECUTED: - self._process_atom_failure(node, result) - else: - # Reverting failed, always retain the failure... - return True - return False def _determine_resolution(self, atom, failure): """Determines which resolution strategy to activate/apply.""" |