 buildstream/_frontend/app.py |  95
 buildstream/_frontend/cli.py | 223
 buildstream/_pipeline.py     |   9
 buildstream/_stream.py       | 544
 tests/plugins/pipeline.py    |   2
 5 files changed, 553 insertions(+), 320 deletions(-)
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index ea6aed0d9..5b0cfc4b1 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,9 +39,7 @@ from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
-from .._pipeline import Pipeline, PipelineSelection
from .._scheduler import Scheduler
-from .._profile import Topics, profile_start, profile_end
from .._versions import BST_FORMAT_VERSION
from .. import __version__ as build_stream_version
from .. import _yaml
@@ -72,7 +70,6 @@ class App():
self.stream = None # The Stream object
self.project = None # The toplevel Project object
self.scheduler = None # The Scheduler
- self.pipeline = None # The Pipeline
self.logger = None # The LogLine object
self.interactive = None # Whether we are running in interactive mode
self.colors = None # Whether to use colors in logging
@@ -81,6 +78,7 @@ class App():
# Private members
#
self._session_start = datetime.datetime.now()
+ self._session_name = None
self._main_options = main_options # Main CLI options, before any command
self._status = None # The Status object
self._fail_messages = {} # Failure messages by unique plugin id
@@ -194,7 +192,7 @@ class App():
self._error_exit(e, "Error loading project")
# Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context)
+ self.stream = Stream(self.context, self.project, self.loaded_cb)
# Create the application's scheduler
self.scheduler = Scheduler(self.context, self._session_start,
@@ -242,15 +240,7 @@ class App():
# reporting the errors and exiting with a consistent error status.
#
# Args:
- # elements (list of elements): The elements to load recursively
# session_name (str): The name of the session, or None for no session
- # except_ (list of elements): The elements to except
- # rewritable (bool): Whether we should load the YAML files for roundtripping
- # use_configured_remote_caches (bool): Whether we should contact remotes
- # add_remote_cache (str): The URL for an explicitly mentioned remote cache
- # track_elements (list of elements): Elements which are to be tracked
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
# loading process, if they are not yet locally cached
#
@@ -263,51 +253,29 @@ class App():
# the session header and summary, and time the main session from startup time.
#
@contextmanager
- def initialized(self, elements, *, session_name=None,
- except_=tuple(), rewritable=False,
- use_configured_remote_caches=False,
- add_remote_cache=None,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL,
+ def initialized(self, *,
+ session_name=None,
fetch_subprojects=False):
+ self._session_name = session_name
+
# Start with the early stage init, this enables logging right away
with self.partially_initialized(fetch_subprojects=fetch_subprojects):
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
# Mark the beginning of the session
if session_name:
self._message(MessageType.START, session_name)
- try:
- self.pipeline = Pipeline(self.context, self.project, elements, except_,
- rewritable=rewritable)
- except BstError as e:
- self._error_exit(e, "Error loading pipeline")
-
- # Initialize pipeline
- try:
- self.pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=add_remote_cache,
- track_elements=track_elements,
- track_cross_junctions=track_cross_junctions,
- track_selection=track_selection)
- except BstError as e:
- self._error_exit(e, "Error initializing pipeline")
-
# XXX This is going to change soon !
#
self.stream._scheduler = self.scheduler
- self.stream._pipeline = self.pipeline
- self.stream.total_elements = len(list(self.pipeline.dependencies(Scope.ALL)))
-
- profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
- # Print the heading
- if session_name:
- self._print_heading()
+ # XXX Print the heading
+ #
+ # WE NEED A STREAM CALLBACK FOR POST LOAD SESSION START
+ #
+ # if session_name:
+ # self._print_heading()
# Run the body of the session here, once everything is loaded
try:
@@ -405,22 +373,18 @@ class App():
click.echo("Created project.conf at: {}".format(project_path), err=True)
sys.exit(0)
- # shell()
+    # shell_prompt()
#
- # Run a shell
+ # Creates a prompt for a shell environment, using ANSI color codes
+ # if they are available in the execution context.
#
# Args:
- # element (Element): An Element object to run the shell for
- # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
- # directory (str): A directory where an existing prestaged sysroot is expected, or None
- # mounts (list of HostMount): Additional directories to mount into the sandbox
- # isolate (bool): Whether to isolate the environment like we do in builds
- # command (list): An argv to launch in the sandbox, or None
+ # element (Element): The Element object to resolve a prompt for
#
# Returns:
- # (int): The exit code of the launched shell
+ # (str): The formatted prompt to display in the shell
#
- def shell(self, element, scope, directory, *, mounts=None, isolate=False, command=None):
+ def shell_prompt(self, element):
_, key, dim = element._get_display_key()
element_name = element._get_full_name()
@@ -435,7 +399,7 @@ class App():
else:
prompt = '[{}@{}:${{PWD}}]$ '.format(key, element_name)
- return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
+ return prompt
# cleanup()
#
@@ -444,8 +408,8 @@ class App():
# This is called by Click at exit time
#
def cleanup(self):
- if self.pipeline:
- self.pipeline.cleanup()
+ if self.stream:
+ self.stream.cleanup()
############################################################
# Local Functions #
@@ -609,7 +573,8 @@ class App():
if choice == 'shell':
click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
try:
- self.shell(element, Scope.BUILD, failure.sandbox, isolate=True)
+ prompt = self.shell_prompt(element)
+ self.stream.shell(element, Scope.BUILD, prompt, directory=failure.sandbox, isolate=True)
except BstError as e:
click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
elif choice == 'log':
@@ -632,14 +597,14 @@ class App():
queue.enqueue([element])
#
- # Prints the application startup heading, used for commands which
- # will process a pipeline.
+ # Print the session heading if we've loaded a pipeline and there
+ # is going to be a session
#
- def _print_heading(self, deps=None):
- self.logger.print_heading(self.pipeline,
- self._main_options['log_file'],
- styling=self.colors,
- deps=deps)
+ def loaded_cb(self, pipeline):
+ if self._session_name:
+ self.logger.print_heading(pipeline,
+ self._main_options['log_file'],
+ styling=self.colors)
#
# Print a summary of the queues
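
[Editor's note: the frontend side of this refactor is easier to see outside of diff context. Below is a minimal sketch, not part of the patch, of how a caller now wires the post-load callback and the prompt helper; MiniFrontend and its logger are made-up stand-ins for App and its LogLine object, and the element is assumed to be already loaded.]

    from buildstream import Scope
    from buildstream._stream import Stream

    class MiniFrontend():

        def __init__(self, context, project, logger):
            self.logger = logger
            # Stream now takes the project and a callback fired once loading completes
            self.stream = Stream(context, project, self.loaded_cb)

        def loaded_cb(self, pipeline):
            # Replaces the old App._print_heading() call site
            self.logger.print_heading(pipeline, None, styling=True)

        def debug_shell(self, element, sandbox_dir):
            # The frontend only decides what the prompt looks like;
            # Stream stages the sandbox and launches the shell.
            prompt = "[{}]$ ".format(element._get_full_name())
            return self.stream.shell(element, Scope.BUILD, prompt,
                                     directory=sandbox_dir, isolate=True)
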
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 1393db35a..1ac4548b6 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -235,15 +235,12 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
if track_all:
track_ = elements
- rewritable = False
- if track_:
- rewritable = True
-
- with app.initialized(elements, session_name="Build", except_=track_except, rewritable=rewritable,
- use_configured_remote_caches=True, track_elements=track_,
+ with app.initialized(session_name="Build", fetch_subprojects=True):
+ app.stream.build(elements,
+ track_targets=track_,
+ track_except=track_except,
track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True):
- app.stream.build(app.scheduler, build_all=all_)
+ build_all=all_)
##################################################################
@@ -282,12 +279,12 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
click.echo("ERROR: The --track-cross-junctions option can only be used with --track", err=True)
sys.exit(-1)
- with app.initialized(elements, session_name="Fetch", except_=except_, rewritable=track_,
- track_elements=elements if track_ else None,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True):
- dependencies = app.pipeline.get_selection(deps)
- app.stream.fetch(app.scheduler, dependencies)
+ with app.initialized(session_name="Fetch", fetch_subprojects=True):
+ app.stream.fetch(elements,
+ selection=deps,
+ except_targets=except_,
+ track_targets=track_,
+ track_cross_junctions=track_cross_junctions)
##################################################################
@@ -318,12 +315,11 @@ def track(app, elements, deps, except_, cross_junctions):
none: No dependencies, just the specified elements
all: All dependencies of all specified elements
"""
- with app.initialized(elements, session_name="Track", except_=except_, rewritable=True,
- track_elements=elements,
- track_cross_junctions=cross_junctions,
- track_selection=deps,
- fetch_subprojects=True):
- app.stream.track(app.scheduler)
+ with app.initialized(session_name="Track", fetch_subprojects=True):
+ app.stream.track(elements,
+ selection=deps,
+ except_targets=except_,
+ cross_junctions=cross_junctions)
##################################################################
@@ -351,10 +347,8 @@ def pull(app, elements, deps, remote):
none: No dependencies, just the element itself
all: All dependencies
"""
- with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None),
- add_remote_cache=remote, fetch_subprojects=True):
- to_pull = app.pipeline.get_selection(deps)
- app.stream.pull(app.scheduler, to_pull)
+ with app.initialized(session_name="Pull", fetch_subprojects=True):
+ app.stream.pull(elements, selection=deps, remote=remote)
##################################################################
@@ -381,11 +375,8 @@ def push(app, elements, deps, remote):
none: No dependencies, just the element itself
all: All dependencies
"""
- with app.initialized(elements, session_name="Push",
- use_configured_remote_caches=(remote is None),
- add_remote_cache=remote, fetch_subprojects=True):
- to_push = app.pipeline.get_selection(deps)
- app.stream.push(app.scheduler, to_push)
+ with app.initialized(session_name="Push", fetch_subprojects=True):
+ app.stream.push(elements, selection=deps, remote=remote)
##################################################################
@@ -456,9 +447,12 @@ def show(app, elements, deps, except_, order, format_, downloadable):
bst show target.bst --format \\
$'---------- %{name} ----------\\n%{vars}'
"""
- with app.initialized(elements, except_=except_, use_configured_remote_caches=downloadable):
+ with app.initialized():
+ dependencies = app.stream.load_selection(elements,
+ selection=deps,
+ except_targets=except_,
+ downloadable=downloadable)
- dependencies = app.pipeline.get_selection(deps)
if order == "alpha":
dependencies = sorted(dependencies)
@@ -466,8 +460,7 @@ def show(app, elements, deps, except_, order, format_, downloadable):
format_ = app.context.log_element_format
report = app.logger.show_pipeline(dependencies, format_)
-
- click.echo(report, color=app.colors)
+ click.echo(report, color=app.colors)
##################################################################
@@ -507,43 +500,32 @@ def shell(app, element, sysroot, mount, isolate, build_, command):
"""
from ..element import Scope
from .._project import HostMount
+ from .._pipeline import PipelineSelection
+
if build_:
scope = Scope.BUILD
else:
scope = Scope.RUN
- with app.initialized((element,)):
- pass
-
- # Assert we have everything we need built.
- missing_deps = []
- if scope is not None:
- for dep in app.pipeline.dependencies(scope):
- if not dep._cached():
- missing_deps.append(dep)
-
- if missing_deps:
- click.echo("", err=True)
- click.echo("Missing elements for staging an environment for a shell:", err=True)
- for dep in missing_deps:
- click.echo(" {}".format(dep.name), err=True)
- click.echo("", err=True)
- click.echo("Try building them first", err=True)
- sys.exit(-1)
-
- mounts = [
- HostMount(path, host_path)
- for host_path, path in mount
- ]
+ with app.initialized():
+ dependencies = app.stream.load_selection((element,), selection=PipelineSelection.NONE)
+ element = dependencies[0]
+ prompt = app.shell_prompt(element)
+ mounts = [
+ HostMount(path, host_path)
+ for host_path, path in mount
+ ]
+ try:
+ exitcode = app.stream.shell(element, scope, prompt,
+ directory=sysroot,
+ mounts=mounts,
+ isolate=isolate,
+ command=command)
+ except BstError as e:
+ raise AppError("Error launching shell: {}".format(e), detail=e.detail) from e
- try:
- element = app.pipeline.targets[0]
- exitcode = app.shell(element, scope, sysroot, mounts=mounts, isolate=isolate, command=command)
- sys.exit(exitcode)
- except BstError as e:
- click.echo("", err=True)
- click.echo("Errors shelling into this pipeline: {}".format(e), err=True)
- sys.exit(-1)
+    # If there were no errors, exit with the shell's exit code here.
+ sys.exit(exitcode)
##################################################################
@@ -563,37 +545,12 @@ def shell(app, element, sysroot, mount, isolate, build_, command):
def checkout(app, element, directory, force, integrate, hardlinks):
"""Checkout a built artifact to the specified directory
"""
- with app.initialized((element,)):
- app.stream.checkout(directory, force, integrate, hardlinks)
-
-
-##################################################################
-# Source Bundle Command #
-##################################################################
-@cli.command(name="source-bundle", short_help="Produce a build bundle to be manually executed")
-@click.option('--except', 'except_', multiple=True,
- type=click.Path(dir_okay=False, readable=True),
- help="Elements to except from the tarball")
-@click.option('--compression', default='gz',
- type=click.Choice(['none', 'gz', 'bz2', 'xz']),
- help="Compress the tar file using the given algorithm.")
-@click.option('--track', 'track_', default=False, is_flag=True,
- help="Track new source references before building")
-@click.option('--force', '-f', default=False, is_flag=True,
- help="Overwrite files existing in checkout directory")
-@click.option('--directory', default=os.getcwd(),
- help="The directory to write the tarball to")
-@click.argument('target',
- type=click.Path(dir_okay=False, readable=True))
-@click.pass_obj
-def source_bundle(app, target, force, directory,
- track_, compression, except_):
- """Produce a source bundle to be manually executed
- """
- with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None):
- dependencies = app.pipeline.get_selection('all')
- app.stream.source_bundle(app.scheduler, dependencies, force, track_,
- compression, directory)
+ with app.initialized():
+ app.stream.checkout(element,
+ directory=directory,
+ force=force,
+ integrate=integrate,
+ hardlinks=hardlinks)
##################################################################
@@ -632,10 +589,11 @@ def workspace_open(app, no_checkout, force, track_, element, directory):
click.echo("Checkout directory is not empty: {}".format(directory), err=True)
sys.exit(-1)
- with app.initialized((element,), rewritable=track_, track_elements=[element] if track_ else None):
- # This command supports only one target
- target = app.pipeline.targets[0]
- app.stream.workspace_open(target, directory, no_checkout, track_, force)
+ with app.initialized():
+ app.stream.workspace_open(element, directory,
+ no_checkout=no_checkout,
+ track_first=track_,
+ force=force)
##################################################################
@@ -656,7 +614,7 @@ def workspace_close(app, remove_dir, all_, elements):
click.echo('ERROR: no elements specified', err=True)
sys.exit(-1)
- with app.partially_initialized():
+ with app.initialized():
# Early exit if we specified `all` and there are no workspaces
if all_ and not app.stream.workspace_exists():
@@ -679,7 +637,7 @@ def workspace_close(app, remove_dir, all_, elements):
if all_:
elements = [element_name for element_name, _ in app.project.workspaces.list()]
for element_name in elements:
- app.stream.workspace_close(element_name, remove_dir)
+ app.stream.workspace_close(element_name, remove_dir=remove_dir)
##################################################################
@@ -696,22 +654,31 @@ def workspace_close(app, remove_dir, all_, elements):
def workspace_reset(app, track_, all_, elements):
"""Reset a workspace to its original state"""
- if not (all_ or elements):
- click.echo('ERROR: no elements specified', err=True)
- sys.exit(-1)
+ # Check that the workspaces in question exist
+ with app.initialized():
- if app.interactive:
- if not click.confirm('This will remove all your changes, are you sure?'):
- click.echo('Aborting', err=True)
- sys.exit(-1)
+ if not (all_ or elements):
+ raise AppError('No elements specified to reset')
+
+ if all_ and not app.stream.workspace_exists():
+ raise AppError("No open workspaces to reset")
+
+ nonexisting = []
+ for element_name in elements:
+ if not app.stream.workspace_exists(element_name):
+ nonexisting.append(element_name)
+ if nonexisting:
+ raise AppError("Workspace does not exist", detail="\n".join(nonexisting))
+
+ if app.interactive:
+ if not click.confirm('This will remove all your changes, are you sure?'):
+ click.echo('Aborting', err=True)
+ sys.exit(-1)
- with app.partially_initialized():
if all_:
elements = tuple(element_name for element_name, _ in app.project.workspaces.list())
- with app.initialized(elements):
- for target in app.pipeline.targets:
- app.stream.workspace_reset(target, track_)
+ app.stream.workspace_reset(elements, track_first=track_)
##################################################################
@@ -722,5 +689,35 @@ def workspace_reset(app, track_, all_, elements):
def workspace_list(app):
"""List open workspaces"""
- with app.partially_initialized():
+ with app.initialized():
app.stream.workspace_list()
+
+
+##################################################################
+# Source Bundle Command #
+##################################################################
+@cli.command(name="source-bundle", short_help="Produce a build bundle to be manually executed")
+@click.option('--except', 'except_', multiple=True,
+ type=click.Path(dir_okay=False, readable=True),
+ help="Elements to except from the tarball")
+@click.option('--compression', default='gz',
+ type=click.Choice(['none', 'gz', 'bz2', 'xz']),
+ help="Compress the tar file using the given algorithm.")
+@click.option('--track', 'track_', default=False, is_flag=True,
+ help="Track new source references before bundling")
+@click.option('--force', '-f', default=False, is_flag=True,
+ help="Overwrite an existing tarball")
+@click.option('--directory', default=os.getcwd(),
+ help="The directory to write the tarball to")
+@click.argument('element',
+ type=click.Path(dir_okay=False, readable=True))
+@click.pass_obj
+def source_bundle(app, element, force, directory,
+ track_, compression, except_):
+ """Produce a source bundle to be manually executed
+ """
+ with app.initialized(fetch_subprojects=True):
+ app.stream.source_bundle(element, directory,
+ track_first=track_,
+ force=force,
+ compression=compression)
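
[Editor's note: after the conversion, every command body in cli.py follows the same shape: open a session with app.initialized() and hand plain element-name strings to a single Stream method. A condensed sketch of a hypothetical command in that style, not part of the patch; the click decorators are elided, `elements` and `deps` are assumed to arrive from click options as in the real commands, and the empty tuples mirror what click passes when multiple-value options are unset.]

    def frobnicate(app, elements, deps):
        # (click decorators elided) one session, one Stream call
        with app.initialized(session_name="Frobnicate", fetch_subprojects=True):
            # Selection, tracking and scheduling policy now live in Stream
            app.stream.fetch(elements,
                             selection=deps,
                             except_targets=(),
                             track_targets=(),
                             track_cross_junctions=False)
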
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 52bae2e5c..7f3c657fa 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -27,7 +27,6 @@ from ._message import Message, MessageType
from ._loader import Loader
from .element import Element
from . import Scope, Consistency
-from ._platform import Platform
from ._project import ProjectRefStorage
from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs
@@ -87,7 +86,7 @@ class PipelineSelection():
#
class Pipeline():
- def __init__(self, context, project, targets, except_, rewritable=False):
+ def __init__(self, context, project, artifacts, targets, except_, rewritable=False):
self.context = context # The Context
self.project = project # The toplevel project
@@ -96,7 +95,7 @@ class Pipeline():
#
# Private members
#
- self._artifacts = None
+ self._artifacts = artifacts
self._loader = None
self._exceptions = None
self._track_cross_junctions = False
@@ -106,10 +105,6 @@ class Pipeline():
# Early initialization
#
- # Load selected platform
- Platform.create_instance(context, project)
- platform = Platform.get_platform()
- self._artifacts = platform.artifactcache
self._loader = Loader(self.context, self.project, targets + except_)
with self.context.timed_activity("Loading pipeline", silent_nested=True):
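
[Editor's note: with the Platform bootstrap removed here, the artifact cache is now injected by the caller instead of created inside Pipeline. A rough sketch of direct construction, assuming a Context, Project and target name already exist; callers that never touch artifacts may pass None, as the test update at the end of this patch does.]

    from buildstream._pipeline import Pipeline
    from buildstream._platform import Platform

    Platform.create_instance(context, project)
    artifacts = Platform.get_platform().artifactcache
    pipeline = Pipeline(context, project, artifacts, [target], [], rewritable=False)
    # followed by pipeline.initialize(...) with the keyword arguments
    # shown in Stream.init_pipeline() below
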
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 93a12f630..09ad51d1b 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -27,6 +27,9 @@ from tempfile import TemporaryDirectory
from ._exceptions import StreamError, ImplError, BstError
from ._message import Message, MessageType
from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._pipeline import Pipeline, PipelineSelection
+from ._platform import Platform
+from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -37,90 +40,120 @@ from . import Scope, Consistency
#
# Args:
# context (Context): The Context object
+# project (Project): The Project object
+# loaded_callback (callable): A callback to invoke when the pipeline is loaded
#
class Stream():
- def __init__(self, context):
+ def __init__(self, context, project, loaded_callback):
self.session_elements = 0 # Number of elements to process in this session
self.total_elements = 0 # Number of total potential elements for this pipeline
self._context = context
+ self._project = project
self._scheduler = None
self._pipeline = None
- # track()
+ self._loaded_cb = loaded_callback
+
+ # Load selected platform
+ Platform.create_instance(context, project)
+ self._platform = Platform.get_platform()
+ self._artifacts = self._platform.artifactcache
+
+ # cleanup()
#
- # Trackes all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
+ # Cleans up application state
#
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ def cleanup(self):
+ if self._pipeline:
+ self._pipeline.cleanup()
+
+ # load_selection()
#
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
+    # An all-purpose method for loading a selection of elements; this
+ # is primarily useful for the frontend to implement `bst show`
+ # and `bst shell`.
#
- def track(self, scheduler):
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Args:
+    #    targets (list of str): Targets to load
+ # selection (PipelineSelection): The selection mode for the specified targets
+    #    except_targets (list of str): Specified targets to except from the selection
+ # downloadable (bool): Whether the downloadable state of elements should be resolved
+ #
+ def load_selection(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=(),
+ downloadable=False):
+ self.init_pipeline(targets, except_=except_targets,
+ use_configured_remote_caches=downloadable)
+ return self._pipeline.get_selection(selection)
- # fetch()
+ # shell()
#
- # Fetches sources on the pipeline.
+ # Run a shell
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
+ # element (Element): An Element object to run the shell for
+ # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
+ # prompt (str): The prompt to display in the shell
+ # directory (str): A directory where an existing prestaged sysroot is expected, or None
+ # mounts (list of HostMount): Additional directories to mount into the sandbox
+ # isolate (bool): Whether to isolate the environment like we do in builds
+ # command (list): An argv to launch in the sandbox, or None
#
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Returns:
+ # (int): The exit code of the launched shell
+ #
+ def shell(self, element, scope, prompt, *,
+ directory=None,
+ mounts=None,
+ isolate=False,
+ command=None):
+
+ # Assert we have everything we need built, unless the directory is specified
+ # in which case we just blindly trust the directory, using the element
+ # definitions to control the execution environment only.
+ if directory is None:
+ missing_deps = [
+ dep._get_full_name()
+ for dep in self._pipeline.dependencies(scope)
+ if not dep._cached()
+ ]
+ if missing_deps:
+ raise StreamError("Elements need to be built or downloaded before staging a shell environment",
+ detail="\n".join(missing_deps))
+
+ return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
# build()
#
# Builds (assembles) elements in the pipeline.
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ # targets (list of str): Targets to build
+ # track_targets (list of str): Specified targets for tracking
+ # track_except (list of str): Specified targets to except from tracking
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
#
- def build(self, scheduler, *, build_all=False):
+ def build(self, targets, *,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ build_all=False):
+
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ self.init_pipeline(targets,
+ except_=track_except,
+ rewritable=rewritable,
+ use_configured_remote_caches=True,
+ track_elements=track_targets,
+ track_cross_junctions=track_cross_junctions)
if build_all:
plan = self._pipeline.dependencies(Scope.ALL)
@@ -171,63 +204,101 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
- # checkout()
+ # fetch()
#
- # Checkout the pipeline target artifact to the specified directory
+ # Fetches sources on the pipeline.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
+ # targets (list of str): Targets to fetch
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_targets (bool): Whether to track selected targets in addition to fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
#
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ def fetch(self, targets, *,
+ selection=PipelineSelection.PLAN,
+ except_targets=None,
+ track_targets=False,
+ track_cross_junctions=False):
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+ rewritable = False
+ if track_targets:
+ rewritable = True
- if not os.access(directory, os.W_OK):
- raise StreamError("Directory {} not writable".format(directory))
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=rewritable,
+ track_elements=targets if track_targets else None,
+ track_cross_junctions=track_cross_junctions)
- if not force and os.listdir(directory):
- raise StreamError("Checkout directory is not empty: {}"
- .format(directory))
+ fetch_plan = self._pipeline.get_selection(selection)
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+ # Delegated to a shared method for now
+ self._do_fetch(fetch_plan)
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self._checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise StreamError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
+ # track()
+ #
+ # Tracks all the sources of the selected elements.
+ #
+ # Args:
+ # targets (list of str): Targets to track
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from tracking
+ # cross_junctions (bool): Whether tracking should cross junction boundaries
+ #
+ # If no error is encountered while tracking, then the project files
+ # are rewritten inline.
+ #
+ def track(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=None,
+ track_targets=False,
+ cross_junctions=False):
+
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=True,
+ track_elements=targets,
+ track_cross_junctions=cross_junctions,
+ track_selection=selection)
+
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ self.session_elements = len(self._pipeline._track_elements)
+
+ _, status = self._scheduler.run([track])
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
# pull()
#
- # Pulls elements from the pipeline
+ # Pulls artifacts from remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
+ # targets (list of str): Targets to pull
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to pull from, or None
+ #
+    # If `remote` is specified as None, then the regular configuration will be used
+ # to determine where to pull artifacts from.
#
- def pull(self, scheduler, elements):
+ def pull(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_fetch_remotes():
- raise StreamError("Not artifact caches available for pulling artifacts")
+ raise StreamError("No artifact caches available for pulling artifacts")
plan = elements
self._pipeline._assert_consistent(plan)
@@ -245,13 +316,28 @@ class Stream():
# push()
#
- # Pushes elements in the pipeline
+    # Pushes artifacts to remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
+ # targets (list of str): Targets to push
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to push to, or None
+ #
+    # If `remote` is specified as None, then the regular configuration will be used
+ # to determine where to push artifacts to.
#
- def push(self, scheduler, elements):
+ def push(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
@@ -270,19 +356,81 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
+ # checkout()
+ #
+ # Checkout the pipeline target artifact to the specified directory
+ #
+ # Args:
+ # target (str): Target to checkout
+ # directory (str): The directory to checkout the artifact to
+ # force (bool): Force overwrite files which exist in `directory`
+ # integrate (bool): Whether to run integration commands
+ # hardlinks (bool): Whether checking out files hardlinked to
+ # their artifacts is acceptable
+ #
+ def checkout(self, target, *,
+ directory=None,
+ force=False,
+ integrate=True,
+ hardlinks=False):
+
+ self.init_pipeline((target,))
+
+ # We only have one target in a checkout command
+ target = self._pipeline.targets[0]
+
+ try:
+ os.makedirs(directory, exist_ok=True)
+ except OSError as e:
+ raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+
+ if not os.access(directory, os.W_OK):
+ raise StreamError("Directory {} not writable".format(directory))
+
+ if not force and os.listdir(directory):
+ raise StreamError("Checkout directory is not empty: {}"
+ .format(directory))
+
+ # Stage deps into a temporary sandbox first
+ try:
+ with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+ # Copy or move the sandbox to the target directory
+ sandbox_root = sandbox.get_directory()
+ with target.timed_activity("Checking out files in {}".format(directory)):
+ try:
+ if hardlinks:
+ self._checkout_hardlinks(sandbox_root, directory)
+ else:
+ utils.copy_files(sandbox_root, directory)
+ except OSError as e:
+ raise StreamError("Failed to checkout files: {}".format(e)) from e
+ except BstError as e:
+ raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
+ reason=e.reason) from e
+
# workspace_open
#
# Open a project workspace
#
# Args:
- # target (Element): The element to open the workspace for
+ # target (str): The target element to open the workspace for
# directory (str): The directory to stage the source in
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
#
- def workspace_open(self, target, directory, no_checkout, track_first, force):
- project = self._context.get_toplevel_project()
+ def workspace_open(self, target, directory, *,
+ no_checkout,
+ track_first,
+ force):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
+
+ target = self._pipeline.targets[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -294,15 +442,19 @@ class Stream():
raise StreamError("The given element has no sources", detail=detail)
# Check for workspace config
- workspace = project.workspaces.get_workspace(target.name)
+ workspace = self._project.workspaces.get_workspace(target.name)
if workspace:
raise StreamError("Workspace '{}' is already defined at: {}"
.format(target.name, workspace.path))
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
+ #
+ # For now, tracking is handled by _do_fetch() automatically
+ # by virtue of our screwed up pipeline initialization stuff.
+ #
if not no_checkout or track_first:
- self.fetch(self._scheduler, [target])
+ self._do_fetch([target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -315,13 +467,13 @@ class Stream():
except OSError as e:
raise StreamError("Failed to create workspace directory: {}".format(e)) from e
- project.workspaces.create_workspace(target.name, workdir)
+ self._project.workspaces.create_workspace(target.name, workdir)
if not no_checkout:
with target.timed_activity("Staging sources to {}".format(directory)):
target._open_workspace()
- project.workspaces.save_config()
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Saved workspace configuration")
# workspace_close
@@ -332,9 +484,8 @@ class Stream():
# element_name (str): The element name to close the workspace for
# remove_dir (bool): Whether to remove the associated directory
#
- def workspace_close(self, element_name, remove_dir):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(element_name)
+ def workspace_close(self, element_name, *, remove_dir):
+ workspace = self._project.workspaces.get_workspace(element_name)
# Remove workspace directory if prompted
if remove_dir:
@@ -347,8 +498,8 @@ class Stream():
.format(workspace.path, e)) from e
# Delete the workspace and save the configuration
- project.workspaces.delete_workspace(element_name)
- project.workspaces.save_config()
+ self._project.workspaces.delete_workspace(element_name)
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
# workspace_reset
@@ -357,19 +508,40 @@ class Stream():
# changes.
#
# Args:
- # target (Element): The element to reset the workspace for
- # track (bool): Whether to also track the source
+ # targets (list of str): The target elements to reset the workspace for
+ # track_first (bool): Whether to also track the sources first
#
- def workspace_reset(self, target, track):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(target.name)
+ def workspace_reset(self, targets, *, track_first):
- if workspace is None:
- raise StreamError("Workspace '{}' is currently not defined"
- .format(target.name))
+ self.init_pipeline(targets,
+ track_elements=targets if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
- self.workspace_close(target.name, True)
- self.workspace_open(target, workspace.path, False, track, False)
+ # Do the tracking first
+ if track_first:
+ self._do_fetch(self._pipeline.targets)
+
+ for target in self._pipeline.targets:
+ workspace = self._project.workspaces.get_workspace(target.name)
+
+ with target.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
+ try:
+ shutil.rmtree(workspace.path)
+ except OSError as e:
+ raise StreamError("Could not remove '{}': {}"
+ .format(workspace.path, e)) from e
+
+ self._project.workspaces.delete_workspace(target.name)
+ self._project.workspaces.create_workspace(target.name, workspace.path)
+
+ with target.timed_activity("Staging sources to {}".format(workspace.path)):
+ target._open_workspace()
+
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+
+ self._project.workspaces.save_config()
# workspace_exists
#
@@ -385,13 +557,11 @@ class Stream():
# True if there are any existing workspaces.
#
def workspace_exists(self, element_name=None):
- project = self._context.get_toplevel_project()
-
if element_name:
- workspace = project.workspaces.get_workspace(element_name)
+ workspace = self._project.workspaces.get_workspace(element_name)
if workspace:
return True
- elif any(project.workspaces.list()):
+ elif any(self._project.workspaces.list()):
return True
return False
@@ -401,9 +571,8 @@ class Stream():
# Serializes the workspaces and dumps them in YAML to stdout.
#
def workspace_list(self):
- project = self._context.get_toplevel_project()
workspaces = []
- for element_name, workspace_ in project.workspaces.list():
+ for element_name, workspace_ in self._project.workspaces.list():
workspace_detail = {
'element': element_name,
'directory': workspace_.path,
@@ -416,16 +585,28 @@ class Stream():
# source_bundle()
#
- # Create a build bundle for the given artifact.
+    # Create a host-buildable tarball bundle for the given target.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
+ # target (str): The target element to bundle
+ # directory (str): The directory to output the tarball
+ # track_first (bool): Track new source references before bundling
+ # compression (str): The compression type to use
+ # force (bool): Overwrite an existing tarball
+ #
+ def source_bundle(self, target, directory, *,
+ track_first=False,
+ force=False,
+ compression="gz"):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
# source-bundle only supports one target
target = self._pipeline.targets[0]
+ dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -444,7 +625,7 @@ class Stream():
.format(tar_location, e)) from e
plan = list(dependencies)
- self.fetch(self._scheduler, plan)
+ self._do_fetch(plan)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
@@ -484,6 +665,48 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
+ # _do_fetch()
+ #
+    # Performs the fetch job; the body of this function is here because
+ # it is shared between a few internals.
+ #
+ # Args:
+ # elements (list of Element): Elements to fetch
+ #
+ def _do_fetch(self, elements):
+
+ fetch_plan = elements
+
+ # Subtract the track elements from the fetch elements, they will be added separately
+ if self._pipeline._track_elements:
+ track_elements = set(self._pipeline._track_elements)
+ fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+ # Assert consistency for the fetch elements
+ self._pipeline._assert_consistent(fetch_plan)
+
+ # Filter out elements with cached sources, only from the fetch plan
+ # let the track plan resolve new refs.
+ cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+ fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+ self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+ fetch = FetchQueue(self._scheduler)
+ fetch.enqueue(fetch_plan)
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ queues = [track, fetch]
+ else:
+ queues = [fetch]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
# Helper function for checkout()
#
def _checkout_hardlinks(self, sandbox_root, directory):
@@ -546,3 +769,56 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
+
+ #############################################################
+ # TEMPORARY CRAP #
+ #############################################################
+
+ # init_pipeline()
+ #
+ # Initialize the pipeline for a given activity
+ #
+ # Args:
+ # elements (list of elements): The elements to load recursively
+ # except_ (list of elements): The elements to except
+ # rewritable (bool): Whether we should load the YAML files for roundtripping
+ # use_configured_remote_caches (bool): Whether we should contact remotes
+ # add_remote_cache (str): The URL for an explicitly mentioned remote cache
+ # track_elements (list of elements): Elements which are to be tracked
+ # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
+ # track_selection (PipelineSelection): The selection algorithm for track elements
+ # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+ # loading process, if they are not yet locally cached
+ #
+ # Note that the except_ argument may have a subtly different meaning depending
+ # on the activity performed on the Pipeline. In normal circumstances the except_
+ # argument excludes elements from the `elements` list. In a build session, the
+ # except_ elements are excluded from the tracking plan.
+ #
+ def init_pipeline(self, elements, *,
+ except_=tuple(),
+ rewritable=False,
+ use_configured_remote_caches=False,
+ add_remote_cache=None,
+ track_elements=None,
+ track_cross_junctions=False,
+ track_selection=PipelineSelection.ALL):
+
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ self._pipeline = Pipeline(self._context, self._project, self._artifacts,
+ elements, except_, rewritable=rewritable)
+
+ self._pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=add_remote_cache,
+ track_elements=track_elements,
+ track_cross_junctions=track_cross_junctions,
+ track_selection=track_selection)
+
+ profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ # Get the total
+ self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+ if self._loaded_cb is not None:
+ self._loaded_cb(self._pipeline)
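
[Editor's note: taken together, the methods above form the new internal API surface that the frontend drives. A rough usage sketch, not part of the patch, assuming a Context, Project and Scheduler have already been set up (today only App does this, and it still assigns stream._scheduler directly, as the XXX in app.py notes); 'hello.bst' is a made-up element name and the empty tuples mirror what the click options pass when unset.]

    from buildstream._pipeline import PipelineSelection
    from buildstream._stream import Stream

    stream = Stream(context, project, None)    # no post-load callback
    stream._scheduler = scheduler              # transitional wiring, see app.py

    # A `bst show`-style query: load and select elements, no scheduler session
    deps = stream.load_selection(('hello.bst',), selection=PipelineSelection.ALL)

    # Or a `bst build`-style session: element names in, queues are run internally
    stream.build(('hello.bst',),
                 track_targets=(),
                 track_except=(),
                 track_cross_junctions=False)
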
diff --git a/tests/plugins/pipeline.py b/tests/plugins/pipeline.py
index 4c0e5c397..db683094b 100644
--- a/tests/plugins/pipeline.py
+++ b/tests/plugins/pipeline.py
@@ -23,7 +23,7 @@ def create_pipeline(tmpdir, basedir, target):
context.set_message_handler(dummy_handler)
- return Pipeline(context, project, [target], [])
+ return Pipeline(context, project, None, [target], [])
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'customsource'))