summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/runtime.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-09-04 13:14:25 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-10-01 22:38:30 -0700
commit79d25e69e8300db5debdfd717ffd80f91c246c10 (patch)
tree134e9764a021dc190a1b158fb27f222d9304908e /taskflow/engines/action_engine/runtime.py
parentba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff)
downloadtaskflow-1.22.0.tar.gz
Simplify flow action engine compilation1.22.0
Instead of the added complexity of discarding flow nodes we can simplify the compilation process by just retaining them and jumping over them in further iteration and graph and tree runtime usage. This change moves toward a model that does just this, which makes it also easier to in the future use the newly added flow graph nodes to do meaningful things (like use them as a point to change which flow_detail is used). Change-Id: Icb1695f4b995a0392f940837514774768f222db4
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r--taskflow/engines/action_engine/runtime.py78
1 files changed, 35 insertions, 43 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index d97ba96..6780e93 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -22,12 +22,13 @@ from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
+from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
-from taskflow import flow
+from taskflow import exceptions as exc
+from taskflow.flow import LINK_DECIDER
from taskflow import states as st
-from taskflow import task
from taskflow.utils import misc
@@ -47,7 +48,6 @@ class Runtime(object):
self._storage = storage
self._compilation = compilation
self._atom_cache = {}
- self._atoms_by_kind = {}
def compile(self):
"""Compiles & caches frequently used execution helper objects.
@@ -59,47 +59,47 @@ class Runtime(object):
specific scheduler and so-on).
"""
change_state_handlers = {
- 'task': functools.partial(self.task_action.change_state,
- progress=0.0),
- 'retry': self.retry_action.change_state,
+ com.TASK: functools.partial(self.task_action.change_state,
+ progress=0.0),
+ com.RETRY: self.retry_action.change_state,
}
schedulers = {
- 'retry': self.retry_scheduler,
- 'task': self.task_scheduler,
+ com.RETRY: self.retry_scheduler,
+ com.TASK: self.task_scheduler,
}
- execution_graph = self._compilation.execution_graph
- all_retry_atoms = []
- all_task_atoms = []
- for atom in self.analyzer.iterate_all_nodes():
- metadata = {}
- walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
- if isinstance(atom, task.BaseTask):
- check_transition_handler = st.check_task_transition
- change_state_handler = change_state_handlers['task']
- scheduler = schedulers['task']
- all_task_atoms.append(atom)
+ check_transition_handlers = {
+ com.TASK: st.check_task_transition,
+ com.RETRY: st.check_retry_transition,
+ }
+ graph = self._compilation.execution_graph
+ for node, node_data in graph.nodes_iter(data=True):
+ node_kind = node_data['kind']
+ if node_kind == com.FLOW:
+ continue
+ elif node_kind in com.ATOMS:
+ check_transition_handler = check_transition_handlers[node_kind]
+ change_state_handler = change_state_handlers[node_kind]
+ scheduler = schedulers[node_kind]
else:
- check_transition_handler = st.check_retry_transition
- change_state_handler = change_state_handlers['retry']
- scheduler = schedulers['retry']
- all_retry_atoms.append(atom)
+ raise exc.CompilationFailure("Unknown node kind '%s'"
+ " encountered" % node_kind)
+ metadata = {}
+ walker = sc.ScopeWalker(self.compilation, node, names_only=True)
edge_deciders = {}
- for previous_atom in execution_graph.predecessors(atom):
+ for prev_node in graph.predecessors_iter(node):
# If there is any link function that says if this connection
# is able to run (or should not) ensure we retain it and use
# it later as needed.
- u_v_data = execution_graph.adj[previous_atom][atom]
- u_v_decider = u_v_data.get(flow.LINK_DECIDER)
+ u_v_data = graph.adj[prev_node][node]
+ u_v_decider = u_v_data.get(LINK_DECIDER)
if u_v_decider is not None:
- edge_deciders[previous_atom.name] = u_v_decider
+ edge_deciders[prev_node.name] = u_v_decider
metadata['scope_walker'] = walker
metadata['check_transition_handler'] = check_transition_handler
metadata['change_state_handler'] = change_state_handler
metadata['scheduler'] = scheduler
metadata['edge_deciders'] = edge_deciders
- self._atom_cache[atom.name] = metadata
- self._atoms_by_kind['retry'] = all_retry_atoms
- self._atoms_by_kind['task'] = all_task_atoms
+ self._atom_cache[node.name] = metadata
@property
def compilation(self):
@@ -162,15 +162,6 @@ class Runtime(object):
metadata = self._atom_cache[atom.name]
return metadata['edge_deciders']
- def fetch_atoms_by_kind(self, kind):
- """Fetches all the atoms of a given kind.
-
- NOTE(harlowja): Currently only ``task`` or ``retry`` are valid
- kinds of atoms (requesting other kinds will just
- return empty lists).
- """
- return self._atoms_by_kind.get(kind, [])
-
def fetch_scheduler(self, atom):
"""Fetches the cached specific scheduler for the given atom."""
# This does not check if the name exists (since this is only used
@@ -197,7 +188,7 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
- def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE):
+ def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
for atom in atoms:
@@ -213,7 +204,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
- return self.reset_nodes(self.analyzer.iterate_all_nodes(),
+ return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
@@ -221,8 +212,9 @@ class Runtime(object):
The subgraph is contained of all of the atoms successors.
"""
- return self.reset_nodes(self.analyzer.iterate_subgraph(atom),
- state=state, intention=intention)
+ return self.reset_atoms(
+ self.analyzer.iterate_connected_atoms(atom),
+ state=state, intention=intention)
def retry_subflow(self, retry):
"""Prepares a retrys + its subgraph for execution.