diff options
Diffstat (limited to 'taskflow/engines/action_engine/runtime.py')
-rw-r--r-- | taskflow/engines/action_engine/runtime.py | 78 |
1 files changed, 35 insertions, 43 deletions
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index d97ba96..6780e93 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -22,12 +22,13 @@ from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta from taskflow.engines.action_engine import analyzer as an from taskflow.engines.action_engine import builder as bu +from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc -from taskflow import flow +from taskflow import exceptions as exc +from taskflow.flow import LINK_DECIDER from taskflow import states as st -from taskflow import task from taskflow.utils import misc @@ -47,7 +48,6 @@ class Runtime(object): self._storage = storage self._compilation = compilation self._atom_cache = {} - self._atoms_by_kind = {} def compile(self): """Compiles & caches frequently used execution helper objects. @@ -59,47 +59,47 @@ class Runtime(object): specific scheduler and so-on). """ change_state_handlers = { - 'task': functools.partial(self.task_action.change_state, - progress=0.0), - 'retry': self.retry_action.change_state, + com.TASK: functools.partial(self.task_action.change_state, + progress=0.0), + com.RETRY: self.retry_action.change_state, } schedulers = { - 'retry': self.retry_scheduler, - 'task': self.task_scheduler, + com.RETRY: self.retry_scheduler, + com.TASK: self.task_scheduler, } - execution_graph = self._compilation.execution_graph - all_retry_atoms = [] - all_task_atoms = [] - for atom in self.analyzer.iterate_all_nodes(): - metadata = {} - walker = sc.ScopeWalker(self.compilation, atom, names_only=True) - if isinstance(atom, task.BaseTask): - check_transition_handler = st.check_task_transition - change_state_handler = change_state_handlers['task'] - scheduler = schedulers['task'] - all_task_atoms.append(atom) + check_transition_handlers = { + com.TASK: st.check_task_transition, + com.RETRY: st.check_retry_transition, + } + graph = self._compilation.execution_graph + for node, node_data in graph.nodes_iter(data=True): + node_kind = node_data['kind'] + if node_kind == com.FLOW: + continue + elif node_kind in com.ATOMS: + check_transition_handler = check_transition_handlers[node_kind] + change_state_handler = change_state_handlers[node_kind] + scheduler = schedulers[node_kind] else: - check_transition_handler = st.check_retry_transition - change_state_handler = change_state_handlers['retry'] - scheduler = schedulers['retry'] - all_retry_atoms.append(atom) + raise exc.CompilationFailure("Unknown node kind '%s'" + " encountered" % node_kind) + metadata = {} + walker = sc.ScopeWalker(self.compilation, node, names_only=True) edge_deciders = {} - for previous_atom in execution_graph.predecessors(atom): + for prev_node in graph.predecessors_iter(node): # If there is any link function that says if this connection # is able to run (or should not) ensure we retain it and use # it later as needed. - u_v_data = execution_graph.adj[previous_atom][atom] - u_v_decider = u_v_data.get(flow.LINK_DECIDER) + u_v_data = graph.adj[prev_node][node] + u_v_decider = u_v_data.get(LINK_DECIDER) if u_v_decider is not None: - edge_deciders[previous_atom.name] = u_v_decider + edge_deciders[prev_node.name] = u_v_decider metadata['scope_walker'] = walker metadata['check_transition_handler'] = check_transition_handler metadata['change_state_handler'] = change_state_handler metadata['scheduler'] = scheduler metadata['edge_deciders'] = edge_deciders - self._atom_cache[atom.name] = metadata - self._atoms_by_kind['retry'] = all_retry_atoms - self._atoms_by_kind['task'] = all_task_atoms + self._atom_cache[node.name] = metadata @property def compilation(self): @@ -162,15 +162,6 @@ class Runtime(object): metadata = self._atom_cache[atom.name] return metadata['edge_deciders'] - def fetch_atoms_by_kind(self, kind): - """Fetches all the atoms of a given kind. - - NOTE(harlowja): Currently only ``task`` or ``retry`` are valid - kinds of atoms (requesting other kinds will just - return empty lists). - """ - return self._atoms_by_kind.get(kind, []) - def fetch_scheduler(self, atom): """Fetches the cached specific scheduler for the given atom.""" # This does not check if the name exists (since this is only used @@ -197,7 +188,7 @@ class Runtime(object): # Various helper methods used by the runtime components; not for public # consumption... - def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE): + def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE): """Resets all the provided atoms to the given state and intention.""" tweaked = [] for atom in atoms: @@ -213,7 +204,7 @@ class Runtime(object): def reset_all(self, state=st.PENDING, intention=st.EXECUTE): """Resets all atoms to the given state and intention.""" - return self.reset_nodes(self.analyzer.iterate_all_nodes(), + return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS), state=state, intention=intention) def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): @@ -221,8 +212,9 @@ class Runtime(object): The subgraph is contained of all of the atoms successors. """ - return self.reset_nodes(self.analyzer.iterate_subgraph(atom), - state=state, intention=intention) + return self.reset_atoms( + self.analyzer.iterate_connected_atoms(atom), + state=state, intention=intention) def retry_subflow(self, retry): """Prepares a retrys + its subgraph for execution. |