diff options
Diffstat (limited to 'taskflow/engines/action_engine/builder.py')
-rw-r--r-- | taskflow/engines/action_engine/builder.py | 55 |
1 files changed, 28 insertions, 27 deletions
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 034e64a..cdf3646 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -49,7 +49,7 @@ class MachineMemory(object): """State machine memory.""" def __init__(self): - self.next_nodes = set() + self.next_up = set() self.not_done = set() self.failures = [] self.done = set() @@ -115,24 +115,25 @@ class MachineBuilder(object): # Checks if the storage says the flow is still runnable... return self._storage.get_flow_state() == st.RUNNING - def iter_next_nodes(target_node=None, apply_deciders=True): - # Yields and filters and tweaks the next nodes to execute... - maybe_nodes = self._analyzer.get_next_nodes(node=target_node) - for node, late_decider in maybe_nodes: + def iter_next_atoms(atom=None, apply_deciders=True): + # Yields and filters and tweaks the next atoms to run... + maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom) + for atom, late_decider in maybe_atoms_it: if apply_deciders: proceed = late_decider.check_and_affect(self._runtime) if proceed: - yield node + yield atom else: - yield node + yield atom def resume(old_state, new_state, event): # This reaction function just updates the state machines memory # to include any nodes that need to be executed (from a previous # attempt, which may be empty if never ran before) and any nodes # that are now ready to be ran. - memory.next_nodes.update(self._completer.resume()) - memory.next_nodes.update(iter_next_nodes()) + memory.next_up.update( + iter_utils.unique_seen(self._completer.resume(), + iter_next_atoms())) return SCHEDULE def game_over(old_state, new_state, event): @@ -142,17 +143,17 @@ class MachineBuilder(object): # it is *always* called before the final state is entered. if memory.failures: return FAILED - leftover_nodes = iter_utils.count( + leftover_atoms = iter_utils.count( # Avoid activating the deciders, since at this point # the engine is finishing and there will be no more further # work done anyway... - iter_next_nodes(apply_deciders=False)) - if leftover_nodes: + iter_next_atoms(apply_deciders=False)) + if leftover_atoms: # Ok we didn't finish (either reverting or executing...) so # that means we must of been stopped at some point... LOG.blather("Suspension determined to have been reacted to" - " since (at least) %s nodes have been left in an" - " unfinished state", leftover_nodes) + " since (at least) %s atoms have been left in an" + " unfinished state", leftover_atoms) return SUSPENDED elif self._analyzer.is_success(): return SUCCESS @@ -165,13 +166,13 @@ class MachineBuilder(object): # if the user of this engine has requested the engine/storage # that holds this information to stop or suspend); handles failures # that occur during this process safely... - if is_runnable() and memory.next_nodes: - not_done, failures = do_schedule(memory.next_nodes) + if is_runnable() and memory.next_up: + not_done, failures = do_schedule(memory.next_up) if not_done: memory.not_done.update(not_done) if failures: memory.failures.extend(failures) - memory.next_nodes.intersection_update(not_done) + memory.next_up.intersection_update(not_done) return WAIT def wait(old_state, new_state, event): @@ -190,13 +191,13 @@ class MachineBuilder(object): # out what nodes are now ready to be ran (and then triggering those # nodes to be scheduled in the future); handles failures that # occur during this process safely... - next_nodes = set() + next_up = set() while memory.done: fut = memory.done.pop() - node = fut.atom + atom = fut.atom try: event, result = fut.result() - retain = do_complete(node, event, result) + retain = do_complete(atom, event, result) if isinstance(result, failure.Failure): if retain: memory.failures.append(result) @@ -208,24 +209,24 @@ class MachineBuilder(object): # is not enabled, which would suck...) if LOG.isEnabledFor(logging.DEBUG): intention = self._storage.get_atom_intention( - node.name) + atom.name) LOG.debug("Discarding failure '%s' (in" " response to event '%s') under" " completion units request during" - " completion of node '%s' (intention" + " completion of atom '%s' (intention" " is to %s)", result, event, - node, intention) + atom, intention) except Exception: memory.failures.append(failure.Failure()) else: try: - more_nodes = set(iter_next_nodes(target_node=node)) + more_work = set(iter_next_atoms(atom=atom)) except Exception: memory.failures.append(failure.Failure()) else: - next_nodes.update(more_nodes) - if is_runnable() and next_nodes and not memory.failures: - memory.next_nodes.update(next_nodes) + next_up.update(more_work) + if is_runnable() and next_up and not memory.failures: + memory.next_up.update(next_up) return SCHEDULE elif memory.not_done: return WAIT |