Diffstat (limited to 'buildstream/_stream.py')
-rw-r--r--  buildstream/_stream.py  544
1 file changed, 410 insertions(+), 134 deletions(-)
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 93a12f630..09ad51d1b 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -27,6 +27,9 @@ from tempfile import TemporaryDirectory
from ._exceptions import StreamError, ImplError, BstError
from ._message import Message, MessageType
from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._pipeline import Pipeline, PipelineSelection
+from ._platform import Platform
+from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -37,90 +40,120 @@ from . import Scope, Consistency
#
# Args:
# context (Context): The Context object
+# project (Project): The Project object
+# loaded_callback (callable): A callback to invoke when the pipeline is loaded
#
class Stream():
- def __init__(self, context):
+ def __init__(self, context, project, loaded_callback):
self.session_elements = 0 # Number of elements to process in this session
self.total_elements = 0 # Number of total potential elements for this pipeline
self._context = context
+ self._project = project
self._scheduler = None
self._pipeline = None
- # track()
+ self._loaded_cb = loaded_callback
+
+ # Load selected platform
+ Platform.create_instance(context, project)
+ self._platform = Platform.get_platform()
+ self._artifacts = self._platform.artifactcache
+
+ # cleanup()
#
- # Trackes all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
+ # Cleans up application state
#
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ def cleanup(self):
+ if self._pipeline:
+ self._pipeline.cleanup()
+
+ # load_selection()
#
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
+ # An all-purpose method for loading a selection of elements; this
+ # is primarily useful for the frontend to implement `bst show`
+ # and `bst shell`.
#
- def track(self, scheduler):
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Args:
+ # targets (list of str): Targets to load
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from the selection
+ # downloadable (bool): Whether the downloadable state of elements should be resolved
+ #
+ def load_selection(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=(),
+ downloadable=False):
+ self.init_pipeline(targets, except_=except_targets,
+ use_configured_remote_caches=downloadable)
+ return self._pipeline.get_selection(selection)
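
For orientation, here is a minimal sketch of how a frontend command such as `bst show` might drive this new entry point. This is not part of the diff; it assumes `context` and `project` are already-constructed Context and Project objects, and `hello.bst` is a hypothetical element name.

    from buildstream._stream import Stream
    from buildstream._pipeline import PipelineSelection

    # Callback invoked once the pipeline has been loaded (see Stream.__init__ above)
    def on_loaded(pipeline):
        print("pipeline loaded")

    stream = Stream(context, project, on_loaded)

    # Resolve the selection without scheduling any work; downloadable=True
    # also resolves the remote (downloadable) artifact state for display.
    elements = stream.load_selection(['hello.bst'],
                                     selection=PipelineSelection.ALL,
                                     downloadable=True)
    for element in elements:
        print(element.name)
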
- # fetch()
+ # shell()
#
- # Fetches sources on the pipeline.
+ # Run a shell
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
+ # element (Element): An Element object to run the shell for
+ # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
+ # prompt (str): The prompt to display in the shell
+ # directory (str): A directory where an existing prestaged sysroot is expected, or None
+ # mounts (list of HostMount): Additional directories to mount into the sandbox
+ # isolate (bool): Whether to isolate the environment like we do in builds
+ # command (list): An argv to launch in the sandbox, or None
#
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Returns:
+ # (int): The exit code of the launched shell
+ #
+ def shell(self, element, scope, prompt, *,
+ directory=None,
+ mounts=None,
+ isolate=False,
+ command=None):
+
+ # Assert we have everything we need built, unless the directory is specified
+ # in which case we just blindly trust the directory, using the element
+ # definitions to control the execution environment only.
+ if directory is None:
+ missing_deps = [
+ dep._get_full_name()
+ for dep in self._pipeline.dependencies(scope)
+ if not dep._cached()
+ ]
+ if missing_deps:
+ raise StreamError("Elements need to be built or downloaded before staging a shell environment",
+ detail="\n".join(missing_deps))
+
+ return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
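
A sketch of how `bst shell` could use this, reusing the `stream` object from the sketch above; the element is obtained from a prior load_selection() call and `Scope` comes from the package-level import shown at the top of the file:

    from buildstream import Scope

    element = stream.load_selection(['hello.bst'])[0]

    # With command=None an interactive shell is launched; passing a list
    # argv runs that command instead, and the exit code is returned.
    exit_code = stream.shell(element, Scope.RUN, 'hello.bst> ',
                             isolate=False,
                             command=None)
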
# build()
#
# Builds (assembles) elements in the pipeline.
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ # targets (list of str): Targets to build
+ # track_targets (list of str): Specified targets for tracking
+ # track_except (list of str): Specified targets to except from tracking
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
#
- def build(self, scheduler, *, build_all=False):
+ def build(self, targets, *,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ build_all=False):
+
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ self.init_pipeline(targets,
+ except_=track_except,
+ rewritable=rewritable,
+ use_configured_remote_caches=True,
+ track_elements=track_targets,
+ track_cross_junctions=track_cross_junctions)
if build_all:
plan = self._pipeline.dependencies(Scope.ALL)
@@ -171,63 +204,101 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
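
The corresponding frontend call for a build that also tracks its targets might look like this (a sketch only, with hypothetical element names; the real CLI forwards these values from command-line options):

    # Build 'app.bst', tracking and rewriting its source references first,
    # and allow tracking to cross junction boundaries.
    stream.build(['app.bst'],
                 track_targets=['app.bst'],
                 track_except=[],
                 track_cross_junctions=True,
                 build_all=False)
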
- # checkout()
+ # fetch()
#
- # Checkout the pipeline target artifact to the specified directory
+ # Fetches sources on the pipeline.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
+ # targets (list of str): Targets to fetch
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_targets (bool): Whether to track selected targets in addition to fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
#
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ def fetch(self, targets, *,
+ selection=PipelineSelection.PLAN,
+ except_targets=None,
+ track_targets=False,
+ track_cross_junctions=False):
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+ rewritable = False
+ if track_targets:
+ rewritable = True
- if not os.access(directory, os.W_OK):
- raise StreamError("Directory {} not writable".format(directory))
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=rewritable,
+ track_elements=targets if track_targets else None,
+ track_cross_junctions=track_cross_junctions)
- if not force and os.listdir(directory):
- raise StreamError("Checkout directory is not empty: {}"
- .format(directory))
+ fetch_plan = self._pipeline.get_selection(selection)
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+ # Delegated to a shared method for now
+ self._do_fetch(fetch_plan)
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self._checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise StreamError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
+ # track()
+ #
+ # Tracks all the sources of the selected elements.
+ #
+ # Args:
+ # targets (list of str): Targets to track
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from tracking
+ # cross_junctions (bool): Whether tracking should cross junction boundaries
+ #
+ # If no error is encountered while tracking, then the project files
+ # are rewritten inline.
+ #
+ def track(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=None,
+ track_targets=False,
+ cross_junctions=False):
+
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=True,
+ track_elements=targets,
+ track_cross_junctions=cross_junctions,
+ track_selection=selection)
+
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ self.session_elements = len(self._pipeline._track_elements)
+
+ _, status = self._scheduler.run([track])
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
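
With this, `bst track` reduces to a single Stream call; a sketch with hypothetical element names:

    from buildstream._pipeline import PipelineSelection

    # Track all dependencies of 'app.bst' except 'vendored.bst',
    # rewriting source refs in the project files on success.
    stream.track(['app.bst'],
                 selection=PipelineSelection.ALL,
                 except_targets=['vendored.bst'],
                 cross_junctions=False)
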
# pull()
#
- # Pulls elements from the pipeline
+ # Pulls artifacts from remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
+ # targets (list of str): Targets to pull
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to pull from, or None
+ #
+ # If `remote` is specified as None, then the regular configuration will be used
+ # to determine where to pull artifacts from.
#
- def pull(self, scheduler, elements):
+ def pull(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_fetch_remotes():
- raise StreamError("Not artifact caches available for pulling artifacts")
+ raise StreamError("No artifact caches available for pulling artifacts")
plan = elements
self._pipeline._assert_consistent(plan)
@@ -245,13 +316,28 @@ class Stream():
# push()
#
- # Pushes elements in the pipeline
+ # Pushes artifacts to remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
+ # targets (list of str): Targets to push
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to push to, or None
+ #
+ # If `remote` is specified as None, then the regular configuration will be used
+ # to determine where to push artifacts to.
#
- def push(self, scheduler, elements):
+ def push(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
@@ -270,19 +356,81 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
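
pull() and push() follow the same pattern; a sketch of how a frontend might call them, using a hypothetical remote URL (when `remote` is given, the configured caches are not used, as shown above). In practice each CLI invocation would likely construct its own Stream instance:

    from buildstream._pipeline import PipelineSelection

    # Pull the artifact for 'app.bst' from one specific cache...
    stream.pull(['app.bst'],
                selection=PipelineSelection.NONE,
                remote='https://cache.example.com/artifacts')

    # ...or push it to whatever caches the user configuration provides.
    stream.push(['app.bst'], selection=PipelineSelection.NONE)
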
+ # checkout()
+ #
+ # Checkout the pipeline target artifact to the specified directory
+ #
+ # Args:
+ # target (str): Target to checkout
+ # directory (str): The directory to checkout the artifact to
+ # force (bool): Force overwrite files which exist in `directory`
+ # integrate (bool): Whether to run integration commands
+ # hardlinks (bool): Whether checking out files hardlinked to
+ # their artifacts is acceptable
+ #
+ def checkout(self, target, *,
+ directory=None,
+ force=False,
+ integrate=True,
+ hardlinks=False):
+
+ self.init_pipeline((target,))
+
+ # We only have one target in a checkout command
+ target = self._pipeline.targets[0]
+
+ try:
+ os.makedirs(directory, exist_ok=True)
+ except OSError as e:
+ raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+
+ if not os.access(directory, os.W_OK):
+ raise StreamError("Directory {} not writable".format(directory))
+
+ if not force and os.listdir(directory):
+ raise StreamError("Checkout directory is not empty: {}"
+ .format(directory))
+
+ # Stage deps into a temporary sandbox first
+ try:
+ with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+ # Copy or move the sandbox to the target directory
+ sandbox_root = sandbox.get_directory()
+ with target.timed_activity("Checking out files in {}".format(directory)):
+ try:
+ if hardlinks:
+ self._checkout_hardlinks(sandbox_root, directory)
+ else:
+ utils.copy_files(sandbox_root, directory)
+ except OSError as e:
+ raise StreamError("Failed to checkout files: {}".format(e)) from e
+ except BstError as e:
+ raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
+ reason=e.reason) from e
+
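
A sketch of a checkout invocation under the new signature (hypothetical paths):

    # Stage the built artifact of 'app.bst' into ./sysroot, running
    # integration commands and copying rather than hardlinking files.
    stream.checkout('app.bst',
                    directory='./sysroot',
                    force=False,
                    integrate=True,
                    hardlinks=False)
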
# workspace_open
#
# Open a project workspace
#
# Args:
- # target (Element): The element to open the workspace for
+ # target (str): The target element to open the workspace for
# directory (str): The directory to stage the source in
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
#
- def workspace_open(self, target, directory, no_checkout, track_first, force):
- project = self._context.get_toplevel_project()
+ def workspace_open(self, target, directory, *,
+ no_checkout,
+ track_first,
+ force):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
+
+ target = self._pipeline.targets[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -294,15 +442,19 @@ class Stream():
raise StreamError("The given element has no sources", detail=detail)
# Check for workspace config
- workspace = project.workspaces.get_workspace(target.name)
+ workspace = self._project.workspaces.get_workspace(target.name)
if workspace:
raise StreamError("Workspace '{}' is already defined at: {}"
.format(target.name, workspace.path))
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
+ #
+ # For now, tracking is handled by _do_fetch() automatically
+ # by virtue of our screwed up pipeline initialization stuff.
+ #
if not no_checkout or track_first:
- self.fetch(self._scheduler, [target])
+ self._do_fetch([target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -315,13 +467,13 @@ class Stream():
except OSError as e:
raise StreamError("Failed to create workspace directory: {}".format(e)) from e
- project.workspaces.create_workspace(target.name, workdir)
+ self._project.workspaces.create_workspace(target.name, workdir)
if not no_checkout:
with target.timed_activity("Staging sources to {}".format(directory)):
target._open_workspace()
- project.workspaces.save_config()
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Saved workspace configuration")
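
A sketch of opening a workspace with the new keyword-only arguments (hypothetical element name and path):

    # Open a workspace for 'app.bst', tracking and checking out its sources.
    stream.workspace_open('app.bst', './workspaces/app',
                          no_checkout=False,
                          track_first=True,
                          force=False)
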
# workspace_close
@@ -332,9 +484,8 @@ class Stream():
# element_name (str): The element name to close the workspace for
# remove_dir (bool): Whether to remove the associated directory
#
- def workspace_close(self, element_name, remove_dir):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(element_name)
+ def workspace_close(self, element_name, *, remove_dir):
+ workspace = self._project.workspaces.get_workspace(element_name)
# Remove workspace directory if prompted
if remove_dir:
@@ -347,8 +498,8 @@ class Stream():
.format(workspace.path, e)) from e
# Delete the workspace and save the configuration
- project.workspaces.delete_workspace(element_name)
- project.workspaces.save_config()
+ self._project.workspaces.delete_workspace(element_name)
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
# workspace_reset
@@ -357,19 +508,40 @@ class Stream():
# changes.
#
# Args:
- # target (Element): The element to reset the workspace for
- # track (bool): Whether to also track the source
+ # targets (list of str): The target elements to reset the workspace for
+ # track_first (bool): Whether to also track the sources first
#
- def workspace_reset(self, target, track):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(target.name)
+ def workspace_reset(self, targets, *, track_first):
- if workspace is None:
- raise StreamError("Workspace '{}' is currently not defined"
- .format(target.name))
+ self.init_pipeline(targets,
+ track_elements=targets if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
- self.workspace_close(target.name, True)
- self.workspace_open(target, workspace.path, False, track, False)
+ # Do the tracking first
+ if track_first:
+ self._do_fetch(self._pipeline.targets)
+
+ for target in self._pipeline.targets:
+ workspace = self._project.workspaces.get_workspace(target.name)
+
+ with target.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
+ try:
+ shutil.rmtree(workspace.path)
+ except OSError as e:
+ raise StreamError("Could not remove '{}': {}"
+ .format(workspace.path, e)) from e
+
+ self._project.workspaces.delete_workspace(target.name)
+ self._project.workspaces.create_workspace(target.name, workspace.path)
+
+ with target.timed_activity("Staging sources to {}".format(workspace.path)):
+ target._open_workspace()
+
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+
+ self._project.workspaces.save_config()
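
And the matching reset call, which now takes a list of targets (hypothetical element name):

    # Recreate the workspace contents for 'app.bst', tracking sources first.
    stream.workspace_reset(['app.bst'], track_first=True)
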
# workspace_exists
#
@@ -385,13 +557,11 @@ class Stream():
# True if there are any existing workspaces.
#
def workspace_exists(self, element_name=None):
- project = self._context.get_toplevel_project()
-
if element_name:
- workspace = project.workspaces.get_workspace(element_name)
+ workspace = self._project.workspaces.get_workspace(element_name)
if workspace:
return True
- elif any(project.workspaces.list()):
+ elif any(self._project.workspaces.list()):
return True
return False
@@ -401,9 +571,8 @@ class Stream():
# Serializes the workspaces and dumps them in YAML to stdout.
#
def workspace_list(self):
- project = self._context.get_toplevel_project()
workspaces = []
- for element_name, workspace_ in project.workspaces.list():
+ for element_name, workspace_ in self._project.workspaces.list():
workspace_detail = {
'element': element_name,
'directory': workspace_.path,
@@ -416,16 +585,28 @@ class Stream():
# source_bundle()
#
- # Create a build bundle for the given artifact.
+ # Create a host-buildable tarball bundle for the given target.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
+ # target (str): The target element to bundle
+ # directory (str): The directory to output the tarball
+ # track_first (bool): Track new source references before bundling
+ # compression (str): The compression type to use
+ # force (bool): Overwrite an existing tarball
+ #
+ def source_bundle(self, target, directory, *,
+ track_first=False,
+ force=False,
+ compression="gz"):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
# source-bundle only supports one target
target = self._pipeline.targets[0]
+ dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -444,7 +625,7 @@ class Stream():
.format(tar_location, e)) from e
plan = list(dependencies)
- self.fetch(self._scheduler, plan)
+ self._do_fetch(plan)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
@@ -484,6 +665,48 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
+ # _do_fetch()
+ #
+ # Performs the fetch job; the body of this function lives here because
+ # it is shared between a few internals.
+ #
+ # Args:
+ # elements (list of Element): Elements to fetch
+ #
+ def _do_fetch(self, elements):
+
+ fetch_plan = elements
+
+ # Subtract the track elements from the fetch elements; they will be added separately
+ if self._pipeline._track_elements:
+ track_elements = set(self._pipeline._track_elements)
+ fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+ # Assert consistency for the fetch elements
+ self._pipeline._assert_consistent(fetch_plan)
+
+ # Filter out elements with cached sources, but only from the fetch plan;
+ # let the track plan resolve new refs.
+ cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+ fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+ self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+ fetch = FetchQueue(self._scheduler)
+ fetch.enqueue(fetch_plan)
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ queues = [track, fetch]
+ else:
+ queues = [fetch]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
# Helper function for checkout()
#
def _checkout_hardlinks(self, sandbox_root, directory):
@@ -546,3 +769,56 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
+
+ #############################################################
+ # TEMPORARY CRAP #
+ #############################################################
+
+ # init_pipeline()
+ #
+ # Initialize the pipeline for a given activity
+ #
+ # Args:
+ # elements (list of str): The elements to load recursively
+ # except_ (list of str): The elements to except
+ # rewritable (bool): Whether we should load the YAML files for roundtripping
+ # use_configured_remote_caches (bool): Whether we should contact remotes
+ # add_remote_cache (str): The URL for an explicitly mentioned remote cache
+ # track_elements (list of elements): Elements which are to be tracked
+ # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
+ # track_selection (PipelineSelection): The selection algorithm for track elements
+ # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+ # loading process, if they are not yet locally cached
+ #
+ # Note that the except_ argument may have a subtly different meaning depending
+ # on the activity performed on the Pipeline. In normal circumstances the except_
+ # argument excludes elements from the `elements` list. In a build session, the
+ # except_ elements are excluded from the tracking plan.
+ #
+ def init_pipeline(self, elements, *,
+ except_=tuple(),
+ rewritable=False,
+ use_configured_remote_caches=False,
+ add_remote_cache=None,
+ track_elements=None,
+ track_cross_junctions=False,
+ track_selection=PipelineSelection.ALL):
+
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ self._pipeline = Pipeline(self._context, self._project, self._artifacts,
+ elements, except_, rewritable=rewritable)
+
+ self._pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=add_remote_cache,
+ track_elements=track_elements,
+ track_cross_junctions=track_cross_junctions,
+ track_selection=track_selection)
+
+ profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ # Get the total
+ self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+ if self._loaded_cb is not None:
+ self._loaded_cb(self._pipeline)
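
Putting the pieces together, every Stream operation now follows the same sequence: construct the Stream, let the operation call init_pipeline(), and receive the loaded pipeline back through the callback. A final sketch, with a hypothetical callback of the kind the frontend could use to attach its status display:

    def on_loaded(pipeline):
        # Invoked at the end of init_pipeline(), once total_elements is known
        print("loaded pipeline with targets:",
              [t.name for t in pipeline.targets])

    stream = Stream(context, project, on_loaded)
    stream.fetch(['app.bst'], track_targets=True)
    stream.cleanup()
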