diff options
Diffstat (limited to 'taskflow/engines/action_engine/compiler.py')
-rw-r--r-- | taskflow/engines/action_engine/compiler.py | 261 |
1 files changed, 85 insertions, 176 deletions
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 50ce4eb..0d3e288 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -14,10 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import threading import fasteners +import six from taskflow import exceptions as exc from taskflow import flow @@ -28,18 +28,35 @@ from taskflow.types import tree as tr from taskflow.utils import iter_utils from taskflow.utils import misc +from taskflow.flow import (LINK_INVARIANT, LINK_RETRY) # noqa + LOG = logging.getLogger(__name__) -_RETRY_EDGE_DATA = { - flow.LINK_RETRY: True, -} -_EDGE_INVARIANTS = (flow.LINK_INVARIANT, flow.LINK_MANUAL, flow.LINK_RETRY) -_EDGE_REASONS = flow.LINK_REASONS +# Constants attached to node attributes in the execution graph (and tree +# node metadata), provided as constants here and constants in the compilation +# class (so that users will not have to import this file to access them); but +# provide them as module constants so that internal code can more +# easily access them... +TASK = 'task' +RETRY = 'retry' +FLOW = 'flow' + +# Quite often used together, so make a tuple everyone can share... +ATOMS = (TASK, RETRY) class Compilation(object): """The result of a compilers compile() is this *immutable* object.""" + #: Task nodes will have a ``kind`` attribute/metadata key with this value. + TASK = TASK + + #: Retry nodes will have a ``kind`` attribute/metadata key with this value. + RETRY = RETRY + + #: Flow nodes will have a ``kind`` attribute/metadata key with this value. + FLOW = FLOW + def __init__(self, execution_graph, hierarchy): self._execution_graph = execution_graph self._hierarchy = hierarchy @@ -55,6 +72,12 @@ class Compilation(object): return self._hierarchy +def _overlap_occurence_detector(to_graph, from_graph): + """Returns how many nodes in 'from' graph are in 'to' graph (if any).""" + return iter_utils.count(node for node in from_graph.nodes_iter() + if node in to_graph) + + def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None): """Adds/updates edges from nodes to other nodes in the specified graph. @@ -79,118 +102,7 @@ def _add_update_edges(graph, nodes_from, nodes_to, attr_dict=None): graph.add_edge(u, v, attr_dict=attr_dict.copy()) -class Linker(object): - """Compiler helper that adds pattern(s) constraints onto a graph.""" - - @staticmethod - def _is_not_empty(graph): - # Returns true if the given graph is *not* empty... - return graph.number_of_nodes() > 0 - - @staticmethod - def _find_first_decomposed(node, priors, - decomposed_members, decomposed_filter): - # How this works; traverse backwards and find only the predecessor - # items that are actually connected to this entity, and avoid any - # linkage that is not directly connected. This is guaranteed to be - # valid since we always iter_links() over predecessors before - # successors in all currently known patterns; a queue is used here - # since it is possible for a node to have 2+ different predecessors so - # we must search back through all of them in a reverse BFS order... - # - # Returns the first decomposed graph of those nodes (including the - # passed in node) that passes the provided filter - # function (returns none if none match). - frontier = collections.deque([node]) - # NOTE(harowja): None is in this initial set since the first prior in - # the priors list has None as its predecessor (which we don't want to - # look for a decomposed member of). - visited = set([None]) - while frontier: - node = frontier.popleft() - if node in visited: - continue - node_graph = decomposed_members[node] - if decomposed_filter(node_graph): - return node_graph - visited.add(node) - # TODO(harlowja): optimize this more to avoid searching through - # things already searched... - for (u, v) in reversed(priors): - if node == v: - # Queue its predecessor to be searched in the future... - frontier.append(u) - else: - return None - - def apply_constraints(self, graph, flow, decomposed_members): - # This list is used to track the links that have been previously - # iterated over, so that when we are trying to find a entry to - # connect to that we iterate backwards through this list, finding - # connected nodes to the current target (lets call it v) and find - # the first (u_n, or u_n - 1, u_n - 2...) that was decomposed into - # a non-empty graph. We also retain all predecessors of v so that we - # can correctly locate u_n - 1 if u_n turns out to have decomposed into - # an empty graph (and so on). - priors = [] - # NOTE(harlowja): u, v are flows/tasks (also graph terminology since - # we are compiling things down into a flattened graph), the meaning - # of this link iteration via iter_links() is that u -> v (with the - # provided dictionary attributes, if any). - for (u, v, attr_dict) in flow.iter_links(): - if not priors: - priors.append((None, u)) - v_g = decomposed_members[v] - if not v_g.number_of_nodes(): - priors.append((u, v)) - continue - invariant = any(attr_dict.get(k) for k in _EDGE_INVARIANTS) - if not invariant: - # This is a symbol *only* dependency, connect - # corresponding providers and consumers to allow the consumer - # to be executed immediately after the provider finishes (this - # is an optimization for these types of dependencies...) - u_g = decomposed_members[u] - if not u_g.number_of_nodes(): - # This must always exist, but incase it somehow doesn't... - raise exc.CompilationFailure( - "Non-invariant link being created from '%s' ->" - " '%s' even though the target '%s' was found to be" - " decomposed into an empty graph" % (v, u, u)) - for u in u_g.nodes_iter(): - for v in v_g.nodes_iter(): - # This is using the intersection() method vs the & - # operator since the latter doesn't work with frozen - # sets (when used in combination with ordered sets). - # - # If this is not done the following happens... - # - # TypeError: unsupported operand type(s) - # for &: 'frozenset' and 'OrderedSet' - depends_on = u.provides.intersection(v.requires) - if depends_on: - edge_attrs = { - _EDGE_REASONS: frozenset(depends_on), - } - _add_update_edges(graph, - [u], [v], - attr_dict=edge_attrs) - else: - # Connect nodes with no predecessors in v to nodes with no - # successors in the *first* non-empty predecessor of v (thus - # maintaining the edge dependency). - match = self._find_first_decomposed(u, priors, - decomposed_members, - self._is_not_empty) - if match is not None: - _add_update_edges(graph, - match.no_successors_iter(), - list(v_g.no_predecessors_iter()), - attr_dict=attr_dict) - priors.append((u, v)) - - -class _TaskCompiler(object): +class TaskCompiler(object): """Non-recursive compiler of tasks.""" @staticmethod @@ -199,71 +111,67 @@ class _TaskCompiler(object): def compile(self, task, parent=None): graph = gr.DiGraph(name=task.name) - graph.add_node(task) - node = tr.Node(task) + graph.add_node(task, kind=TASK) + node = tr.Node(task, kind=TASK) if parent is not None: parent.add(node) return graph, node -class _FlowCompiler(object): +class FlowCompiler(object): """Recursive compiler of flows.""" @staticmethod def handles(obj): return isinstance(obj, flow.Flow) - def __init__(self, deep_compiler_func, linker): + def __init__(self, deep_compiler_func): self._deep_compiler_func = deep_compiler_func - self._linker = linker - def _connect_retry(self, retry, graph): - graph.add_node(retry) - - # All nodes that have no predecessors should depend on this retry. - nodes_to = [n for n in graph.no_predecessors_iter() if n is not retry] - if nodes_to: - _add_update_edges(graph, [retry], nodes_to, - attr_dict=_RETRY_EDGE_DATA) - - # Add association for each node of graph that has no existing retry. - for n in graph.nodes_iter(): - if n is not retry and flow.LINK_RETRY not in graph.node[n]: - graph.node[n][flow.LINK_RETRY] = retry - - @staticmethod - def _occurence_detector(to_graph, from_graph): - return iter_utils.count(node for node in from_graph.nodes_iter() - if node in to_graph) - - def _decompose_flow(self, flow, parent=None): - """Decomposes a flow into a graph, tree node + decomposed subgraphs.""" + def compile(self, flow, parent=None): + """Decomposes a flow into a graph and scope tree hierarchy.""" graph = gr.DiGraph(name=flow.name) - node = tr.Node(flow) + graph.add_node(flow, kind=FLOW, noop=True) + tree_node = tr.Node(flow, kind=FLOW, noop=True) if parent is not None: - parent.add(node) + parent.add(tree_node) if flow.retry is not None: - node.add(tr.Node(flow.retry)) - decomposed_members = {} - for item in flow: - subgraph, _subnode = self._deep_compiler_func(item, parent=node) - decomposed_members[item] = subgraph - if subgraph.number_of_nodes(): - graph = gr.merge_graphs( - graph, subgraph, - # We can specialize this to be simpler than the default - # algorithm which creates overhead that we don't - # need for our purposes... - overlap_detector=self._occurence_detector) - return graph, node, decomposed_members - - def compile(self, flow, parent=None): - graph, node, decomposed_members = self._decompose_flow(flow, - parent=parent) - self._linker.apply_constraints(graph, flow, decomposed_members) + tree_node.add(tr.Node(flow.retry, kind=RETRY)) + decomposed = dict( + (child, self._deep_compiler_func(child, parent=tree_node)[0]) + for child in flow) + decomposed_graphs = list(six.itervalues(decomposed)) + graph = gr.merge_graphs(graph, *decomposed_graphs, + overlap_detector=_overlap_occurence_detector) + for u, v, attr_dict in flow.iter_links(): + u_graph = decomposed[u] + v_graph = decomposed[v] + _add_update_edges(graph, u_graph.no_successors_iter(), + list(v_graph.no_predecessors_iter()), + attr_dict=attr_dict) if flow.retry is not None: - self._connect_retry(flow.retry, graph) - return graph, node + graph.add_node(flow.retry, kind=RETRY) + _add_update_edges(graph, [flow], [flow.retry], + attr_dict={LINK_INVARIANT: True}) + for node in graph.nodes_iter(): + if node is not flow.retry and node is not flow: + graph.node[node].setdefault(RETRY, flow.retry) + from_nodes = [flow.retry] + connected_attr_dict = {LINK_INVARIANT: True, LINK_RETRY: True} + else: + from_nodes = [flow] + connected_attr_dict = {LINK_INVARIANT: True} + connected_to = [ + node for node in graph.no_predecessors_iter() if node is not flow + ] + if connected_to: + # Ensure all nodes in this graph(s) that have no + # predecessors depend on this flow (or this flow's retry) so that + # we can depend on the flow being traversed before its + # children (even though at the current time it will be skipped). + _add_update_edges(graph, from_nodes, connected_to, + attr_dict=connected_attr_dict) + return graph, tree_node class PatternCompiler(object): @@ -288,8 +196,8 @@ class PatternCompiler(object): the recursion (now with a decomposed mapping from contained patterns or atoms to there corresponding subgraph) we have to then connect the subgraphs (and the atom(s) there-in) that were decomposed for a pattern - correctly into a new graph (using a :py:class:`.Linker` object to ensure - the pattern mandated constraints are retained) and then return to the + correctly into a new graph and then ensure the pattern mandated + constraints are retained. Finally we then return to the caller (and they will do the same thing up until the root node, which by that point one graph is created with all contained atoms in the pattern/nested patterns mandated ordering). @@ -364,14 +272,10 @@ class PatternCompiler(object): def __init__(self, root, freeze=True): self._root = root self._history = set() - self._linker = Linker() self._freeze = freeze self._lock = threading.Lock() self._compilation = None - self._matchers = [ - _FlowCompiler(self._compile, self._linker), - _TaskCompiler(), - ] + self._matchers = (FlowCompiler(self._compile), TaskCompiler()) self._level = 0 def _compile(self, item, parent=None): @@ -418,12 +322,17 @@ class PatternCompiler(object): def _post_compile(self, graph, node): """Called after the compilation of the root finishes successfully.""" - dup_names = misc.get_duplicate_keys(graph.nodes_iter(), - key=lambda node: node.name) + dup_names = misc.get_duplicate_keys( + (node for node, node_attrs in graph.nodes_iter(data=True) + if node_attrs['kind'] in ATOMS), + key=lambda node: node.name) if dup_names: raise exc.Duplicate( "Atoms with duplicate names found: %s" % (sorted(dup_names))) - if graph.number_of_nodes() == 0: + atoms = iter_utils.count( + node for node, node_attrs in graph.nodes_iter(data=True) + if node_attrs['kind'] in ATOMS) + if atoms == 0: raise exc.Empty("Root container '%s' (%s) is empty" % (self._root, type(self._root))) self._history.clear() |