diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-05-03 15:14:38 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-05-08 03:59:38 +0900 |
commit | 5bdc0a79d1fb67d2da552a902163dec450ff292c (patch) | |
tree | ee63ed73e90dbc9aba1d08971e54016bb7839708 /buildstream/_pipeline.py | |
parent | b8e15706a51272e4f4e116d9e373fd2581102868 (diff) | |
download | buildstream-5bdc0a79d1fb67d2da552a902163dec450ff292c.tar.gz |
_stream.py, _pipeline.py: Refactoring of the pipeline itself
Here the pipeline becomes essentially stateless, some dangling
state remains to be factored out because of frontend accesses
which will be changed in a later commit.
Essentially, the Pipeline.load() method no longer has any knowledge
of the specific purposes of the loaded targets, and now takes
a list of target groups and returns a corresponding list of element
groups.
The Stream() business logic methods now use other pipeline helper
methods to create and filter lists from the loaded target elements.
The Stream() also finally absorbs the Scheduler frontend facing
APIs. However Queues are still exposed on the Stream object for
logging purposes and through callbacks such that the frontend can
retry elements.
Diffstat (limited to 'buildstream/_pipeline.py')
-rw-r--r-- | buildstream/_pipeline.py | 369 |
1 files changed, 181 insertions, 188 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 5327d3fb6..8861556c9 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -18,13 +18,16 @@ # Authors: # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> +# Tristan Maat <tristan.maat@codethink.co.uk> +import os import itertools from operator import itemgetter from ._exceptions import PipelineError from ._message import Message, MessageType from ._loader import Loader +from ._profile import Topics, profile_start, profile_end from .element import Element from . import Scope, Consistency from ._project import ProjectRefStorage @@ -60,60 +63,59 @@ class PipelineSelection(): # Pipeline() # # Args: -# context (Context): The Context object # project (Project): The Project object -# target (str): A bst filename relative to the project directory -# inconsistent (bool): Whether to load the pipeline in a forcefully inconsistent state, -# this is appropriate when source tracking will run and the -# current source refs will not be the effective refs. -# rewritable (bool): Whether the loaded files should be rewritable -# this is a bit more expensive due to deep copies -# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the -# loading process, if they are not yet locally cached -# -# The ticker methods will be called with an element name for each tick, a final -# tick with None as the argument is passed to signal that processing of this -# stage has terminated. -# -# Raises: -# LoadError -# PluginError -# SourceError -# ElementError -# ProgramNotFoundError +# context (Context): The Context object +# artifacts (Context): The ArtifactCache object # class Pipeline(): - def __init__(self, context, project, artifacts, targets, except_, *, - rewritable=False, - fetch_subprojects=True): + def __init__(self, context, project, artifacts): - self.context = context # The Context - self.project = project # The toplevel project - self.targets = [] # List of toplevel target Element objects + self._context = context # The Context + self._project = project # The toplevel project # # Private members # self._artifacts = artifacts self._loader = None - self._exceptions = None - self._track_cross_junctions = False - self._track_elements = [] - # - # Early initialization - # + # load() + # + # Loads elements from target names. + # + # This function is called with a list of lists, such that multiple + # target groups may be specified. Element names specified in `targets` + # are allowed to be redundant. + # + # Args: + # target_groups (list of lists): Groups of toplevel targets to load + # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the + # loading process, if they are not yet locally cached + # rewritable (bool): Whether the loaded files should be rewritable + # this is a bit more expensive due to deep copies + # + # Returns: + # (tuple of lists): A tuple of grouped Element objects corresponding to target_groups + # + def load(self, target_groups, *, + fetch_subprojects=True, + rewritable=False): + + # First concatenate all the lists for the loader's sake + targets = list(itertools.chain(*target_groups)) - self._loader = Loader(self.context, self.project, targets + except_, + profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets)) + + self._loader = Loader(self._context, self._project, targets, fetch_subprojects=fetch_subprojects) - with self.context.timed_activity("Loading pipeline", silent_nested=True): + with self._context.timed_activity("Loading pipeline", silent_nested=True): meta_elements = self._loader.load(rewritable, None) # Resolve the real elements now that we've loaded the project - with self.context.timed_activity("Resolving pipeline"): - resolved_elements = [ + with self._context.timed_activity("Resolving pipeline"): + elements = [ Element._new_from_meta(meta, self._artifacts) for meta in meta_elements ] @@ -130,61 +132,89 @@ class Pipeline(): detail += "\n".join(lines) self._message(MessageType.WARN, "Ignoring redundant source references", detail=detail) - self.targets = resolved_elements[:len(targets)] - self._exceptions = resolved_elements[len(targets):] + # Now create element groups to match the input target groups + elt_iter = iter(elements) + element_groups = [ + [next(elt_iter) for i in range(len(group))] + for group in target_groups + ] + + profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets)) + + return tuple(element_groups) - # initialize() + # resolve_elements() # - # Initialize the pipeline + # Resolve element state and cache keys. # # Args: - # track_element (list of Elements): List of elements specified by the frontend for tracking - # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries - # track_selection (PipelineSelection): The selection algorithm for track elements + # targets (list of Element): The list of toplevel element targets # - def initialize(self, - track_elements=None, - track_cross_junctions=False, - track_selection=PipelineSelection.ALL): + def resolve_elements(self, targets): + with self._context.timed_activity("Resolving cached state", silent_nested=True): + for element in self.dependencies(targets, Scope.ALL): - # Preflight directly, before ever interrogating caches or anything. - self._preflight() + # Preflight + element._preflight() - # Work out what we're going track, if anything - self._track_cross_junctions = track_cross_junctions - if track_elements: - self._track_elements = self._get_elements_to_track(track_elements, track_selection) - - # Now resolve the cache keys once tracking elements have been resolved - self._resolve_cache_keys() + # Determine initial element state. + element._update_state() - # cleanup() + # dependencies() # - # Cleans up resources used by the Pipeline. + # Generator function to iterate over elements and optionally + # also iterate over sources. # - def cleanup(self): - if self._loader: - self._loader.cleanup() + # Args: + # targets (list of Element): The target Elements to loop over + # scope (Scope): The scope to iterate over + # recurse (bool): Whether to recurse into dependencies + # + def dependencies(self, targets, scope, *, recurse=True): + # Keep track of 'visited' in this scope, so that all targets + # share the same context. + visited = {} - # Reset the element loader state - Element._reset_load_state() + for target in targets: + for element in target.dependencies(scope, recurse=recurse, visited=visited): + yield element + + # plan() + # + # Generator function to iterate over only the elements + # which are required to build the pipeline target, omitting + # cached elements. The elements are yielded in a depth sorted + # ordering for optimal build plans + # + # Args: + # elements (list of Element): List of target elements to plan + # + # Returns: + # (list of Element): A depth sorted list of the build plan + # + def plan(self, elements): + return _Planner().plan(elements) # get_selection() # + # Gets a full list of elements based on a toplevel + # list of element targets + # # Args: + # targets (list of Element): The target Elements # mode (PipelineSelection): The PipelineSelection mode # # Various commands define a --deps option to specify what elements to # use in the result, this function reports a list that is appropriate for # the selected option. # - def get_selection(self, mode): + def get_selection(self, targets, mode): elements = None if mode == PipelineSelection.NONE: - elements = self.targets + elements = targets elif mode == PipelineSelection.PLAN: - elements = list(self._plan()) + elements = self.plan(targets) else: if mode == PipelineSelection.ALL: scope = Scope.ALL @@ -193,49 +223,27 @@ class Pipeline(): elif mode == PipelineSelection.RUN: scope = Scope.RUN - elements = list(self.dependencies(scope)) - - return self.remove_elements(elements) - - # dependencies() - # - # Generator function to iterate over elements and optionally - # also iterate over sources. - # - # Args: - # scope (Scope): The scope to iterate over - # recurse (bool): Whether to recurse into dependencies - # include_sources (bool): Whether to include element sources in iteration - # - def dependencies(self, scope, *, recurse=True, include_sources=False): - # Keep track of 'visited' in this scope, so that all targets - # share the same context. - visited = {} - - for target in self.targets: - for element in target.dependencies(scope, recurse=recurse, visited=visited): - if include_sources: - for source in element.sources(): - yield source - yield element + elements = list(self.dependencies(targets, scope)) - ############################################################# - # Commands # - ############################################################# + return elements - # remove_elements(): - # - # Internal function + # except_elements(): # # Return what we are left with after the intersection between # excepted and target elements and their unique dependencies is # gone. # # Args: - # elements (list of elements): The list to remove elements from. - def remove_elements(self, elements): - targeted = list(self.dependencies(Scope.ALL)) - + # targets (list of Element): List of toplevel targetted elements + # elements (list of Element): The list to remove elements from + # except_targets (list of Element): List of toplevel except targets + # + # Returns: + # (list of Element): The elements list with the intersected + # exceptions removed + # + def except_elements(self, targets, elements, except_targets): + targeted = list(self.dependencies(targets, Scope.ALL)) visited = [] def find_intersection(element): @@ -255,7 +263,7 @@ class Pipeline(): # elements that lie on the border closest to excepted elements # between excepted and target elements. intersection = list(itertools.chain.from_iterable( - find_intersection(element) for element in self._exceptions + find_intersection(element) for element in except_targets )) # Now use this set of elements to traverse the targeted @@ -264,7 +272,7 @@ class Pipeline(): queue = [] visited = [] - queue.extend(self.targets) + queue.extend(targets) while queue: element = queue.pop() if element in visited or element in intersection: @@ -282,89 +290,77 @@ class Pipeline(): # in before. return [element for element in elements if element in visited] - ############################################################# - # Private Methods # - ############################################################# - - # _get_elements_to_track(): + # targets_include() + # + # Checks whether the given targets are, or depend on some elements # - # Work out which elements are going to be tracked. + # Args: + # targets (list of Element): A list of targets + # elements (list of Element): List of elements to check + # + # Returns: + # (bool): True if all of `elements` are the `targets`, or are + # somehow depended on by `targets`. # - # Currently the 'mode' parameter only accepts - # PipelineSelection.NONE or PipelineSelection.ALL + def targets_include(self, targets, elements): + target_element_set = set(self.dependencies(targets, Scope.ALL)) + element_set = set(elements) + return element_set.issubset(target_element_set) + + # subtract_elements() # - # This makes the assumption that the except elements are - # meant to be removed from tracking element lists. + # Subtract a subset of elements # # Args: - # track_targets (list of str): List of target names - # mode (PipelineSelection): The PipelineSelection mode + # elements (list of Element): The element list + # subtract (list of Element): List of elements to subtract from elements # # Returns: - # (list): List of Element objects to track + # (list): The original elements list, with elements in subtract removed # - def _get_elements_to_track(self, track_targets, mode=PipelineSelection.ALL): - planner = _Planner() - - # Convert target names to elements - track_elements = [e for e in self.dependencies(Scope.ALL) - if e.name in track_targets] - - if mode != PipelineSelection.NONE: - assert mode == PipelineSelection.ALL - - # Plan them out - track_elements = planner.plan(track_elements, ignore_cache=True) - - # Filter out --except elements - track_elements = self.remove_elements(track_elements) - - # Filter out cross junctioned elements - if self._track_cross_junctions: - self._assert_junction_tracking(track_elements) - else: - track_elements = self._filter_cross_junctions(track_elements) - - return track_elements + def subtract_elements(self, elements, subtract): + subtract_set = set(subtract) + return [ + e for e in elements + if e not in subtract_set + ] - # _prefilght() + # track_cross_junction_filter() # - # Preflights all the plugins in the pipeline + # Filters out elements which are across junction boundaries, + # otherwise asserts that there are no such elements. # - def _preflight(self): - for element in self.dependencies(Scope.ALL): - element._preflight() - - # _resolve_cache_keys() + # This is currently assumed to be only relevant for element + # lists targetted at tracking. # - # Initially resolve the cache keys + # Args: + # elements (list of Element): The list of elements to filter + # cross_junction_requested (bool): Whether the user requested + # cross junction tracking # - def _resolve_cache_keys(self): - track_elements = set(self._track_elements) - - with self.context.timed_activity("Resolving cached state", silent_nested=True): - for element in self.dependencies(Scope.ALL): - if element in track_elements: - # Load the pipeline in an explicitly inconsistent state, use - # this for pipelines with tracking queues enabled. - element._schedule_tracking() + # Returns: + # (list of Element): The filtered or asserted result + # + def track_cross_junction_filter(self, elements, cross_junction_requested): + # Filter out cross junctioned elements + if cross_junction_requested: + self._assert_junction_tracking(elements) + else: + elements = self._filter_cross_junctions(elements) - # Determine initial element state. This may resolve cache keys - # and interrogate the artifact cache. - element._update_state() + return elements - # _assert_consistent() + # assert_consistent() # - # Asserts that the pipeline is in a consistent state, that - # is to say that all sources are consistent and can at least - # be fetched. + # Asserts that the given list of elements are in a consistent state, that + # is to say that all sources are consistent and can at least be fetched. # # Consequently it also means that cache keys can be resolved. # - def _assert_consistent(self, toplevel): + def assert_consistent(self, elements): inconsistent = [] - with self.context.timed_activity("Checking sources"): - for element in toplevel: + with self._context.timed_activity("Checking sources"): + for element in elements: if element._get_consistency() == Consistency.INCONSISTENT: inconsistent.append(element) @@ -375,6 +371,21 @@ class Pipeline(): detail += " " + element._get_full_name() + "\n" raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline") + # cleanup() + # + # Cleans up resources used by the Pipeline. + # + def cleanup(self): + if self._loader: + self._loader.cleanup() + + # Reset the element loader state + Element._reset_load_state() + + ############################################################# + # Private Methods # + ############################################################# + # _filter_cross_junction() # # Filters out cross junction elements from the elements @@ -389,7 +400,7 @@ class Pipeline(): def _filter_cross_junctions(self, elements): return [ element for element in elements - if element._get_project() is self.project + if element._get_project() is self._project ] # _assert_junction_tracking() @@ -404,7 +415,7 @@ class Pipeline(): # We can track anything if the toplevel project uses project.refs # - if self.project.ref_storage == ProjectRefStorage.PROJECT_REFS: + if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS: return # Ideally, we would want to report every cross junction element but not @@ -414,37 +425,19 @@ class Pipeline(): # But this is too hard, lets shoot for a simple error. for element in elements: element_project = element._get_project() - if element_project is not self.project: + if element_project is not self._project: detail = "Requested to track sources across junction boundaries\n" + \ "in a project which does not use project.refs ref-storage." raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources") - # _plan() - # - # Args: - # except_ (bool): Whether to filter out the except elements from the plan - # - # Generator function to iterate over only the elements - # which are required to build the pipeline target, omitting - # cached elements. The elements are yielded in a depth sorted - # ordering for optimal build plans - def _plan(self, except_=True): - build_plan = _Planner().plan(self.targets) - - if except_: - build_plan = self.remove_elements(build_plan) - - for element in build_plan: - yield element - # _message() # # Local message propagator # def _message(self, message_type, message, **kwargs): args = dict(kwargs) - self.context.message( + self._context.message( Message(None, message_type, message, **args)) |