summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/action_engine/engine.py')
-rw-r--r--taskflow/engines/action_engine/engine.py28
1 files changed, 13 insertions, 15 deletions
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index cc6b1ac..74e150c 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -241,11 +241,10 @@ class ActionEngine(base.Engine):
transient = strutils.bool_from_string(
self._options.get('inject_transient', True))
self.storage.ensure_atoms(
- self._compilation.execution_graph.nodes_iter())
- for node in self._compilation.execution_graph.nodes_iter():
- if node.inject:
- self.storage.inject_atom_args(node.name,
- node.inject,
+ self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
+ for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ if atom.inject:
+ self.storage.inject_atom_args(atom.name, atom.inject,
transient=transient)
@fasteners.locked
@@ -255,8 +254,8 @@ class ActionEngine(base.Engine):
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at validation time).
- execution_graph = self._compilation.execution_graph
if LOG.isEnabledFor(logging.BLATHER):
+ execution_graph = self._compilation.execution_graph
LOG.blather("Validating scoping and argument visibility for"
" execution graph with %s nodes and %s edges with"
" density %0.3f", execution_graph.number_of_nodes(),
@@ -269,18 +268,17 @@ class ActionEngine(base.Engine):
last_cause = None
last_node = None
missing_nodes = 0
- fetch_func = self.storage.fetch_unsatisfied_args
- for node in execution_graph.nodes_iter():
- node_missing = fetch_func(node.name, node.rebind,
- optional_args=node.optional)
- if node_missing:
- cause = exc.MissingDependencies(node,
- sorted(node_missing),
+ for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ atom_missing = self.storage.fetch_unsatisfied_args(
+ atom.name, atom.rebind, optional_args=atom.optional)
+ if atom_missing:
+ cause = exc.MissingDependencies(atom,
+ sorted(atom_missing),
cause=last_cause)
last_cause = cause
- last_node = node
+ last_node = atom
missing_nodes += 1
- missing.update(node_missing)
+ missing.update(atom_missing)
if missing:
# For when a task is provided (instead of a flow) and that
# task is the only item in the graph and its missing deps, avoid