diff options
Diffstat (limited to 'buildstream/_stream.py')
-rw-r--r-- | buildstream/_stream.py | 688 |
1 files changed, 410 insertions, 278 deletions
diff --git a/buildstream/_stream.py b/buildstream/_stream.py index 62d9f9804..c8d0bb69c 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -17,19 +17,22 @@ # # Authors: # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> +# Tristan Maat <tristan.maat@codethink.co.uk> + import os import stat import shlex import shutil import tarfile +from contextlib import contextmanager from tempfile import TemporaryDirectory -from ._exceptions import StreamError, ImplError, BstError +from ._exceptions import StreamError, ImplError, BstError, set_last_task_error from ._message import Message, MessageType -from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue +from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue from ._pipeline import Pipeline, PipelineSelection from ._platform import Platform -from ._profile import Topics, profile_start, profile_end from . import utils, _yaml, _site from . import Scope, Consistency @@ -41,25 +44,46 @@ from . import Scope, Consistency # Args: # context (Context): The Context object # project (Project): The Project object -# loaded_callback (callable): A callback to invoke when the pipeline is loaded +# session_start (datetime): The time when the session started +# session_start_callback (callable): A callback to invoke when the session starts +# interrupt_callback (callable): A callback to invoke when we get interrupted +# ticker_callback (callable): Invoked every second while running the scheduler +# job_start_callback (callable): Called when a job starts +# job_complete_callback (callable): Called when a job completes # class Stream(): - def __init__(self, context, project, loaded_callback): - self.session_elements = 0 # Number of elements to process in this session - self.total_elements = 0 # Number of total potential elements for this pipeline - - self._context = context - self._project = project - self._scheduler = None - self._pipeline = None + def __init__(self, context, project, session_start, *, + session_start_callback=None, + interrupt_callback=None, + ticker_callback=None, + job_start_callback=None, + job_complete_callback=None): - self._loaded_cb = loaded_callback + # + # Public members + # + self.targets = [] # Resolved target elements + self.session_elements = [] # List of elements being processed this session + self.total_elements = [] # Total list of elements based on targets + self.queues = [] # Queue objects - # Load selected platform + # + # Private members + # Platform.create_instance(context, project) self._platform = Platform.get_platform() self._artifacts = self._platform.artifactcache + self._context = context + self._project = project + self._pipeline = Pipeline(context, project, self._artifacts) + self._scheduler = Scheduler(context, session_start, + interrupt_callback=interrupt_callback, + ticker_callback=ticker_callback, + job_start_callback=job_start_callback, + job_complete_callback=job_complete_callback) + self._first_non_track_queue = None + self._session_start_callback = session_start_callback # cleanup() # @@ -81,14 +105,18 @@ class Stream(): # except_targets (list of str): Specified targets to except from fetching # downloadable (bool): Whether the downloadable state of elements should be resolved # + # Returns: + # (list of Element): The selected elements def load_selection(self, targets, *, selection=PipelineSelection.NONE, except_targets=(), downloadable=False): - self.init_pipeline(targets, except_=except_targets, - use_configured_remote_caches=downloadable, - fetch_subprojects=False) - return self._pipeline.get_selection(selection) + elements, _ = self._load(targets, (), + selection=selection, + except_targets=except_targets, + use_artifact_config=downloadable, + fetch_subprojects=False) + return elements # shell() # @@ -118,7 +146,7 @@ class Stream(): if directory is None: missing_deps = [ dep._get_full_name() - for dep in self._pipeline.dependencies(scope) + for dep in self._pipeline.dependencies([element], scope) if not dep._cached() ] if missing_deps: @@ -145,66 +173,47 @@ class Stream(): track_cross_junctions=False, build_all=False): - rewritable = False - if track_targets: - rewritable = True + if build_all or track_targets: + selection = PipelineSelection.ALL + else: + selection = PipelineSelection.PLAN - self.init_pipeline(targets, - except_=track_except, - rewritable=rewritable, - use_configured_remote_caches=True, - track_elements=track_targets, - track_cross_junctions=track_cross_junctions, - fetch_subprojects=True) + elements, track_elements = \ + self._load(targets, track_targets, + selection=selection, track_selection=PipelineSelection.ALL, + track_except_targets=track_except, + track_cross_junctions=track_cross_junctions, + use_artifact_config=True, + fetch_subprojects=True) - if build_all: - plan = self._pipeline.dependencies(Scope.ALL) - else: - plan = self._pipeline._plan(except_=False) - - # We want to start the build queue with any elements that are - # not being tracked first - track_elements = set(self._pipeline._track_elements) - plan = [e for e in plan if e not in track_elements] - - # Assert that we have a consistent pipeline now (elements in - # track_plan will be made consistent) - self._pipeline._assert_consistent(plan) - - fetch = FetchQueue(self._scheduler, skip_cached=True) - build = BuildQueue(self._scheduler) - track = None - pull = None - push = None - queues = [] - if self._pipeline._track_elements: - track = TrackQueue(self._scheduler) - queues.append(track) - if self._pipeline._artifacts.has_fetch_remotes(): - pull = PullQueue(self._scheduler) - queues.append(pull) - queues.append(fetch) - queues.append(build) - if self._pipeline._artifacts.has_push_remotes(): - push = PushQueue(self._scheduler) - queues.append(push) - - # If we're going to track, tracking elements go into the first queue - # which is the tracking queue, the rest of the plan goes into the next - # queue (whatever that happens to be) - if track: - queues[0].enqueue(self._pipeline._track_elements) - queues[1].enqueue(plan) - else: - queues[0].enqueue(plan) + # Remove the tracking elements from the main targets + elements = self._pipeline.subtract_elements(elements, track_elements) - self.session_elements = len(self._pipeline._track_elements) + len(plan) + # Assert that the elements we're not going to track are consistent + self._pipeline.assert_consistent(elements) - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + # Now construct the queues + # + track_queue = None + if track_elements: + track_queue = TrackQueue(self._scheduler) + self._add_queue(track_queue, track=True) + + if self._artifacts.has_fetch_remotes(): + self._add_queue(PullQueue(self._scheduler)) + + self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) + self._add_queue(BuildQueue(self._scheduler)) + + if self._artifacts.has_push_remotes(): + self._add_queue(PushQueue(self._scheduler)) + + # Enqueue elements + # + if track_elements: + self._enqueue_plan(track_elements, queue=track_queue) + self._enqueue_plan(elements) + self._run() # fetch() # @@ -223,21 +232,25 @@ class Stream(): track_targets=False, track_cross_junctions=False): - rewritable = False if track_targets: - rewritable = True - - self.init_pipeline(targets, - except_=except_targets, - rewritable=rewritable, - track_elements=targets if track_targets else None, - track_cross_junctions=track_cross_junctions, - fetch_subprojects=True) + track_targets = targets + track_selection = selection + track_except_targets = except_targets + else: + track_targets = () + track_selection = PipelineSelection.NONE + track_except_targets = () - fetch_plan = self._pipeline.get_selection(selection) + elements, track_elements = \ + self._load(targets, track_targets, + selection=selection, track_selection=track_selection, + except_targets=except_targets, + track_except_targets=track_except_targets, + track_cross_junctions=track_cross_junctions, + fetch_subprojects=True) - # Delegated to a shared method for now - self._do_fetch(fetch_plan) + # Delegated to a shared fetch method + self._fetch(elements, track_elements=track_elements) # track() # @@ -255,26 +268,20 @@ class Stream(): def track(self, targets, *, selection=PipelineSelection.NONE, except_targets=None, - track_targets=False, cross_junctions=False): - self.init_pipeline(targets, - except_=except_targets, - rewritable=True, - track_elements=targets, - track_cross_junctions=cross_junctions, - track_selection=selection, - fetch_subprojects=True) + _, elements = \ + self._load(targets, targets, + selection=selection, track_selection=selection, + except_targets=except_targets, + track_except_targets=except_targets, + track_cross_junctions=cross_junctions, + fetch_subprojects=True) - track = TrackQueue(self._scheduler) - track.enqueue(self._pipeline._track_elements) - self.session_elements = len(self._pipeline._track_elements) - - _, status = self._scheduler.run([track]) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + track_queue = TrackQueue(self._scheduler) + self._add_queue(track_queue, track=True) + self._enqueue_plan(elements, queue=track_queue) + self._run() # pull() # @@ -292,33 +299,23 @@ class Stream(): selection=PipelineSelection.NONE, remote=None): - use_configured_remote_caches = True - if remote is not None: - use_configured_remote_caches = False + use_config = True + if remote: + use_config = False - self.init_pipeline(targets, - use_configured_remote_caches=use_configured_remote_caches, - add_remote_cache=remote, - fetch_subprojects=True) + elements, _ = self._load(targets, (), + selection=selection, + use_artifact_config=use_config, + artifact_remote_url=remote, + fetch_subprojects=True) - elements = self._pipeline.get_selection(selection) - - if not self._pipeline._artifacts.has_fetch_remotes(): + if not self._artifacts.has_fetch_remotes(): raise StreamError("No artifact caches available for pulling artifacts") - plan = elements - self._pipeline._assert_consistent(plan) - self._pipeline.session_elements = len(plan) - - pull = PullQueue(self._scheduler) - pull.enqueue(plan) - queues = [pull] - - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + self._pipeline.assert_consistent(elements) + self._add_queue(PullQueue(self._scheduler)) + self._enqueue_plan(elements) + self._run() # push() # @@ -336,33 +333,23 @@ class Stream(): selection=PipelineSelection.NONE, remote=None): - use_configured_remote_caches = True - if remote is not None: - use_configured_remote_caches = False - - self.init_pipeline(targets, - use_configured_remote_caches=use_configured_remote_caches, - add_remote_cache=remote, - fetch_subprojects=True) + use_config = True + if remote: + use_config = False - elements = self._pipeline.get_selection(selection) + elements, _ = self._load(targets, (), + selection=selection, + use_artifact_config=use_config, + artifact_remote_url=remote, + fetch_subprojects=True) - if not self._pipeline._artifacts.has_push_remotes(): + if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") - plan = elements - self._pipeline._assert_consistent(plan) - self._pipeline.session_elements = len(plan) - - push = PushQueue(self._scheduler) - push.enqueue(plan) - queues = [push] - - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + self._pipeline.assert_consistent(elements) + self._add_queue(PushQueue(self._scheduler)) + self._enqueue_plan(elements) + self._run() # checkout() # @@ -382,10 +369,9 @@ class Stream(): integrate=True, hardlinks=False): - self.init_pipeline((target,), fetch_subprojects=True) - # We only have one target in a checkout command - target = self._pipeline.targets[0] + elements, _ = self._load((target,), (), fetch_subprojects=True) + target = elements[0] try: os.makedirs(directory, exist_ok=True) @@ -433,13 +419,13 @@ class Stream(): track_first, force): - self.init_pipeline((target,), - track_elements=[target] if track_first else None, - track_selection=PipelineSelection.NONE, - rewritable=track_first, - fetch_subprojects=False) + if track_first: + track_targets = (target,) + else: + track_targets = () - target = self._pipeline.targets[0] + elements, track_elements = self._load((target,), track_targets) + target = elements[0] workdir = os.path.abspath(directory) if not list(target.sources()): @@ -459,11 +445,11 @@ class Stream(): # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. # - # For now, tracking is handled by _do_fetch() automatically - # by virtue of our screwed up pipeline initialization stuff. - # if not no_checkout or track_first: - self._do_fetch([target]) + track_elements = [] + if track_first: + track_elements = elements + self._fetch(elements, track_elements=track_elements) if not no_checkout and target._get_consistency() != Consistency.CACHED: raise StreamError("Could not stage uncached source. " + @@ -522,34 +508,35 @@ class Stream(): # def workspace_reset(self, targets, *, track_first): - self.init_pipeline(targets, - track_elements=targets if track_first else None, - track_selection=PipelineSelection.NONE, - rewritable=track_first, - fetch_subprojects=False) + if track_first: + track_targets = targets + else: + track_targets = () + + elements, track_elements = self._load(targets, track_targets) # Do the tracking first if track_first: - self._do_fetch(self._pipeline.targets) + self._fetch(elements, track_elements=track_elements) - for target in self._pipeline.targets: - workspace = self._project.workspaces.get_workspace(target.name) + for element in elements: + workspace = self._project.workspaces.get_workspace(element.name) - with target.timed_activity("Removing workspace directory {}" - .format(workspace.path)): + with element.timed_activity("Removing workspace directory {}" + .format(workspace.path)): try: shutil.rmtree(workspace.path) except OSError as e: raise StreamError("Could not remove '{}': {}" .format(workspace.path, e)) from e - self._project.workspaces.delete_workspace(target.name) - self._project.workspaces.create_workspace(target.name, workspace.path) + self._project.workspaces.delete_workspace(element.name) + self._project.workspaces.create_workspace(element.name, workspace.path) - with target.timed_activity("Staging sources to {}".format(workspace.path)): - target._open_workspace() + with element.timed_activity("Staging sources to {}".format(workspace.path)): + element._open_workspace() - self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path)) + self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path)) self._project.workspaces.save_config() @@ -609,15 +596,20 @@ class Stream(): force=False, compression="gz"): - self.init_pipeline((target,), - track_elements=[target] if track_first else None, - track_selection=PipelineSelection.NONE, - rewritable=track_first, - fetch_subprojects=True) + if track_first: + track_targets = (target,) + else: + track_targets = () + + elements, track_elements = self._load((target,), track_targets, + selection=PipelineSelection.ALL, + track_selection=PipelineSelection.ALL, + fetch_subprojects=True) # source-bundle only supports one target - target = self._pipeline.targets[0] - dependencies = self._pipeline.get_selection(PipelineSelection.ALL) + target = self.targets[0] + + self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name)) # Find the correct filename for the compression algorithm tar_location = os.path.join(directory, target.normal_name + ".tar") @@ -635,14 +627,15 @@ class Stream(): raise StreamError("Cannot write to {0}: {1}" .format(tar_location, e)) from e - plan = list(dependencies) - self._do_fetch(plan) + # Fetch and possibly track first + # + self._fetch(elements, track_elements=track_elements) # We don't use the scheduler for this as it is almost entirely IO # bound. # Create a temporary directory to build the source tree in - builddir = target._get_context().builddir + builddir = self._context.builddir prefix = "{}-".format(target.normal_name) with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir: @@ -655,18 +648,162 @@ class Stream(): # Any elements that don't implement _write_script # should not be included in the later stages. - plan = [element for element in plan - if self._write_element_script(source_directory, element)] + elements = [ + element for element in elements + if self._write_element_script(source_directory, element) + ] - self._write_element_sources(tempdir, plan) - self._write_build_script(tempdir, plan) + self._write_element_sources(tempdir, elements) + self._write_build_script(tempdir, elements) self._collect_sources(tempdir, tar_location, target.normal_name, compression) ############################################################# - # Private Methods # + # Scheduler API forwarding # + ############################################################# + + # running + # + # Whether the scheduler is running + # + @property + def running(self): + return self._scheduler.loop is not None + + # suspended + # + # Whether the scheduler is currently suspended + # + @property + def suspended(self): + return self._scheduler.suspended + + # terminated + # + # Whether the scheduler is currently terminated + # + @property + def terminated(self): + return self._scheduler.terminated + + # elapsed_time + # + # Elapsed time since the session start + # + @property + def elapsed_time(self): + return self._scheduler.elapsed_time() + + # terminate() + # + # Terminate jobs + # + def terminate(self): + self._scheduler.terminate_jobs() + + # quit() + # + # Quit the session, this will continue with any ongoing + # jobs, use Stream.terminate() instead for cancellation + # of ongoing jobs + # + def quit(self): + self._scheduler.stop_queueing() + + # suspend() + # + # Context manager to suspend ongoing jobs + # + @contextmanager + def suspend(self): + with self._scheduler.jobs_suspended(): + yield + + ############################################################# + # Private Methods # ############################################################# + # _load() + # + # A convenience method for loading element lists + # + # Args: + # targets (list of str): Main targets to load + # track_targets (list of str): Tracking targets + # selection (PipelineSelection): The selection mode for the specified targets + # track_selection (PipelineSelection): The selection mode for the specified tracking targets + # except_targets (list of str): Specified targets to except from fetching + # track_except_targets (list of str): Specified targets to except from fetching + # track_cross_junctions (bool): Whether tracking should cross junction boundaries + # use_artifact_config (bool): Whether to initialize artifacts with the config + # artifact_remote_url (bool): A remote url for initializing the artifacts + # fetch_subprojects (bool): Whether to fetch subprojects while loading + # + # Returns: + # (list of Element): The primary element selection + # (list of Element): The tracking element selection + # + def _load(self, targets, track_targets, *, + selection=PipelineSelection.NONE, + track_selection=PipelineSelection.NONE, + except_targets=(), + track_except_targets=(), + track_cross_junctions=False, + use_artifact_config=False, + artifact_remote_url=None, + fetch_subprojects=False): + + # Load rewritable if we have any tracking selection to make + rewritable = False + if track_targets: + rewritable = True + + # Load all targets + elements, except_elements, track_elements, track_except_elements = \ + self._pipeline.load([targets, except_targets, track_targets, track_except_targets], + rewritable=rewritable, + fetch_subprojects=fetch_subprojects) + + # Hold on to the targets + self.targets = elements + + # Here we should raise an error if the track_elements targets + # are not dependencies of the primary targets, this is not + # supported. + # + # This can happen with `bst build --track` + # + if not self._pipeline.targets_include(elements, track_elements): + raise StreamError("Specified tracking targets that are not " + "within the scope of primary targets") + + # First take care of marking tracking elements, this must be + # done before resolving element states. + # + assert track_selection != PipelineSelection.PLAN + track_selected = self._pipeline.get_selection(track_elements, track_selection) + track_selected = self._pipeline.except_elements(track_elements, + track_selected, + track_except_elements) + track_selected = self._pipeline.track_cross_junction_filter(track_selected, + track_cross_junctions) + + for element in track_selected: + element._schedule_tracking() + + # Connect to remote caches, this needs to be done before resolving element state + self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url) + + # Now move on to loading primary selection. + # + self._pipeline.resolve_elements(elements) + selected = self._pipeline.get_selection(elements, selection) + selected = self._pipeline.except_elements(elements, + selected, + except_elements) + + return selected, track_selected + # _message() # # Local message propagator @@ -676,47 +813,103 @@ class Stream(): self._context.message( Message(None, message_type, message, **args)) - # _do_fetch() + # _add_queue() + # + # Adds a queue to the stream + # + # Args: + # queue (Queue): Queue to add to the pipeline + # track (bool): Whether this is the tracking queue + # + def _add_queue(self, queue, *, track=False): + self.queues.append(queue) + + if not (track or self._first_non_track_queue): + self._first_non_track_queue = queue + + # _enqueue_plan() + # + # Enqueues planned elements to the specified queue. + # + # Args: + # plan (list of Element): The list of elements to be enqueued + # queue (Queue): The target queue, defaults to the first non-track queue + # + def _enqueue_plan(self, plan, *, queue=None): + queue = queue or self._first_non_track_queue + + queue.enqueue(plan) + self.session_elements += plan + + # _run() + # + # Common function for running the scheduler + # + def _run(self): + + # Inform the frontend of the full list of elements + # and the list of elements which will be processed in this run + # + self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL)) + + if self._session_start_callback is not None: + self._session_start_callback() + + _, status = self._scheduler.run(self.queues) + + # Force update element states after a run, such that the summary + # is more coherent + try: + for element in self.total_elements: + element._update_state() + except BstError as e: + self._message(MessageType.ERROR, "Error resolving final state", detail=str(e)) + set_last_task_error(e.domain, e.reason) + except Exception as e: # pylint: disable=broad-except + self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e)) + + if status == SchedStatus.ERROR: + raise StreamError() + elif status == SchedStatus.TERMINATED: + raise StreamError(terminated=True) + + # _fetch() # # Performs the fetch job, the body of this function is here because # it is shared between a few internals. # # Args: # elements (list of Element): Elements to fetch + # track_elements (list of Element): Elements to track # - def _do_fetch(self, elements): + def _fetch(self, elements, *, track_elements=None): - fetch_plan = elements + if track_elements is None: + track_elements = [] # Subtract the track elements from the fetch elements, they will be added separately - if self._pipeline._track_elements: - track_elements = set(self._pipeline._track_elements) - fetch_plan = [e for e in fetch_plan if e not in track_elements] + fetch_plan = self._pipeline.subtract_elements(elements, track_elements) # Assert consistency for the fetch elements - self._pipeline._assert_consistent(fetch_plan) + self._pipeline.assert_consistent(fetch_plan) # Filter out elements with cached sources, only from the fetch plan # let the track plan resolve new refs. cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] - fetch_plan = [elt for elt in fetch_plan if elt not in cached] - - self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan) + fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) - fetch = FetchQueue(self._scheduler) - fetch.enqueue(fetch_plan) - if self._pipeline._track_elements: - track = TrackQueue(self._scheduler) - track.enqueue(self._pipeline._track_elements) - queues = [track, fetch] - else: - queues = [fetch] + # Construct queues, enqueue and run + # + track_queue = None + if track_elements: + track_queue = TrackQueue(self._scheduler) + self._add_queue(track_queue, track=True) + self._add_queue(FetchQueue(self._scheduler)) - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + if track_elements: + self._enqueue_plan(track_elements, queue=track_queue) + self._enqueue_plan(fetch_plan) + self._run() # Helper function for checkout() # @@ -772,7 +965,7 @@ class Stream(): # Collect the sources in the given sandbox into a tarfile def _collect_sources(self, directory, tar_name, element_name, compression): - with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)): + with self._context.timed_activity("Creating tarball {}".format(tar_name)): if compression == "none": permissions = "w:" else: @@ -780,64 +973,3 @@ class Stream(): with tarfile.open(tar_name, permissions) as tar: tar.add(directory, arcname=element_name) - - ############################################################# - # TEMPORARY CRAP # - ############################################################# - - # init_pipeline() - # - # Initialize the pipeline for a given activity - # - # Args: - # elements (list of elements): The elements to load recursively - # except_ (list of elements): The elements to except - # rewritable (bool): Whether we should load the YAML files for roundtripping - # use_configured_remote_caches (bool): Whether we should contact remotes - # add_remote_cache (str): The URL for an explicitly mentioned remote cache - # track_elements (list of elements): Elements which are to be tracked - # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries - # track_selection (PipelineSelection): The selection algorithm for track elements - # fetch_subprojects (bool): Whether to fetch subprojects while loading - # - # Note that the except_ argument may have a subtly different meaning depending - # on the activity performed on the Pipeline. In normal circumstances the except_ - # argument excludes elements from the `elements` list. In a build session, the - # except_ elements are excluded from the tracking plan. - # - def init_pipeline(self, elements, *, - except_=tuple(), - rewritable=False, - use_configured_remote_caches=False, - add_remote_cache=None, - track_elements=None, - track_cross_junctions=False, - track_selection=PipelineSelection.ALL, - fetch_subprojects=True): - - profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) - - self._pipeline = Pipeline(self._context, self._project, self._artifacts, - elements, except_, - rewritable=rewritable, - fetch_subprojects=fetch_subprojects) - - # After loading the projects, but before resolving cache keys, - # we need to initialize remote artifact caches where relevant - # - self._artifacts.setup_remotes(use_config=use_configured_remote_caches, - remote_url=add_remote_cache) - - # Now complete the initialization - # - self._pipeline.initialize(track_elements=track_elements, - track_cross_junctions=track_cross_junctions, - track_selection=track_selection) - - profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) - - # Get the total - self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL))) - - if self._loaded_cb is not None: - self._loaded_cb(self._pipeline) |