diff options
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 223 |
1 files changed, 82 insertions, 141 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index e0a8d92bb..067357384 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -158,9 +158,8 @@ class Stream: load_refs=False ): with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)): - target_objects, _ = self._load( + target_objects = self._load( targets, - (), selection=selection, except_targets=except_targets, use_artifact_config=use_artifact_config, @@ -280,9 +279,8 @@ class Stream: if remote: use_config = False - elements, _ = self._load( + elements = self._load( targets, - [], selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, @@ -334,48 +332,24 @@ class Stream: # targets (list of str): Targets to fetch # selection (PipelineSelection): The selection mode for the specified targets # except_targets (list of str): Specified targets to except from fetching - # track_targets (bool): Whether to track selected targets in addition to fetching - # track_cross_junctions (bool): Whether tracking should cross junction boundaries # remote (str|None): The URL of a specific remote server to pull from. # - def fetch( - self, - targets, - *, - selection=PipelineSelection.PLAN, - except_targets=None, - track_targets=False, - track_cross_junctions=False, - remote=None - ): - - if track_targets: - track_targets = targets - track_selection = selection - track_except_targets = except_targets - else: - track_targets = () - track_selection = PipelineSelection.NONE - track_except_targets = () + def fetch(self, targets, *, selection=PipelineSelection.PLAN, except_targets=None, remote=None): use_source_config = True if remote: use_source_config = False - elements, track_elements = self._load( + elements = self._load( targets, - track_targets, selection=selection, - track_selection=track_selection, except_targets=except_targets, - track_except_targets=track_except_targets, - track_cross_junctions=track_cross_junctions, use_source_config=use_source_config, source_remote_url=remote, ) # Delegated to a shared fetch method - self._fetch(elements, track_elements=track_elements) + self._fetch(elements) # track() # @@ -392,17 +366,8 @@ class Stream: # def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False): - # We pass no target to build. Only to track. Passing build targets - # would fully load project configuration which might not be - # possible before tracking is done. - _, elements = self._load( - [], - targets, - selection=selection, - track_selection=selection, - except_targets=except_targets, - track_except_targets=except_targets, - track_cross_junctions=cross_junctions, + elements = self._load_tracking( + targets, selection=selection, except_targets=except_targets, cross_junctions=cross_junctions ) # FIXME: this can be refactored after element._update_state is simplified/removed @@ -434,9 +399,8 @@ class Stream: if remote: use_config = False - elements, _ = self._load( + elements = self._load( targets, - (), selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, @@ -476,9 +440,8 @@ class Stream: if remote: use_config = False - elements, _ = self._load( + elements = self._load( targets, - (), selection=selection, ignore_junction_targets=ignore_junction_targets, use_artifact_config=use_config, @@ -569,7 +532,7 @@ class Stream: tar=False ): - elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True) + elements = self._load((target,), selection=selection, use_artifact_config=True, load_refs=True) # self.targets contains a list of the loaded target objects # if we specify --deps build, Stream._load() will return a list @@ -778,7 +741,7 @@ class Stream: self._check_location_writable(location, force=force, tar=tar) - elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets) + elements = self._load((target,), selection=deps, except_targets=except_targets) # Assert all sources are cached in the source dir self._fetch(elements) @@ -805,28 +768,18 @@ class Stream: # force (bool): Whether to ignore contents in an existing directory # custom_dir (str): Custom location to create a workspace or false to use default location. # - def workspace_open(self, targets, *, no_checkout, track_first, force, custom_dir): + def workspace_open(self, targets, *, no_checkout, force, custom_dir): # This function is a little funny but it is trying to be as atomic as possible. - if track_first: - track_targets = targets - else: - track_targets = () - - elements, track_elements = self._load( - targets, track_targets, selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT - ) + elements = self._load(targets, selection=PipelineSelection.REDIRECT) workspaces = self._context.get_workspaces() # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. # - if not no_checkout or track_first: - track_elements = [] - if track_first: - track_elements = elements - self._fetch(elements, track_elements=track_elements, fetch_original=True) + if not no_checkout: + self._fetch(elements, fetch_original=True) expanded_directories = [] # To try to be more atomic, loop through the elements and raise any errors we can early @@ -966,9 +919,7 @@ class Stream: # def workspace_reset(self, targets, *, soft, track_first): - elements, _ = self._load( - targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT - ) + elements = self._load(targets, selection=PipelineSelection.REDIRECT) nonexisting = [] for element in elements: @@ -991,13 +942,7 @@ class Stream: self.workspace_close(element._get_full_name(), remove_dir=True) workspaces.save_config() - self.workspace_open( - [element._get_full_name()], - no_checkout=False, - track_first=track_first, - force=True, - custom_dir=workspace_path, - ) + self.workspace_open([element._get_full_name()], no_checkout=False, force=True, custom_dir=workspace_path) # workspace_exists # @@ -1076,9 +1021,7 @@ class Stream: else: output_elements.add(e) if load_elements: - loaded_elements, _ = self._load( - load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT - ) + loaded_elements = self._load(load_elements, selection=PipelineSelection.REDIRECT) for e in loaded_elements: output_elements.add(e.name) @@ -1180,6 +1123,62 @@ class Stream: # Private Methods # ############################################################# + # _load_tracking() + # + # A variant of _load() to be used when the elements should be used + # for tracking + # + # If `targets` is not empty used project configuration will be + # fully loaded. + # + # Args: + # targets (list of str): Targets to load + # selection (PipelineSelection): The selection mode for the specified targets + # except_targets (list of str): Specified targets to except + # cross_junctions (bool): Whether tracking should cross junction boundaries + # + # Returns: + # (list of Element): The tracking element selection + # + def _load_tracking(self, targets, *, selection=PipelineSelection.NONE, except_targets=(), cross_junctions=False): + # We never want to use a PLAN selection when tracking elements + assert selection != PipelineSelection.PLAN + + # We can't track artifact refs, since they have no underlying + # elements or sources to interact with. Abort if the user asks + # us to do that. + _, refs = self._classify_artifacts(targets) + if refs: + detail = "\n".join(refs) + raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail) + + # Actually load our elements + loadable = [targets, except_targets] + if any(loadable): + elements, except_elements = self._pipeline.load(loadable, rewritable=True) + else: + elements, except_elements = [], [] + + # Hold on to the targets + self.targets = elements + + track_projects = {} + for element in elements: + project = element._get_project() + if project not in track_projects: + track_projects[project] = [element] + else: + track_projects[project].append(element) + + track_selected = [] + + for project, project_elements in track_projects.items(): + selected = self._pipeline.get_selection(project_elements, selection) + selected = self._pipeline.track_cross_junction_filter(project, selected, cross_junctions) + track_selected.extend(selected) + + return self._pipeline.except_elements(elements, track_selected, except_elements) + # _load() # # A convenience method for loading element lists @@ -1192,12 +1191,8 @@ class Stream: # # Args: # targets (list of str): Main targets to load - # track_targets (list of str): Tracking targets # selection (PipelineSelection): The selection mode for the specified targets - # track_selection (PipelineSelection): The selection mode for the specified tracking targets # except_targets (list of str): Specified targets to except from fetching - # track_except_targets (list of str): Specified targets to except from fetching - # track_cross_junctions (bool): Whether tracking should cross junction boundaries # ignore_junction_targets (bool): Whether junction targets should be filtered out # use_artifact_config (bool): Whether to initialize artifacts with the config # use_source_config (bool): Whether to initialize remote source caches with the config @@ -1211,13 +1206,9 @@ class Stream: def _load( self, targets, - track_targets, *, selection=PipelineSelection.NONE, - track_selection=PipelineSelection.NONE, except_targets=(), - track_except_targets=(), - track_cross_junctions=False, ignore_junction_targets=False, use_artifact_config=False, use_source_config=False, @@ -1237,19 +1228,12 @@ class Stream: if selection in (PipelineSelection.ALL, PipelineSelection.RUN): raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection)) - # Load rewritable if we have any tracking selection to make - rewritable = False - if track_targets: - rewritable = True - # Load all target elements - loadable = [target_elements, except_targets, track_targets, track_except_targets] + loadable = [target_elements, except_targets] if any(loadable): - elements, except_elements, track_elements, track_except_elements = self._pipeline.load( - loadable, rewritable=rewritable - ) + elements, except_elements = self._pipeline.load(loadable, rewritable=False) else: - elements, except_elements, track_elements, track_except_elements = [], [], [], [] + elements, except_elements = [], [] # Load all target artifacts artifacts = self._pipeline.load_artifacts(target_artifacts) if target_artifacts else [] @@ -1261,34 +1245,6 @@ class Stream: # Hold on to the targets self.targets = elements + artifacts - # First take care of marking tracking elements, this must be - # done before resolving element states. - # - assert track_selection != PipelineSelection.PLAN - - # Tracked elements are split by owner projects in order to - # filter cross junctions tracking dependencies on their - # respective project. - track_projects = {} - for element in track_elements: - project = element._get_project() - if project not in track_projects: - track_projects[project] = [element] - else: - track_projects[project].append(element) - - track_selected = [] - - for project, project_elements in track_projects.items(): - selected = self._pipeline.get_selection(project_elements, track_selection) - selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions) - track_selected.extend(selected) - - track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements) - - if not targets: - return [], track_selected - # ArtifactCache.setup_remotes expects all projects to be fully loaded for project in self._context.get_projects(): project.ensure_fully_loaded() @@ -1313,7 +1269,7 @@ class Stream: for element in selected: element._set_required() - return selected, track_selected + return selected # _message() # @@ -1396,34 +1352,19 @@ class Stream: # track_elements (list of Element): Elements to track # fetch_original (Bool): Whether to fetch original unstaged # - def _fetch(self, elements, *, track_elements=None, fetch_original=False): - - if track_elements is None: - track_elements = [] - - # Subtract the track elements from the fetch elements, they will be added separately - fetch_plan = self._pipeline.subtract_elements(elements, track_elements) + def _fetch(self, elements, *, fetch_original=False): # Assert consistency for the fetch elements - self._pipeline.assert_consistent(fetch_plan) - # Filter out elements with cached sources, only from the fetch plan # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan if not elt._should_fetch(fetch_original)] - fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) + cached = [elt for elt in elements if not elt._should_fetch(fetch_original)] + fetch_plan = self._pipeline.subtract_elements(elements, cached) + self._pipeline.assert_consistent(elements) # Construct queues, enqueue and run # self._scheduler.clear_queues() - track_queue = None - if track_elements: - track_queue = TrackQueue(self._scheduler) - self._add_queue(track_queue, track=True) self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original)) - - if track_elements: - self._enqueue_plan(track_elements, queue=track_queue) - self._enqueue_plan(fetch_plan) self._run() |