summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/runtime.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r--taskflow/engines/action_engine/runtime.py78
1 files changed, 35 insertions, 43 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index d97ba96..6780e93 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -22,12 +22,13 @@ from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
+from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
-from taskflow import flow
+from taskflow import exceptions as exc
+from taskflow.flow import LINK_DECIDER
from taskflow import states as st
-from taskflow import task
from taskflow.utils import misc
@@ -47,7 +48,6 @@ class Runtime(object):
self._storage = storage
self._compilation = compilation
self._atom_cache = {}
- self._atoms_by_kind = {}
def compile(self):
"""Compiles & caches frequently used execution helper objects.
@@ -59,47 +59,47 @@ class Runtime(object):
specific scheduler and so-on).
"""
change_state_handlers = {
- 'task': functools.partial(self.task_action.change_state,
- progress=0.0),
- 'retry': self.retry_action.change_state,
+ com.TASK: functools.partial(self.task_action.change_state,
+ progress=0.0),
+ com.RETRY: self.retry_action.change_state,
}
schedulers = {
- 'retry': self.retry_scheduler,
- 'task': self.task_scheduler,
+ com.RETRY: self.retry_scheduler,
+ com.TASK: self.task_scheduler,
}
- execution_graph = self._compilation.execution_graph
- all_retry_atoms = []
- all_task_atoms = []
- for atom in self.analyzer.iterate_all_nodes():
- metadata = {}
- walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
- if isinstance(atom, task.BaseTask):
- check_transition_handler = st.check_task_transition
- change_state_handler = change_state_handlers['task']
- scheduler = schedulers['task']
- all_task_atoms.append(atom)
+ check_transition_handlers = {
+ com.TASK: st.check_task_transition,
+ com.RETRY: st.check_retry_transition,
+ }
+ graph = self._compilation.execution_graph
+ for node, node_data in graph.nodes_iter(data=True):
+ node_kind = node_data['kind']
+ if node_kind == com.FLOW:
+ continue
+ elif node_kind in com.ATOMS:
+ check_transition_handler = check_transition_handlers[node_kind]
+ change_state_handler = change_state_handlers[node_kind]
+ scheduler = schedulers[node_kind]
else:
- check_transition_handler = st.check_retry_transition
- change_state_handler = change_state_handlers['retry']
- scheduler = schedulers['retry']
- all_retry_atoms.append(atom)
+ raise exc.CompilationFailure("Unknown node kind '%s'"
+ " encountered" % node_kind)
+ metadata = {}
+ walker = sc.ScopeWalker(self.compilation, node, names_only=True)
edge_deciders = {}
- for previous_atom in execution_graph.predecessors(atom):
+ for prev_node in graph.predecessors_iter(node):
# If there is any link function that says if this connection
# is able to run (or should not) ensure we retain it and use
# it later as needed.
- u_v_data = execution_graph.adj[previous_atom][atom]
- u_v_decider = u_v_data.get(flow.LINK_DECIDER)
+ u_v_data = graph.adj[prev_node][node]
+ u_v_decider = u_v_data.get(LINK_DECIDER)
if u_v_decider is not None:
- edge_deciders[previous_atom.name] = u_v_decider
+ edge_deciders[prev_node.name] = u_v_decider
metadata['scope_walker'] = walker
metadata['check_transition_handler'] = check_transition_handler
metadata['change_state_handler'] = change_state_handler
metadata['scheduler'] = scheduler
metadata['edge_deciders'] = edge_deciders
- self._atom_cache[atom.name] = metadata
- self._atoms_by_kind['retry'] = all_retry_atoms
- self._atoms_by_kind['task'] = all_task_atoms
+ self._atom_cache[node.name] = metadata
@property
def compilation(self):
@@ -162,15 +162,6 @@ class Runtime(object):
metadata = self._atom_cache[atom.name]
return metadata['edge_deciders']
- def fetch_atoms_by_kind(self, kind):
- """Fetches all the atoms of a given kind.
-
- NOTE(harlowja): Currently only ``task`` or ``retry`` are valid
- kinds of atoms (requesting other kinds will just
- return empty lists).
- """
- return self._atoms_by_kind.get(kind, [])
-
def fetch_scheduler(self, atom):
"""Fetches the cached specific scheduler for the given atom."""
# This does not check if the name exists (since this is only used
@@ -197,7 +188,7 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
- def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE):
+ def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
for atom in atoms:
@@ -213,7 +204,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
- return self.reset_nodes(self.analyzer.iterate_all_nodes(),
+ return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
@@ -221,8 +212,9 @@ class Runtime(object):
The subgraph is contained of all of the atoms successors.
"""
- return self.reset_nodes(self.analyzer.iterate_subgraph(atom),
- state=state, intention=intention)
+ return self.reset_atoms(
+ self.analyzer.iterate_connected_atoms(atom),
+ state=state, intention=intention)
def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution.