summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-30 20:11:30 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-05-08 03:59:38 +0900
commit541cd76022ca7e9ef8e4114495b908943fe3e8b1 (patch)
tree516861e75a2572d1b3811b3f04af9b087246c020
parentc9f67f1e42734404fb47ddea0b507f45864d1dcf (diff)
downloadbuildstream-541cd76022ca7e9ef8e4114495b908943fe3e8b1.tar.gz
_stream.py, _frontend: Call Stream() APIs with CLI arguments directly.
This shifts the whole responsibility of interpreting command line targets etc to the Stream() object itself. With this commit, the Pipeline() truly becomes slaved to the Stream().
-rw-r--r--buildstream/_frontend/app.py95
-rw-r--r--buildstream/_frontend/cli.py223
-rw-r--r--buildstream/_pipeline.py9
-rw-r--r--buildstream/_stream.py544
-rw-r--r--tests/plugins/pipeline.py2
5 files changed, 553 insertions, 320 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index ea6aed0d9..5b0cfc4b1 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,9 +39,7 @@ from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
-from .._pipeline import Pipeline, PipelineSelection
from .._scheduler import Scheduler
-from .._profile import Topics, profile_start, profile_end
from .._versions import BST_FORMAT_VERSION
from .. import __version__ as build_stream_version
from .. import _yaml
@@ -72,7 +70,6 @@ class App():
self.stream = None # The Stream object
self.project = None # The toplevel Project object
self.scheduler = None # The Scheduler
- self.pipeline = None # The Pipeline
self.logger = None # The LogLine object
self.interactive = None # Whether we are running in interactive mode
self.colors = None # Whether to use colors in logging
@@ -81,6 +78,7 @@ class App():
# Private members
#
self._session_start = datetime.datetime.now()
+ self._session_name = None
self._main_options = main_options # Main CLI options, before any command
self._status = None # The Status object
self._fail_messages = {} # Failure messages by unique plugin id
@@ -194,7 +192,7 @@ class App():
self._error_exit(e, "Error loading project")
# Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context)
+ self.stream = Stream(self.context, self.project, self.loaded_cb)
# Create the application's scheduler
self.scheduler = Scheduler(self.context, self._session_start,
@@ -242,15 +240,7 @@ class App():
# reporting the errors and exiting with a consistent error status.
#
# Args:
- # elements (list of elements): The elements to load recursively
# session_name (str): The name of the session, or None for no session
- # except_ (list of elements): The elements to except
- # rewritable (bool): Whether we should load the YAML files for roundtripping
- # use_configured_remote_caches (bool): Whether we should contact remotes
- # add_remote_cache (str): The URL for an explicitly mentioned remote cache
- # track_elements (list of elements): Elements which are to be tracked
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
# loading process, if they are not yet locally cached
#
@@ -263,51 +253,29 @@ class App():
# the session header and summary, and time the main session from startup time.
#
@contextmanager
- def initialized(self, elements, *, session_name=None,
- except_=tuple(), rewritable=False,
- use_configured_remote_caches=False,
- add_remote_cache=None,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL,
+ def initialized(self, *,
+ session_name=None,
fetch_subprojects=False):
+ self._session_name = session_name
+
# Start with the early stage init, this enables logging right away
with self.partially_initialized(fetch_subprojects=fetch_subprojects):
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
# Mark the beginning of the session
if session_name:
self._message(MessageType.START, session_name)
- try:
- self.pipeline = Pipeline(self.context, self.project, elements, except_,
- rewritable=rewritable)
- except BstError as e:
- self._error_exit(e, "Error loading pipeline")
-
- # Initialize pipeline
- try:
- self.pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=add_remote_cache,
- track_elements=track_elements,
- track_cross_junctions=track_cross_junctions,
- track_selection=track_selection)
- except BstError as e:
- self._error_exit(e, "Error initializing pipeline")
-
# XXX This is going to change soon !
#
self.stream._scheduler = self.scheduler
- self.stream._pipeline = self.pipeline
- self.stream.total_elements = len(list(self.pipeline.dependencies(Scope.ALL)))
-
- profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
- # Print the heading
- if session_name:
- self._print_heading()
+ # XXX Print the heading
+ #
+ # WE NEED A STREAM CALLBACK FOR POST LOAD SESSION START
+ #
+ # if session_name:
+ # self._print_heading()
# Run the body of the session here, once everything is loaded
try:
@@ -405,22 +373,18 @@ class App():
click.echo("Created project.conf at: {}".format(project_path), err=True)
sys.exit(0)
- # shell()
+ # shell_prompt():
#
- # Run a shell
+ # Creates a prompt for a shell environment, using ANSI color codes
+ # if they are available in the execution context.
#
# Args:
- # element (Element): An Element object to run the shell for
- # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
- # directory (str): A directory where an existing prestaged sysroot is expected, or None
- # mounts (list of HostMount): Additional directories to mount into the sandbox
- # isolate (bool): Whether to isolate the environment like we do in builds
- # command (list): An argv to launch in the sandbox, or None
+ # element (Element): The Element object to resolve a prompt for
#
# Returns:
- # (int): The exit code of the launched shell
+ # (str): The formatted prompt to display in the shell
#
- def shell(self, element, scope, directory, *, mounts=None, isolate=False, command=None):
+ def shell_prompt(self, element):
_, key, dim = element._get_display_key()
element_name = element._get_full_name()
@@ -435,7 +399,7 @@ class App():
else:
prompt = '[{}@{}:${{PWD}}]$ '.format(key, element_name)
- return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
+ return prompt
# cleanup()
#
@@ -444,8 +408,8 @@ class App():
# This is called by Click at exit time
#
def cleanup(self):
- if self.pipeline:
- self.pipeline.cleanup()
+ if self.stream:
+ self.stream.cleanup()
############################################################
# Local Functions #
@@ -609,7 +573,8 @@ class App():
if choice == 'shell':
click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
try:
- self.shell(element, Scope.BUILD, failure.sandbox, isolate=True)
+ prompt = self.shell_prompt(element)
+ self.stream.shell(element, Scope.BUILD, prompt, directory=failure.sandbox, isolate=True)
except BstError as e:
click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
elif choice == 'log':
@@ -632,14 +597,14 @@ class App():
queue.enqueue([element])
#
- # Prints the application startup heading, used for commands which
- # will process a pipeline.
+ # Print the session heading if we've loaded a pipeline and there
+ # is going to be a session
#
- def _print_heading(self, deps=None):
- self.logger.print_heading(self.pipeline,
- self._main_options['log_file'],
- styling=self.colors,
- deps=deps)
+ def loaded_cb(self, pipeline):
+ if self._session_name:
+ self.logger.print_heading(pipeline,
+ self._main_options['log_file'],
+ styling=self.colors)
#
# Print a summary of the queues
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 1393db35a..1ac4548b6 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -235,15 +235,12 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
if track_all:
track_ = elements
- rewritable = False
- if track_:
- rewritable = True
-
- with app.initialized(elements, session_name="Build", except_=track_except, rewritable=rewritable,
- use_configured_remote_caches=True, track_elements=track_,
+ with app.initialized(session_name="Build", fetch_subprojects=True):
+ app.stream.build(elements,
+ track_targets=track_,
+ track_except=track_except,
track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True):
- app.stream.build(app.scheduler, build_all=all_)
+ build_all=all_)
##################################################################
@@ -282,12 +279,12 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
click.echo("ERROR: The --track-cross-junctions option can only be used with --track", err=True)
sys.exit(-1)
- with app.initialized(elements, session_name="Fetch", except_=except_, rewritable=track_,
- track_elements=elements if track_ else None,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True):
- dependencies = app.pipeline.get_selection(deps)
- app.stream.fetch(app.scheduler, dependencies)
+ with app.initialized(session_name="Fetch", fetch_subprojects=True):
+ app.stream.fetch(elements,
+ selection=deps,
+ except_targets=except_,
+ track_targets=track_,
+ track_cross_junctions=track_cross_junctions)
##################################################################
@@ -318,12 +315,11 @@ def track(app, elements, deps, except_, cross_junctions):
none: No dependencies, just the specified elements
all: All dependencies of all specified elements
"""
- with app.initialized(elements, session_name="Track", except_=except_, rewritable=True,
- track_elements=elements,
- track_cross_junctions=cross_junctions,
- track_selection=deps,
- fetch_subprojects=True):
- app.stream.track(app.scheduler)
+ with app.initialized(session_name="Track", fetch_subprojects=True):
+ app.stream.track(elements,
+ selection=deps,
+ except_targets=except_,
+ cross_junctions=cross_junctions)
##################################################################
@@ -351,10 +347,8 @@ def pull(app, elements, deps, remote):
none: No dependencies, just the element itself
all: All dependencies
"""
- with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None),
- add_remote_cache=remote, fetch_subprojects=True):
- to_pull = app.pipeline.get_selection(deps)
- app.stream.pull(app.scheduler, to_pull)
+ with app.initialized(session_name="Pull", fetch_subprojects=True):
+ app.stream.pull(elements, selection=deps, remote=remote)
##################################################################
@@ -381,11 +375,8 @@ def push(app, elements, deps, remote):
none: No dependencies, just the element itself
all: All dependencies
"""
- with app.initialized(elements, session_name="Push",
- use_configured_remote_caches=(remote is None),
- add_remote_cache=remote, fetch_subprojects=True):
- to_push = app.pipeline.get_selection(deps)
- app.stream.push(app.scheduler, to_push)
+ with app.initialized(session_name="Push", fetch_subprojects=True):
+ app.stream.push(elements, selection=deps, remote=remote)
##################################################################
@@ -456,9 +447,12 @@ def show(app, elements, deps, except_, order, format_, downloadable):
bst show target.bst --format \\
$'---------- %{name} ----------\\n%{vars}'
"""
- with app.initialized(elements, except_=except_, use_configured_remote_caches=downloadable):
+ with app.initialized():
+ dependencies = app.stream.load_selection(elements,
+ selection=deps,
+ except_targets=except_,
+ downloadable=downloadable)
- dependencies = app.pipeline.get_selection(deps)
if order == "alpha":
dependencies = sorted(dependencies)
@@ -466,8 +460,7 @@ def show(app, elements, deps, except_, order, format_, downloadable):
format_ = app.context.log_element_format
report = app.logger.show_pipeline(dependencies, format_)
-
- click.echo(report, color=app.colors)
+ click.echo(report, color=app.colors)
##################################################################
@@ -507,43 +500,32 @@ def shell(app, element, sysroot, mount, isolate, build_, command):
"""
from ..element import Scope
from .._project import HostMount
+ from .._pipeline import PipelineSelection
+
if build_:
scope = Scope.BUILD
else:
scope = Scope.RUN
- with app.initialized((element,)):
- pass
-
- # Assert we have everything we need built.
- missing_deps = []
- if scope is not None:
- for dep in app.pipeline.dependencies(scope):
- if not dep._cached():
- missing_deps.append(dep)
-
- if missing_deps:
- click.echo("", err=True)
- click.echo("Missing elements for staging an environment for a shell:", err=True)
- for dep in missing_deps:
- click.echo(" {}".format(dep.name), err=True)
- click.echo("", err=True)
- click.echo("Try building them first", err=True)
- sys.exit(-1)
-
- mounts = [
- HostMount(path, host_path)
- for host_path, path in mount
- ]
+ with app.initialized():
+ dependencies = app.stream.load_selection((element,), selection=PipelineSelection.NONE)
+ element = dependencies[0]
+ prompt = app.shell_prompt(element)
+ mounts = [
+ HostMount(path, host_path)
+ for host_path, path in mount
+ ]
+ try:
+ exitcode = app.stream.shell(element, scope, prompt,
+ directory=sysroot,
+ mounts=mounts,
+ isolate=isolate,
+ command=command)
+ except BstError as e:
+ raise AppError("Error launching shell: {}".format(e), detail=e.detail) from e
- try:
- element = app.pipeline.targets[0]
- exitcode = app.shell(element, scope, sysroot, mounts=mounts, isolate=isolate, command=command)
- sys.exit(exitcode)
- except BstError as e:
- click.echo("", err=True)
- click.echo("Errors shelling into this pipeline: {}".format(e), err=True)
- sys.exit(-1)
+ # If there were no errors, we return the shell's exit code here.
+ sys.exit(exitcode)
##################################################################
@@ -563,37 +545,12 @@ def shell(app, element, sysroot, mount, isolate, build_, command):
def checkout(app, element, directory, force, integrate, hardlinks):
"""Checkout a built artifact to the specified directory
"""
- with app.initialized((element,)):
- app.stream.checkout(directory, force, integrate, hardlinks)
-
-
-##################################################################
-# Source Bundle Command #
-##################################################################
-@cli.command(name="source-bundle", short_help="Produce a build bundle to be manually executed")
-@click.option('--except', 'except_', multiple=True,
- type=click.Path(dir_okay=False, readable=True),
- help="Elements to except from the tarball")
-@click.option('--compression', default='gz',
- type=click.Choice(['none', 'gz', 'bz2', 'xz']),
- help="Compress the tar file using the given algorithm.")
-@click.option('--track', 'track_', default=False, is_flag=True,
- help="Track new source references before building")
-@click.option('--force', '-f', default=False, is_flag=True,
- help="Overwrite files existing in checkout directory")
-@click.option('--directory', default=os.getcwd(),
- help="The directory to write the tarball to")
-@click.argument('target',
- type=click.Path(dir_okay=False, readable=True))
-@click.pass_obj
-def source_bundle(app, target, force, directory,
- track_, compression, except_):
- """Produce a source bundle to be manually executed
- """
- with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None):
- dependencies = app.pipeline.get_selection('all')
- app.stream.source_bundle(app.scheduler, dependencies, force, track_,
- compression, directory)
+ with app.initialized():
+ app.stream.checkout(element,
+ directory=directory,
+ force=force,
+ integrate=integrate,
+ hardlinks=hardlinks)
##################################################################
@@ -632,10 +589,11 @@ def workspace_open(app, no_checkout, force, track_, element, directory):
click.echo("Checkout directory is not empty: {}".format(directory), err=True)
sys.exit(-1)
- with app.initialized((element,), rewritable=track_, track_elements=[element] if track_ else None):
- # This command supports only one target
- target = app.pipeline.targets[0]
- app.stream.workspace_open(target, directory, no_checkout, track_, force)
+ with app.initialized():
+ app.stream.workspace_open(element, directory,
+ no_checkout=no_checkout,
+ track_first=track_,
+ force=force)
##################################################################
@@ -656,7 +614,7 @@ def workspace_close(app, remove_dir, all_, elements):
click.echo('ERROR: no elements specified', err=True)
sys.exit(-1)
- with app.partially_initialized():
+ with app.initialized():
# Early exit if we specified `all` and there are no workspaces
if all_ and not app.stream.workspace_exists():
@@ -679,7 +637,7 @@ def workspace_close(app, remove_dir, all_, elements):
if all_:
elements = [element_name for element_name, _ in app.project.workspaces.list()]
for element_name in elements:
- app.stream.workspace_close(element_name, remove_dir)
+ app.stream.workspace_close(element_name, remove_dir=remove_dir)
##################################################################
@@ -696,22 +654,31 @@ def workspace_close(app, remove_dir, all_, elements):
def workspace_reset(app, track_, all_, elements):
"""Reset a workspace to its original state"""
- if not (all_ or elements):
- click.echo('ERROR: no elements specified', err=True)
- sys.exit(-1)
+ # Check that the workspaces in question exist
+ with app.initialized():
- if app.interactive:
- if not click.confirm('This will remove all your changes, are you sure?'):
- click.echo('Aborting', err=True)
- sys.exit(-1)
+ if not (all_ or elements):
+ raise AppError('No elements specified to reset')
+
+ if all_ and not app.stream.workspace_exists():
+ raise AppError("No open workspaces to reset")
+
+ nonexisting = []
+ for element_name in elements:
+ if not app.stream.workspace_exists(element_name):
+ nonexisting.append(element_name)
+ if nonexisting:
+ raise AppError("Workspace does not exist", detail="\n".join(nonexisting))
+
+ if app.interactive:
+ if not click.confirm('This will remove all your changes, are you sure?'):
+ click.echo('Aborting', err=True)
+ sys.exit(-1)
- with app.partially_initialized():
if all_:
elements = tuple(element_name for element_name, _ in app.project.workspaces.list())
- with app.initialized(elements):
- for target in app.pipeline.targets:
- app.stream.workspace_reset(target, track_)
+ app.stream.workspace_reset(elements, track_first=track_)
##################################################################
@@ -722,5 +689,35 @@ def workspace_reset(app, track_, all_, elements):
def workspace_list(app):
"""List open workspaces"""
- with app.partially_initialized():
+ with app.initialized():
app.stream.workspace_list()
+
+
+##################################################################
+# Source Bundle Command #
+##################################################################
+@cli.command(name="source-bundle", short_help="Produce a build bundle to be manually executed")
+@click.option('--except', 'except_', multiple=True,
+ type=click.Path(dir_okay=False, readable=True),
+ help="Elements to except from the tarball")
+@click.option('--compression', default='gz',
+ type=click.Choice(['none', 'gz', 'bz2', 'xz']),
+ help="Compress the tar file using the given algorithm.")
+@click.option('--track', 'track_', default=False, is_flag=True,
+ help="Track new source references before bundling")
+@click.option('--force', '-f', default=False, is_flag=True,
+ help="Overwrite an existing tarball")
+@click.option('--directory', default=os.getcwd(),
+ help="The directory to write the tarball to")
+@click.argument('element',
+ type=click.Path(dir_okay=False, readable=True))
+@click.pass_obj
+def source_bundle(app, element, force, directory,
+ track_, compression, except_):
+ """Produce a source bundle to be manually executed
+ """
+ with app.initialized(fetch_subprojects=True):
+ app.stream.source_bundle(element, directory,
+ track_first=track_,
+ force=force,
+ compression=compression)
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 52bae2e5c..7f3c657fa 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -27,7 +27,6 @@ from ._message import Message, MessageType
from ._loader import Loader
from .element import Element
from . import Scope, Consistency
-from ._platform import Platform
from ._project import ProjectRefStorage
from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs
@@ -87,7 +86,7 @@ class PipelineSelection():
#
class Pipeline():
- def __init__(self, context, project, targets, except_, rewritable=False):
+ def __init__(self, context, project, artifacts, targets, except_, rewritable=False):
self.context = context # The Context
self.project = project # The toplevel project
@@ -96,7 +95,7 @@ class Pipeline():
#
# Private members
#
- self._artifacts = None
+ self._artifacts = artifacts
self._loader = None
self._exceptions = None
self._track_cross_junctions = False
@@ -106,10 +105,6 @@ class Pipeline():
# Early initialization
#
- # Load selected platform
- Platform.create_instance(context, project)
- platform = Platform.get_platform()
- self._artifacts = platform.artifactcache
self._loader = Loader(self.context, self.project, targets + except_)
with self.context.timed_activity("Loading pipeline", silent_nested=True):
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 93a12f630..09ad51d1b 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -27,6 +27,9 @@ from tempfile import TemporaryDirectory
from ._exceptions import StreamError, ImplError, BstError
from ._message import Message, MessageType
from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._pipeline import Pipeline, PipelineSelection
+from ._platform import Platform
+from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -37,90 +40,120 @@ from . import Scope, Consistency
#
# Args:
# context (Context): The Context object
+# project (Project): The Project object
+# loaded_callback (callable): A callback to invoke when the pipeline is loaded
#
class Stream():
- def __init__(self, context):
+ def __init__(self, context, project, loaded_callback):
self.session_elements = 0 # Number of elements to process in this session
self.total_elements = 0 # Number of total potential elements for this pipeline
self._context = context
+ self._project = project
self._scheduler = None
self._pipeline = None
- # track()
+ self._loaded_cb = loaded_callback
+
+ # Load selected platform
+ Platform.create_instance(context, project)
+ self._platform = Platform.get_platform()
+ self._artifacts = self._platform.artifactcache
+
+ # cleanup()
#
- # Trackes all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
+ # Cleans up application state
#
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ def cleanup(self):
+ if self._pipeline:
+ self._pipeline.cleanup()
+
+ # load_selection()
#
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
+ # An all purpose method for loading a selection of elements, this
+ # is primarily useful for the frontend to implement `bst show`
+ # and `bst shell`.
#
- def track(self, scheduler):
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Args:
+ # targets (list of str): Targets to pull
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # downloadable (bool): Whether the downloadable state of elements should be resolved
+ #
+ def load_selection(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=(),
+ downloadable=False):
+ self.init_pipeline(targets, except_=except_targets,
+ use_configured_remote_caches=downloadable)
+ return self._pipeline.get_selection(selection)
- # fetch()
+ # shell()
#
- # Fetches sources on the pipeline.
+ # Run a shell
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
+ # element (Element): An Element object to run the shell for
+ # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN)
+ # prompt (str): The prompt to display in the shell
+ # directory (str): A directory where an existing prestaged sysroot is expected, or None
+ # mounts (list of HostMount): Additional directories to mount into the sandbox
+ # isolate (bool): Whether to isolate the environment like we do in builds
+ # command (list): An argv to launch in the sandbox, or None
#
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Returns:
+ # (int): The exit code of the launched shell
+ #
+ def shell(self, element, scope, prompt, *,
+ directory=None,
+ mounts=None,
+ isolate=False,
+ command=None):
+
+ # Assert we have everything we need built, unless the directory is specified
+ # in which case we just blindly trust the directory, using the element
+ # definitions to control the execution environment only.
+ if directory is None:
+ missing_deps = [
+ dep._get_full_name()
+ for dep in self._pipeline.dependencies(scope)
+ if not dep._cached()
+ ]
+ if missing_deps:
+ raise StreamError("Elements need to be built or downloaded before staging a shell environment",
+ detail="\n".join(missing_deps))
+
+ return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command)
# build()
#
# Builds (assembles) elements in the pipeline.
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
+ # targets (list of str): Targets to build
+ # track_targets (list of str): Specified targets for tracking
+ # track_except (list of str): Specified targets to except from tracking
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
# build_all (bool): Whether to build all elements, or only those
# which are required to build the target.
#
- def build(self, scheduler, *, build_all=False):
+ def build(self, targets, *,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ build_all=False):
+
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ self.init_pipeline(targets,
+ except_=track_except,
+ rewritable=rewritable,
+ use_configured_remote_caches=True,
+ track_elements=track_targets,
+ track_cross_junctions=track_cross_junctions)
if build_all:
plan = self._pipeline.dependencies(Scope.ALL)
@@ -171,63 +204,101 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
- # checkout()
+ # fetch()
#
- # Checkout the pipeline target artifact to the specified directory
+ # Fetches sources on the pipeline.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
+ # targets (list of str): Targets to fetch
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_targets (bool): Whether to track selected targets in addition to fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
#
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ def fetch(self, targets, *,
+ selection=PipelineSelection.PLAN,
+ except_targets=None,
+ track_targets=False,
+ track_cross_junctions=False):
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+ rewritable = False
+ if track_targets:
+ rewritable = True
- if not os.access(directory, os.W_OK):
- raise StreamError("Directory {} not writable".format(directory))
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=rewritable,
+ track_elements=targets if track_targets else None,
+ track_cross_junctions=track_cross_junctions)
- if not force and os.listdir(directory):
- raise StreamError("Checkout directory is not empty: {}"
- .format(directory))
+ fetch_plan = self._pipeline.get_selection(selection)
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+ # Delegated to a shared method for now
+ self._do_fetch(fetch_plan)
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self._checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise StreamError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
+ # track()
+ #
+ # Tracks all the sources of the selected elements.
+ #
+ # Args:
+ # targets (list of str): Targets to track
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # except_targets (list of str): Specified targets to except from tracking
+ # cross_junctions (bool): Whether tracking should cross junction boundaries
+ #
+ # If no error is encountered while tracking, then the project files
+ # are rewritten inline.
+ #
+ def track(self, targets, *,
+ selection=PipelineSelection.NONE,
+ except_targets=None,
+ track_targets=False,
+ cross_junctions=False):
+
+ self.init_pipeline(targets,
+ except_=except_targets,
+ rewritable=True,
+ track_elements=targets,
+ track_cross_junctions=cross_junctions,
+ track_selection=selection)
+
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ self.session_elements = len(self._pipeline._track_elements)
+
+ _, status = self._scheduler.run([track])
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
# pull()
#
- # Pulls elements from the pipeline
+ # Pulls artifacts from remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
+ # targets (list of str): Targets to pull
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to pull from, or None
+ #
+ # If `remote` specified as None, then regular configuration will be used
+ # to determine where to pull artifacts from.
#
- def pull(self, scheduler, elements):
+ def pull(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_fetch_remotes():
- raise StreamError("Not artifact caches available for pulling artifacts")
+ raise StreamError("No artifact caches available for pulling artifacts")
plan = elements
self._pipeline._assert_consistent(plan)
@@ -245,13 +316,28 @@ class Stream():
# push()
#
- # Pushes elements in the pipeline
+ # Pulls artifacts to remote artifact server(s)
#
# Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
+ # targets (list of str): Targets to push
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # remote (str): The URL of a specific remote server to push to, or None
+ #
+ # If `remote` specified as None, then regular configuration will be used
+ # to determine where to push artifacts to.
#
- def push(self, scheduler, elements):
+ def push(self, targets, *,
+ selection=PipelineSelection.NONE,
+ remote=None):
+
+ use_configured_remote_caches = True
+ if remote is not None:
+ use_configured_remote_caches = False
+
+ self.init_pipeline(targets,
+ use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=remote)
+ elements = self._pipeline.get_selection(selection)
if not self._pipeline._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
@@ -270,19 +356,81 @@ class Stream():
elif status == SchedStatus.TERMINATED:
raise StreamError(terminated=True)
+ # checkout()
+ #
+ # Checkout the pipeline target artifact to the specified directory
+ #
+ # Args:
+ # target (str): Target to checkout
+ # directory (str): The directory to checkout the artifact to
+ # force (bool): Force overwrite files which exist in `directory`
+ # integrate (bool): Whether to run integration commands
+ # hardlinks (bool): Whether checking out files hardlinked to
+ # their artifacts is acceptable
+ #
+ def checkout(self, target, *,
+ directory=None,
+ force=False,
+ integrate=True,
+ hardlinks=False):
+
+ self.init_pipeline((target,))
+
+ # We only have one target in a checkout command
+ target = self._pipeline.targets[0]
+
+ try:
+ os.makedirs(directory, exist_ok=True)
+ except OSError as e:
+ raise StreamError("Failed to create checkout directory: {}".format(e)) from e
+
+ if not os.access(directory, os.W_OK):
+ raise StreamError("Directory {} not writable".format(directory))
+
+ if not force and os.listdir(directory):
+ raise StreamError("Checkout directory is not empty: {}"
+ .format(directory))
+
+ # Stage deps into a temporary sandbox first
+ try:
+ with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+ # Copy or move the sandbox to the target directory
+ sandbox_root = sandbox.get_directory()
+ with target.timed_activity("Checking out files in {}".format(directory)):
+ try:
+ if hardlinks:
+ self._checkout_hardlinks(sandbox_root, directory)
+ else:
+ utils.copy_files(sandbox_root, directory)
+ except OSError as e:
+ raise StreamError("Failed to checkout files: {}".format(e)) from e
+ except BstError as e:
+ raise StreamError("Error while staging dependencies into a sandbox: {}".format(e),
+ reason=e.reason) from e
+
# workspace_open
#
# Open a project workspace
#
# Args:
- # target (Element): The element to open the workspace for
+ # target (str): The target element to open the workspace for
# directory (str): The directory to stage the source in
# no_checkout (bool): Whether to skip checking out the source
# track_first (bool): Whether to track and fetch first
# force (bool): Whether to ignore contents in an existing directory
#
- def workspace_open(self, target, directory, no_checkout, track_first, force):
- project = self._context.get_toplevel_project()
+ def workspace_open(self, target, directory, *,
+ no_checkout,
+ track_first,
+ force):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
+
+ target = self._pipeline.targets[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -294,15 +442,19 @@ class Stream():
raise StreamError("The given element has no sources", detail=detail)
# Check for workspace config
- workspace = project.workspaces.get_workspace(target.name)
+ workspace = self._project.workspaces.get_workspace(target.name)
if workspace:
raise StreamError("Workspace '{}' is already defined at: {}"
.format(target.name, workspace.path))
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
+ #
+ # For now, tracking is handled by _do_fetch() automatically
+ # by virtue of our screwed up pipeline initialization stuff.
+ #
if not no_checkout or track_first:
- self.fetch(self._scheduler, [target])
+ self._do_fetch([target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -315,13 +467,13 @@ class Stream():
except OSError as e:
raise StreamError("Failed to create workspace directory: {}".format(e)) from e
- project.workspaces.create_workspace(target.name, workdir)
+ self._project.workspaces.create_workspace(target.name, workdir)
if not no_checkout:
with target.timed_activity("Staging sources to {}".format(directory)):
target._open_workspace()
- project.workspaces.save_config()
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Saved workspace configuration")
# workspace_close
@@ -332,9 +484,8 @@ class Stream():
# element_name (str): The element name to close the workspace for
# remove_dir (bool): Whether to remove the associated directory
#
- def workspace_close(self, element_name, remove_dir):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(element_name)
+ def workspace_close(self, element_name, *, remove_dir):
+ workspace = self._project.workspaces.get_workspace(element_name)
# Remove workspace directory if prompted
if remove_dir:
@@ -347,8 +498,8 @@ class Stream():
.format(workspace.path, e)) from e
# Delete the workspace and save the configuration
- project.workspaces.delete_workspace(element_name)
- project.workspaces.save_config()
+ self._project.workspaces.delete_workspace(element_name)
+ self._project.workspaces.save_config()
self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
# workspace_reset
@@ -357,19 +508,40 @@ class Stream():
# changes.
#
# Args:
- # target (Element): The element to reset the workspace for
- # track (bool): Whether to also track the source
+ # targets (list of str): The target elements to reset the workspace for
+ # track_first (bool): Whether to also track the sources first
#
- def workspace_reset(self, target, track):
- project = self._context.get_toplevel_project()
- workspace = project.workspaces.get_workspace(target.name)
+ def workspace_reset(self, targets, *, track_first):
- if workspace is None:
- raise StreamError("Workspace '{}' is currently not defined"
- .format(target.name))
+ self.init_pipeline(targets,
+ track_elements=targets if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
- self.workspace_close(target.name, True)
- self.workspace_open(target, workspace.path, False, track, False)
+ # Do the tracking first
+ if track_first:
+ self._do_fetch(self._pipeline.targets)
+
+ for target in self._pipeline.targets:
+ workspace = self._project.workspaces.get_workspace(target.name)
+
+ with target.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
+ try:
+ shutil.rmtree(workspace.path)
+ except OSError as e:
+ raise StreamError("Could not remove '{}': {}"
+ .format(workspace.path, e)) from e
+
+ self._project.workspaces.delete_workspace(target.name)
+ self._project.workspaces.create_workspace(target.name, workspace.path)
+
+ with target.timed_activity("Staging sources to {}".format(workspace.path)):
+ target._open_workspace()
+
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+
+ self._project.workspaces.save_config()
# workspace_exists
#
@@ -385,13 +557,11 @@ class Stream():
# True if there are any existing workspaces.
#
def workspace_exists(self, element_name=None):
- project = self._context.get_toplevel_project()
-
if element_name:
- workspace = project.workspaces.get_workspace(element_name)
+ workspace = self._project.workspaces.get_workspace(element_name)
if workspace:
return True
- elif any(project.workspaces.list()):
+ elif any(self._project.workspaces.list()):
return True
return False
@@ -401,9 +571,8 @@ class Stream():
# Serializes the workspaces and dumps them in YAML to stdout.
#
def workspace_list(self):
- project = self._context.get_toplevel_project()
workspaces = []
- for element_name, workspace_ in project.workspaces.list():
+ for element_name, workspace_ in self._project.workspaces.list():
workspace_detail = {
'element': element_name,
'directory': workspace_.path,
@@ -416,16 +585,28 @@ class Stream():
# source_bundle()
#
- # Create a build bundle for the given artifact.
+ # Create a host buildable tarball bundle for the given target.
#
# Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
+ # target (str): The target element to bundle
+ # directory (str): The directory to output the tarball
+ # track_first (bool): Track new source references before bundling
+ # compression (str): The compression type to use
+ # force (bool): Overwrite an existing tarball
+ #
+ def source_bundle(self, target, directory, *,
+ track_first=False,
+ force=False,
+ compression="gz"):
+
+ self.init_pipeline((target,),
+ track_elements=[target] if track_first else None,
+ track_selection=PipelineSelection.NONE,
+ rewritable=track_first)
# source-bundle only supports one target
target = self._pipeline.targets[0]
+ dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -444,7 +625,7 @@ class Stream():
.format(tar_location, e)) from e
plan = list(dependencies)
- self.fetch(self._scheduler, plan)
+ self._do_fetch(plan)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
@@ -484,6 +665,48 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
+ # _do_fetch()
+ #
+ # Performs the fetch job, the body of this function is here because
+ # it is shared between a few internals.
+ #
+ # Args:
+ # elements (list of Element): Elements to fetch
+ #
+ def _do_fetch(self, elements):
+
+ fetch_plan = elements
+
+ # Subtract the track elements from the fetch elements, they will be added separately
+ if self._pipeline._track_elements:
+ track_elements = set(self._pipeline._track_elements)
+ fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+ # Assert consistency for the fetch elements
+ self._pipeline._assert_consistent(fetch_plan)
+
+ # Filter out elements with cached sources, only from the fetch plan
+ # let the track plan resolve new refs.
+ cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+ fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+ self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+ fetch = FetchQueue(self._scheduler)
+ fetch.enqueue(fetch_plan)
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ queues = [track, fetch]
+ else:
+ queues = [fetch]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
# Helper function for checkout()
#
def _checkout_hardlinks(self, sandbox_root, directory):
@@ -546,3 +769,56 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
+
+ #############################################################
+ # TEMPORARY CRAP #
+ #############################################################
+
+ # init_pipeline()
+ #
+ # Initialize the pipeline for a given activity
+ #
+ # Args:
+ # elements (list of elements): The elements to load recursively
+ # except_ (list of elements): The elements to except
+ # rewritable (bool): Whether we should load the YAML files for roundtripping
+ # use_configured_remote_caches (bool): Whether we should contact remotes
+ # add_remote_cache (str): The URL for an explicitly mentioned remote cache
+ # track_elements (list of elements): Elements which are to be tracked
+ # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
+ # track_selection (PipelineSelection): The selection algorithm for track elements
+ # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+ # loading process, if they are not yet locally cached
+ #
+ # Note that the except_ argument may have a subtly different meaning depending
+ # on the activity performed on the Pipeline. In normal circumstances the except_
+ # argument excludes elements from the `elements` list. In a build session, the
+ # except_ elements are excluded from the tracking plan.
+ #
+ def init_pipeline(self, elements, *,
+ except_=tuple(),
+ rewritable=False,
+ use_configured_remote_caches=False,
+ add_remote_cache=None,
+ track_elements=None,
+ track_cross_junctions=False,
+ track_selection=PipelineSelection.ALL):
+
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ self._pipeline = Pipeline(self._context, self._project, self._artifacts,
+ elements, except_, rewritable=rewritable)
+
+ self._pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches,
+ add_remote_cache=add_remote_cache,
+ track_elements=track_elements,
+ track_cross_junctions=track_cross_junctions,
+ track_selection=track_selection)
+
+ profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
+ # Get the total
+ self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+ if self._loaded_cb is not None:
+ self._loaded_cb(self._pipeline)
diff --git a/tests/plugins/pipeline.py b/tests/plugins/pipeline.py
index 4c0e5c397..db683094b 100644
--- a/tests/plugins/pipeline.py
+++ b/tests/plugins/pipeline.py
@@ -23,7 +23,7 @@ def create_pipeline(tmpdir, basedir, target):
context.set_message_handler(dummy_handler)
- return Pipeline(context, project, [target], [])
+ return Pipeline(context, project, None, [target], [])
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'customsource'))