summaryrefslogtreecommitdiff
path: root/buildstream/_stream.py
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-30 20:11:30 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-05-08 03:59:38 +0900
commit541cd76022ca7e9ef8e4114495b908943fe3e8b1 (patch)
tree516861e75a2572d1b3811b3f04af9b087246c020 /buildstream/_stream.py
parentc9f67f1e42734404fb47ddea0b507f45864d1dcf (diff)
downloadbuildstream-541cd76022ca7e9ef8e4114495b908943fe3e8b1.tar.gz
_stream.py, _frontend: Call Stream() APIs with CLI arguments directly.
This shifts the whole responsibility of interpreting command line targets etc to the Stream() object itself. With this commit, the Pipeline() truly becomes slaved to the Stream().
Diffstat (limited to 'buildstream/_stream.py')
-rw-r--r--buildstream/_stream.py544
1 files changed, 410 insertions, 134 deletions
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 93a12f630..09ad51d1b 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -27,6 +27,9 @@ from tempfile import TemporaryDirectory
from ._exceptions import StreamError, ImplError, BstError
from ._message import Message, MessageType
from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._pipeline import Pipeline, PipelineSelection
+from ._platform import Platform
+from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -37,90 +40,120 @@ from . import Scope, Consistency
#
# Args:
# context (Context): The Context object
+# project (Project): The Project object
+# loaded_callback (callable): A callback to invoke when the pipeline is loaded
#
class Stream():
- def __init__(self, context):
+ def __init__(self, context, project, loaded_callback):
self.session_elements = 0 # Number of elements to process in this session
self.total_elements = 0 # Number of total potential elements for this pipeline
self._context = context
+ self._project = project
self._scheduler = None
self._pipeline = None
- # track()
+ self._loaded_cb = loaded_callback
+
+ # Load selected platform
+ Platform.create_instance(context, project)
+ self._platform = Platform.get_platform()
+ self._artifacts = self._platform.artifactcache
+
+ # cleanup()
#
- # Trackes all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
+ # Cleans up application state
#
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ def cleanup(self):
+ if self._pipeline:
+ self._pipeline.cleanup()
+
+ # load_selection()
#
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
+ # An all purpose method for loading a selection of elements, this
+ # is primarily useful for the frontend to implement `bst show`
+ # and `bst shell`.
#
- def track(self, scheduler):
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Args:
+ # targets (list of str): Targets to pull
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # downloadable (bool): Whether the downloadable state of elements should be resolved
+ #
+ def load_selection(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=(),
+ downloadable=False):
+ self.init_pipeline(targets, except_=except_targets,
+ use_configured_remote_caches=downloadable)
+ return self._pipeline.get_selection(selection)
- # fetch()
+ # shell()
#
- # Fetches sources on the pipeline.
+ # Run a shell
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
+ # element (Element): An Element object to run the shell for
+ # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
+ # prompt (str): The prompt to display in the shell
+ # directory (str): A directory where an existing prestaged sysroot is expected, or None
+ # mounts (list of HostMount): Additional directories to mount into the sandbox
+ # isolate (bool): Whether to isolate the environment like we do in builds
+ # command (list): An argv to launch in the sandbox, or None
#
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Returns:
+ # (int): The exit code of the launched shell
+ #
+ def shell(self, element, scope, prompt, *,
+ directory=None,
+ mounts=None,
+ isolate=False,
+ command=None):
+
+ # Assert we have everything we need built, unless the directory is specified
+ # in which case we just blindly trust the directory, using the element
+ # definitions to control the execution environment only.
+ if directory is None:
+ missing_deps = [
+ dep._get_full_name()
+ for dep in self._pipeline.dependencies(scope)
+ if not dep._cached()
+ ]
+ if missing_deps:
+ raise StreamError("Elements need to be built or downloaded before staging a shell environment",
+ detail="\n".join(missing_deps))
+
+ return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
# build()
#
# Builds (assembles) elements in the pipeline.
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ # targets (list of str): Targets to build
+ # track_targets (list of str): Specified targets for tracking
+ # track_except (list of str): Specified targets to except from tracking
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
#
- def build(self, scheduler, *, build_all=False):
+ def build(self, targets, *,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ build_all=False):
+
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ self.init_pipeline(targets,
+ except_=track_except,
+ rewritable=rewritable,
+ use_configured_remote_caches=True,
+ track_elements=track_targets,
+ track_cross_junctions=track_cross_junctions)
if build_all:
plan = self._pipeline.dependencies(Scope.ALL)
@@ -171,63 +204,101 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
- # checkout()
+ # fetch()
#
- # Checkout the pipeline target artifact to the specified directory
+ # Fetches sources on the pipeline.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
+ # targets (list of str): Targets to fetch
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_targets (bool): Whether to track selected targets in addition to fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
#
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ def fetch(self, targets, *,
+ selection=PipelineSelection.PLAN,
+ except_targets=None,
+ track_targets=False,
+ track_cross_junctions=False):
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+ rewritable = False
+ if track_targets:
+ rewritable = True
- if not os.access(directory, os.W_OK):
- raise StreamError("Directory {} not writable".format(directory))
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=rewritable,
+ track_elements=targets if track_targets else None,
+ track_cross_junctions=track_cross_junctions)
- if not force and os.listdir(directory):
- raise StreamError("Checkout directory is not empty: {}"
- .format(directory))
+ fetch_plan = self._pipeline.get_selection(selection)
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+ # Delegated to a shared method for now
+ self._do_fetch(fetch_plan)
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self._checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise StreamError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
+ # track()
+ #
+ # Tracks all the sources of the selected elements.
+ #
+ # Args:
+ # targets (list of str): Targets to track
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from tracking
+ # cross_junctions (bool): Whether tracking should cross junction boundaries
+ #
+ # If no error is encountered while tracking, then the project files
+ # are rewritten inline.
+ #
+ def track(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=None,
+ track_targets=False,
+ cross_junctions=False):
+
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=True,
+ track_elements=targets,
+ track_cross_junctions=cross_junctions,
+ track_selection=selection)
+
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ self.session_elements = len(self._pipeline._track_elements)
+
+ _, status = self._scheduler.run([track])
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
# pull()
#
- # Pulls elements from the pipeline
+ # Pulls artifacts from remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
+ # targets (list of str): Targets to pull
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to pull from, or None
+ #
+ # If `remote` specified as None, then regular configuration will be used
+ # to determine where to pull artifacts from.
#
- def pull(self, scheduler, elements):
+ def pull(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_fetch_remotes():
- raise StreamError("Not artifact caches available for pulling artifacts")
+ raise StreamError("No artifact caches available for pulling artifacts")
plan = elements
self._pipeline._assert_consistent(plan)
@@ -245,13 +316,28 @@ class Stream():
# push()
#
- # Pushes elements in the pipeline
+ # Pulls artifacts to remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
+ # targets (list of str): Targets to push
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to push to, or None
+ #
+ # If `remote` specified as None, then regular configuration will be used
+ # to determine where to push artifacts to.
#
- def push(self, scheduler, elements):
+ def push(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
@@ -270,19 +356,81 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
+ # checkout()
+ #
+ # Checkout the pipeline target artifact to the specified directory
+ #
+ # Args:
+ # target (str): Target to checkout
+ # directory (str): The directory to checkout the artifact to
+ # force (bool): Force overwrite files which exist in `directory`
+ # integrate (bool): Whether to run integration commands
+ # hardlinks (bool): Whether checking out files hardlinked to
+ # their artifacts is acceptable
+ #
+ def checkout(self, target, *,
+ directory=None,
+ force=False,
+ integrate=True,
+ hardlinks=False):
+
+ self.init_pipeline((target,))
+
+ # We only have one target in a checkout command
+ target = self._pipeline.targets[0]
+
+ try:
+ os.makedirs(directory, exist_ok=True)
+ except OSError as e:
+ raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+
+ if not os.access(directory, os.W_OK):
+ raise StreamError("Directory {} not writable".format(directory))
+
+ if not force and os.listdir(directory):
+ raise StreamError("Checkout directory is not empty: {}"
+ .format(directory))
+
+ # Stage deps into a temporary sandbox first
+ try:
+ with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+ # Copy or move the sandbox to the target directory
+ sandbox_root = sandbox.get_directory()
+ with target.timed_activity("Checking out files in {}".format(directory)):
+ try:
+ if hardlinks:
+ self._checkout_hardlinks(sandbox_root, directory)
+ else:
+ utils.copy_files(sandbox_root, directory)
+ except OSError as e:
+ raise StreamError("Failed to checkout files: {}".format(e)) from e
+ except BstError as e:
+ raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
+ reason=e.reason) from e
+
# workspace_open
#
# Open a project workspace
#
# Args:
- # target (Element): The element to open the workspace for
+ # target (str): The target element to open the workspace for
# directory (str): The directory to stage the source in
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
#
- def workspace_open(self, target, directory, no_checkout, track_first, force):
- project = self._context.get_toplevel_project()
+ def workspace_open(self, target, directory, *,
+ no_checkout,
+ track_first,
+ force):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
+
+ target = self._pipeline.targets[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -294,15 +442,19 @@ class Stream():
raise StreamError("The given element has no sources", detail=detail)
# Check for workspace config
- workspace = project.workspaces.get_workspace(target.name)
+ workspace = self._project.workspaces.get_workspace(target.name)
if workspace:
raise StreamError("Workspace '{}' is already defined at: {}"
.format(target.name, workspace.path))
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
+ #
+ # For now, tracking is handled by _do_fetch() automatically
+ # by virtue of our screwed up pipeline initialization stuff.
+ #
if not no_checkout or track_first:
- self.fetch(self._scheduler, [target])
+ self._do_fetch([target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -315,13 +467,13 @@ class Stream():
except OSError as e:
raise StreamError("Failed to create workspace directory: {}".format(e)) from e
- project.workspaces.create_workspace(target.name, workdir)
+ self._project.workspaces.create_workspace(target.name, workdir)
if not no_checkout:
with target.timed_activity("Staging sources to {}".format(directory)):
target._open_workspace()
- project.workspaces.save_config()
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Saved workspace configuration")
# workspace_close
@@ -332,9 +484,8 @@ class Stream():
# element_name (str): The element name to close the workspace for
# remove_dir (bool): Whether to remove the associated directory
#
- def workspace_close(self, element_name, remove_dir):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(element_name)
+ def workspace_close(self, element_name, *, remove_dir):
+ workspace = self._project.workspaces.get_workspace(element_name)
# Remove workspace directory if prompted
if remove_dir:
@@ -347,8 +498,8 @@ class Stream():
.format(workspace.path, e)) from e
# Delete the workspace and save the configuration
- project.workspaces.delete_workspace(element_name)
- project.workspaces.save_config()
+ self._project.workspaces.delete_workspace(element_name)
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
# workspace_reset
@@ -357,19 +508,40 @@ class Stream():
# changes.
#
# Args:
- # target (Element): The element to reset the workspace for
- # track (bool): Whether to also track the source
+ # targets (list of str): The target elements to reset the workspace for
+ # track_first (bool): Whether to also track the sources first
#
- def workspace_reset(self, target, track):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(target.name)
+ def workspace_reset(self, targets, *, track_first):
- if workspace is None:
- raise StreamError("Workspace '{}' is currently not defined"
- .format(target.name))
+ self.init_pipeline(targets,
+ track_elements=targets if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
- self.workspace_close(target.name, True)
- self.workspace_open(target, workspace.path, False, track, False)
+ # Do the tracking first
+ if track_first:
+ self._do_fetch(self._pipeline.targets)
+
+ for target in self._pipeline.targets:
+ workspace = self._project.workspaces.get_workspace(target.name)
+
+ with target.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
+ try:
+ shutil.rmtree(workspace.path)
+ except OSError as e:
+ raise StreamError("Could not remove '{}': {}"
+ .format(workspace.path, e)) from e
+
+ self._project.workspaces.delete_workspace(target.name)
+ self._project.workspaces.create_workspace(target.name, workspace.path)
+
+ with target.timed_activity("Staging sources to {}".format(workspace.path)):
+ target._open_workspace()
+
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+
+ self._project.workspaces.save_config()
# workspace_exists
#
@@ -385,13 +557,11 @@ class Stream():
# True if there are any existing workspaces.
#
def workspace_exists(self, element_name=None):
- project = self._context.get_toplevel_project()
-
if element_name:
- workspace = project.workspaces.get_workspace(element_name)
+ workspace = self._project.workspaces.get_workspace(element_name)
if workspace:
return True
- elif any(project.workspaces.list()):
+ elif any(self._project.workspaces.list()):
return True
return False
@@ -401,9 +571,8 @@ class Stream():
# Serializes the workspaces and dumps them in YAML to stdout.
#
def workspace_list(self):
- project = self._context.get_toplevel_project()
workspaces = []
- for element_name, workspace_ in project.workspaces.list():
+ for element_name, workspace_ in self._project.workspaces.list():
workspace_detail = {
'element': element_name,
'directory': workspace_.path,
@@ -416,16 +585,28 @@ class Stream():
# source_bundle()
#
- # Create a build bundle for the given artifact.
+ # Create a host buildable tarball bundle for the given target.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
+ # target (str): The target element to bundle
+ # directory (str): The directory to output the tarball
+ # track_first (bool): Track new source references before bundling
+ # compression (str): The compression type to use
+ # force (bool): Overwrite an existing tarball
+ #
+ def source_bundle(self, target, directory, *,
+ track_first=False,
+ force=False,
+ compression="gz"):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
# source-bundle only supports one target
target = self._pipeline.targets[0]
+ dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -444,7 +625,7 @@ class Stream():
.format(tar_location, e)) from e
plan = list(dependencies)
- self.fetch(self._scheduler, plan)
+ self._do_fetch(plan)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
@@ -484,6 +665,48 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
+ # _do_fetch()
+ #
+ # Performs the fetch job, the body of this function is here because
+ # it is shared between a few internals.
+ #
+ # Args:
+ # elements (list of Element): Elements to fetch
+ #
+ def _do_fetch(self, elements):
+
+ fetch_plan = elements
+
+ # Subtract the track elements from the fetch elements, they will be added separately
+ if self._pipeline._track_elements:
+ track_elements = set(self._pipeline._track_elements)
+ fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+ # Assert consistency for the fetch elements
+ self._pipeline._assert_consistent(fetch_plan)
+
+ # Filter out elements with cached sources, only from the fetch plan
+ # let the track plan resolve new refs.
+ cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+ fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+ self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+ fetch = FetchQueue(self._scheduler)
+ fetch.enqueue(fetch_plan)
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ queues = [track, fetch]
+ else:
+ queues = [fetch]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
# Helper function for checkout()
#
def _checkout_hardlinks(self, sandbox_root, directory):
@@ -546,3 +769,56 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
+
+ #############################################################
+ # TEMPORARY CRAP #
+ #############################################################
+
+ # init_pipeline()
+ #
+ # Initialize the pipeline for a given activity
+ #
+ # Args:
+ # elements (list of elements): The elements to load recursively
+ # except_ (list of elements): The elements to except
+ # rewritable (bool): Whether we should load the YAML files for roundtripping
+ # use_configured_remote_caches (bool): Whether we should contact remotes
+ # add_remote_cache (str): The URL for an explicitly mentioned remote cache
+ # track_elements (list of elements): Elements which are to be tracked
+ # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
+ # track_selection (PipelineSelection): The selection algorithm for track elements
+ # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+ # loading process, if they are not yet locally cached
+ #
+ # Note that the except_ argument may have a subtly different meaning depending
+ # on the activity performed on the Pipeline. In normal circumstances the except_
+ # argument excludes elements from the `elements` list. In a build session, the
+ # except_ elements are excluded from the tracking plan.
+ #
+ def init_pipeline(self, elements, *,
+ except_=tuple(),
+ rewritable=False,
+ use_configured_remote_caches=False,
+ add_remote_cache=None,
+ track_elements=None,
+ track_cross_junctions=False,
+ track_selection=PipelineSelection.ALL):
+
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ self._pipeline = Pipeline(self._context, self._project, self._artifacts,
+ elements, except_, rewritable=rewritable)
+
+ self._pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=add_remote_cache,
+ track_elements=track_elements,
+ track_cross_junctions=track_cross_junctions,
+ track_selection=track_selection)
+
+ profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ # Get the total
+ self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+ if self._loaded_cb is not None:
+ self._loaded_cb(self._pipeline)