summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/builder.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/builder.py')
-rw-r--r--taskflow/engines/action_engine/builder.py55
1 files changed, 28 insertions, 27 deletions
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 034e64a..cdf3646 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -49,7 +49,7 @@ class MachineMemory(object):
"""State machine memory."""
def __init__(self):
- self.next_nodes = set()
+ self.next_up = set()
self.not_done = set()
self.failures = []
self.done = set()
@@ -115,24 +115,25 @@ class MachineBuilder(object):
# Checks if the storage says the flow is still runnable...
return self._storage.get_flow_state() == st.RUNNING
- def iter_next_nodes(target_node=None, apply_deciders=True):
- # Yields and filters and tweaks the next nodes to execute...
- maybe_nodes = self._analyzer.get_next_nodes(node=target_node)
- for node, late_decider in maybe_nodes:
+ def iter_next_atoms(atom=None, apply_deciders=True):
+ # Yields and filters and tweaks the next atoms to run...
+ maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom)
+ for atom, late_decider in maybe_atoms_it:
if apply_deciders:
proceed = late_decider.check_and_affect(self._runtime)
if proceed:
- yield node
+ yield atom
else:
- yield node
+ yield atom
def resume(old_state, new_state, event):
# This reaction function just updates the state machines memory
# to include any nodes that need to be executed (from a previous
# attempt, which may be empty if never ran before) and any nodes
# that are now ready to be ran.
- memory.next_nodes.update(self._completer.resume())
- memory.next_nodes.update(iter_next_nodes())
+ memory.next_up.update(
+ iter_utils.unique_seen(self._completer.resume(),
+ iter_next_atoms()))
return SCHEDULE
def game_over(old_state, new_state, event):
@@ -142,17 +143,17 @@ class MachineBuilder(object):
# it is *always* called before the final state is entered.
if memory.failures:
return FAILED
- leftover_nodes = iter_utils.count(
+ leftover_atoms = iter_utils.count(
# Avoid activating the deciders, since at this point
# the engine is finishing and there will be no more further
# work done anyway...
- iter_next_nodes(apply_deciders=False))
- if leftover_nodes:
+ iter_next_atoms(apply_deciders=False))
+ if leftover_atoms:
# Ok we didn't finish (either reverting or executing...) so
# that means we must of been stopped at some point...
LOG.blather("Suspension determined to have been reacted to"
- " since (at least) %s nodes have been left in an"
- " unfinished state", leftover_nodes)
+ " since (at least) %s atoms have been left in an"
+ " unfinished state", leftover_atoms)
return SUSPENDED
elif self._analyzer.is_success():
return SUCCESS
@@ -165,13 +166,13 @@ class MachineBuilder(object):
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
- if is_runnable() and memory.next_nodes:
- not_done, failures = do_schedule(memory.next_nodes)
+ if is_runnable() and memory.next_up:
+ not_done, failures = do_schedule(memory.next_up)
if not_done:
memory.not_done.update(not_done)
if failures:
memory.failures.extend(failures)
- memory.next_nodes.intersection_update(not_done)
+ memory.next_up.intersection_update(not_done)
return WAIT
def wait(old_state, new_state, event):
@@ -190,13 +191,13 @@ class MachineBuilder(object):
# out what nodes are now ready to be ran (and then triggering those
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
- next_nodes = set()
+ next_up = set()
while memory.done:
fut = memory.done.pop()
- node = fut.atom
+ atom = fut.atom
try:
event, result = fut.result()
- retain = do_complete(node, event, result)
+ retain = do_complete(atom, event, result)
if isinstance(result, failure.Failure):
if retain:
memory.failures.append(result)
@@ -208,24 +209,24 @@ class MachineBuilder(object):
# is not enabled, which would suck...)
if LOG.isEnabledFor(logging.DEBUG):
intention = self._storage.get_atom_intention(
- node.name)
+ atom.name)
LOG.debug("Discarding failure '%s' (in"
" response to event '%s') under"
" completion units request during"
- " completion of node '%s' (intention"
+ " completion of atom '%s' (intention"
" is to %s)", result, event,
- node, intention)
+ atom, intention)
except Exception:
memory.failures.append(failure.Failure())
else:
try:
- more_nodes = set(iter_next_nodes(target_node=node))
+ more_work = set(iter_next_atoms(atom=atom))
except Exception:
memory.failures.append(failure.Failure())
else:
- next_nodes.update(more_nodes)
- if is_runnable() and next_nodes and not memory.failures:
- memory.next_nodes.update(next_nodes)
+ next_up.update(more_work)
+ if is_runnable() and next_up and not memory.failures:
+ memory.next_up.update(next_up)
return SCHEDULE
elif memory.not_done:
return WAIT