diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-09-04 13:14:25 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2015-10-01 22:38:30 -0700 |
commit | 79d25e69e8300db5debdfd717ffd80f91c246c10 (patch) | |
tree | 134e9764a021dc190a1b158fb27f222d9304908e /taskflow/engines/action_engine/runtime.py | |
parent | ba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff) | |
download | taskflow-1.22.0.tar.gz |
Simplify flow action engine compilation1.22.0
Instead of the added complexity of discarding flow nodes
we can simplify the compilation process by just retaining
them and jumping over them in further iteration and graph
and tree runtime usage.
This change moves toward a model that does just this, which
makes it also easier to in the future use the newly added
flow graph nodes to do meaningful things (like use them as
a point to change which flow_detail is used).
Change-Id: Icb1695f4b995a0392f940837514774768f222db4
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r-- | taskflow/engines/action_engine/runtime.py | 78 |
1 files changed, 35 insertions, 43 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index d97ba96..6780e93 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -22,12 +22,13 @@ from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an from taskflow.engines.action_engine import builder as bu +from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc -from taskflow import flow +from taskflow import exceptions as exc +from taskflow.flow import LINK_DECIDER from taskflow import states as st -from taskflow import task from taskflow.utils import misc @@ -47,7 +48,6 @@ class Runtime(object): self._storage = storage self._compilation = compilation self._atom_cache = {} - self._atoms_by_kind = {} def compile(self): """Compiles & caches frequently used execution helper objects. @@ -59,47 +59,47 @@ class Runtime(object): specific scheduler and so-on). """ change_state_handlers = { - 'task': functools.partial(self.task_action.change_state, - progress=0.0), - 'retry': self.retry_action.change_state, + com.TASK: functools.partial(self.task_action.change_state, + progress=0.0), + com.RETRY: self.retry_action.change_state, } schedulers = { - 'retry': self.retry_scheduler, - 'task': self.task_scheduler, + com.RETRY: self.retry_scheduler, + com.TASK: self.task_scheduler, } - execution_graph = self._compilation.execution_graph - all_retry_atoms = [] - all_task_atoms = [] - for atom in self.analyzer.iterate_all_nodes(): - metadata = {} - walker = sc.ScopeWalker(self.compilation, atom, names_only=True) - if isinstance(atom, task.BaseTask): - check_transition_handler = st.check_task_transition - change_state_handler = change_state_handlers['task'] - scheduler = schedulers['task'] - all_task_atoms.append(atom) + check_transition_handlers = { + com.TASK: st.check_task_transition, + com.RETRY: st.check_retry_transition, + } + graph = self._compilation.execution_graph + for node, node_data in graph.nodes_iter(data=True): + node_kind = node_data['kind'] + if node_kind == com.FLOW: + continue + elif node_kind in com.ATOMS: + check_transition_handler = check_transition_handlers[node_kind] + change_state_handler = change_state_handlers[node_kind] + scheduler = schedulers[node_kind] else: - check_transition_handler = st.check_retry_transition - change_state_handler = change_state_handlers['retry'] - scheduler = schedulers['retry'] - all_retry_atoms.append(atom) + raise exc.CompilationFailure("Unknown node kind '%s'" + " encountered" % node_kind) + metadata = {} + walker = sc.ScopeWalker(self.compilation, node, names_only=True) edge_deciders = {} - for previous_atom in execution_graph.predecessors(atom): + for prev_node in graph.predecessors_iter(node): # If there is any link function that says if this connection # is able to run (or should not) ensure we retain it and use # it later as needed. - u_v_data = execution_graph.adj[previous_atom][atom] - u_v_decider = u_v_data.get(flow.LINK_DECIDER) + u_v_data = graph.adj[prev_node][node] + u_v_decider = u_v_data.get(LINK_DECIDER) if u_v_decider is not None: - edge_deciders[previous_atom.name] = u_v_decider + edge_deciders[prev_node.name] = u_v_decider metadata['scope_walker'] = walker metadata['check_transition_handler'] = check_transition_handler metadata['change_state_handler'] = change_state_handler metadata['scheduler'] = scheduler metadata['edge_deciders'] = edge_deciders - self._atom_cache[atom.name] = metadata - self._atoms_by_kind['retry'] = all_retry_atoms - self._atoms_by_kind['task'] = all_task_atoms + self._atom_cache[node.name] = metadata @property def compilation(self): @@ -162,15 +162,6 @@ class Runtime(object): metadata = self._atom_cache[atom.name] return metadata['edge_deciders'] - def fetch_atoms_by_kind(self, kind): - """Fetches all the atoms of a given kind. - - NOTE(harlowja): Currently only ``task`` or ``retry`` are valid - kinds of atoms (requesting other kinds will just - return empty lists). - """ - return self._atoms_by_kind.get(kind, []) - def fetch_scheduler(self, atom): """Fetches the cached specific scheduler for the given atom.""" # This does not check if the name exists (since this is only used @@ -197,7 +188,7 @@ class Runtime(object): # Various helper methods used by the runtime components; not for public # consumption... - def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE): + def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE): """Resets all the provided atoms to the given state and intention.""" tweaked = [] for atom in atoms: @@ -213,7 +204,7 @@ class Runtime(object): def reset_all(self, state=st.PENDING, intention=st.EXECUTE): """Resets all atoms to the given state and intention.""" - return self.reset_nodes(self.analyzer.iterate_all_nodes(), + return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS), state=state, intention=intention) def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): @@ -221,8 +212,9 @@ class Runtime(object): The subgraph is contained of all of the atoms successors. """ - return self.reset_nodes(self.analyzer.iterate_subgraph(atom), - state=state, intention=intention) + return self.reset_atoms( + self.analyzer.iterate_connected_atoms(atom), + state=state, intention=intention) def retry_subflow(self, retry): """Prepares a retrys + its subgraph for execution. |