diff options
Diffstat (limited to 'taskflow/engines/action_engine/engine.py')
-rw-r--r-- | taskflow/engines/action_engine/engine.py | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index cc6b1ac..74e150c 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -241,11 +241,10 @@ class ActionEngine(base.Engine): transient = strutils.bool_from_string( self._options.get('inject_transient', True)) self.storage.ensure_atoms( - self._compilation.execution_graph.nodes_iter()) - for node in self._compilation.execution_graph.nodes_iter(): - if node.inject: - self.storage.inject_atom_args(node.name, - node.inject, + self._runtime.analyzer.iterate_nodes(compiler.ATOMS)) + for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS): + if atom.inject: + self.storage.inject_atom_args(atom.name, atom.inject, transient=transient) @fasteners.locked @@ -255,8 +254,8 @@ class ActionEngine(base.Engine): # flow/task provided or storage provided, if there are still missing # dependencies then this flow will fail at runtime (which we can avoid # by failing at validation time). - execution_graph = self._compilation.execution_graph if LOG.isEnabledFor(logging.BLATHER): + execution_graph = self._compilation.execution_graph LOG.blather("Validating scoping and argument visibility for" " execution graph with %s nodes and %s edges with" " density %0.3f", execution_graph.number_of_nodes(), @@ -269,18 +268,17 @@ class ActionEngine(base.Engine): last_cause = None last_node = None missing_nodes = 0 - fetch_func = self.storage.fetch_unsatisfied_args - for node in execution_graph.nodes_iter(): - node_missing = fetch_func(node.name, node.rebind, - optional_args=node.optional) - if node_missing: - cause = exc.MissingDependencies(node, - sorted(node_missing), + for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS): + atom_missing = self.storage.fetch_unsatisfied_args( + atom.name, atom.rebind, optional_args=atom.optional) + if atom_missing: + cause = exc.MissingDependencies(atom, + sorted(atom_missing), cause=last_cause) last_cause = cause - last_node = node + last_node = atom missing_nodes += 1 - missing.update(node_missing) + missing.update(atom_missing) if missing: # For when a task is provided (instead of a flow) and that # task is the only item in the graph and its missing deps, avoid |