diff options
-rw-r--r-- | buildstream/_frontend/app.py | 95 | ||||
-rw-r--r-- | buildstream/_frontend/cli.py | 223 | ||||
-rw-r--r-- | buildstream/_pipeline.py | 9 | ||||
-rw-r--r-- | buildstream/_stream.py | 544 | ||||
-rw-r--r-- | tests/plugins/pipeline.py | 2 |
5 files changed, 553 insertions, 320 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index ea6aed0d9..5b0cfc4b1 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -39,9 +39,7 @@ from .._project import Project from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError from .._message import Message, MessageType, unconditional_messages from .._stream import Stream -from .._pipeline import Pipeline, PipelineSelection from .._scheduler import Scheduler -from .._profile import Topics, profile_start, profile_end from .._versions import BST_FORMAT_VERSION from .. import __version__ as build_stream_version from .. import _yaml @@ -72,7 +70,6 @@ class App(): self.stream = None # The Stream object self.project = None # The toplevel Project object self.scheduler = None # The Scheduler - self.pipeline = None # The Pipeline self.logger = None # The LogLine object self.interactive = None # Whether we are running in interactive mode self.colors = None # Whether to use colors in logging @@ -81,6 +78,7 @@ class App(): # Private members # self._session_start = datetime.datetime.now() + self._session_name = None self._main_options = main_options # Main CLI options, before any command self._status = None # The Status object self._fail_messages = {} # Failure messages by unique plugin id @@ -194,7 +192,7 @@ class App(): self._error_exit(e, "Error loading project") # Create the stream right away, we'll need to pass it around - self.stream = Stream(self.context) + self.stream = Stream(self.context, self.project, self.loaded_cb) # Create the application's scheduler self.scheduler = Scheduler(self.context, self._session_start, @@ -242,15 +240,7 @@ class App(): # reporting the errors and exiting with a consistent error status. # # Args: - # elements (list of elements): The elements to load recursively # session_name (str): The name of the session, or None for no session - # except_ (list of elements): The elements to except - # rewritable (bool): Whether we should load the YAML files for roundtripping - # use_configured_remote_caches (bool): Whether we should contact remotes - # add_remote_cache (str): The URL for an explicitly mentioned remote cache - # track_elements (list of elements): Elements which are to be tracked - # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries - # track_selection (PipelineSelection): The selection algorithm for track elements # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the # loading process, if they are not yet locally cached # @@ -263,51 +253,29 @@ class App(): # the session header and summary, and time the main session from startup time. # @contextmanager - def initialized(self, elements, *, session_name=None, - except_=tuple(), rewritable=False, - use_configured_remote_caches=False, - add_remote_cache=None, - track_elements=None, - track_cross_junctions=False, - track_selection=PipelineSelection.ALL, + def initialized(self, *, + session_name=None, fetch_subprojects=False): + self._session_name = session_name + # Start with the early stage init, this enables logging right away with self.partially_initialized(fetch_subprojects=fetch_subprojects): - profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) - # Mark the beginning of the session if session_name: self._message(MessageType.START, session_name) - try: - self.pipeline = Pipeline(self.context, self.project, elements, except_, - rewritable=rewritable) - except BstError as e: - self._error_exit(e, "Error loading pipeline") - - # Initialize pipeline - try: - self.pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches, - add_remote_cache=add_remote_cache, - track_elements=track_elements, - track_cross_junctions=track_cross_junctions, - track_selection=track_selection) - except BstError as e: - self._error_exit(e, "Error initializing pipeline") - # XXX This is going to change soon ! # self.stream._scheduler = self.scheduler - self.stream._pipeline = self.pipeline - self.stream.total_elements = len(list(self.pipeline.dependencies(Scope.ALL))) - - profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) - # Print the heading - if session_name: - self._print_heading() + # XXX Print the heading + # + # WE NEED A STREAM CALLBACK FOR POST LOAD SESSION START + # + # if session_name: + # self._print_heading() # Run the body of the session here, once everything is loaded try: @@ -405,22 +373,18 @@ class App(): click.echo("Created project.conf at: {}".format(project_path), err=True) sys.exit(0) - # shell() + # shell_prompt(): # - # Run a shell + # Creates a prompt for a shell environment, using ANSI color codes + # if they are available in the execution context. # # Args: - # element (Element): An Element object to run the shell for - # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN) - # directory (str): A directory where an existing prestaged sysroot is expected, or None - # mounts (list of HostMount): Additional directories to mount into the sandbox - # isolate (bool): Whether to isolate the environment like we do in builds - # command (list): An argv to launch in the sandbox, or None + # element (Element): The Element object to resolve a prompt for # # Returns: - # (int): The exit code of the launched shell + # (str): The formatted prompt to display in the shell # - def shell(self, element, scope, directory, *, mounts=None, isolate=False, command=None): + def shell_prompt(self, element): _, key, dim = element._get_display_key() element_name = element._get_full_name() @@ -435,7 +399,7 @@ class App(): else: prompt = '[{}@{}:${{PWD}}]$ '.format(key, element_name) - return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command) + return prompt # cleanup() # @@ -444,8 +408,8 @@ class App(): # This is called by Click at exit time # def cleanup(self): - if self.pipeline: - self.pipeline.cleanup() + if self.stream: + self.stream.cleanup() ############################################################ # Local Functions # @@ -609,7 +573,8 @@ class App(): if choice == 'shell': click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True) try: - self.shell(element, Scope.BUILD, failure.sandbox, isolate=True) + prompt = self.shell_prompt(element) + self.stream.shell(element, Scope.BUILD, prompt, directory=failure.sandbox, isolate=True) except BstError as e: click.echo("Error while attempting to create interactive shell: {}".format(e), err=True) elif choice == 'log': @@ -632,14 +597,14 @@ class App(): queue.enqueue([element]) # - # Prints the application startup heading, used for commands which - # will process a pipeline. + # Print the session heading if we've loaded a pipeline and there + # is going to be a session # - def _print_heading(self, deps=None): - self.logger.print_heading(self.pipeline, - self._main_options['log_file'], - styling=self.colors, - deps=deps) + def loaded_cb(self, pipeline): + if self._session_name: + self.logger.print_heading(pipeline, + self._main_options['log_file'], + styling=self.colors) # # Print a summary of the queues diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 1393db35a..1ac4548b6 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -235,15 +235,12 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac if track_all: track_ = elements - rewritable = False - if track_: - rewritable = True - - with app.initialized(elements, session_name="Build", except_=track_except, rewritable=rewritable, - use_configured_remote_caches=True, track_elements=track_, + with app.initialized(session_name="Build", fetch_subprojects=True): + app.stream.build(elements, + track_targets=track_, + track_except=track_except, track_cross_junctions=track_cross_junctions, - fetch_subprojects=True): - app.stream.build(app.scheduler, build_all=all_) + build_all=all_) ################################################################## @@ -282,12 +279,12 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions): click.echo("ERROR: The --track-cross-junctions option can only be used with --track", err=True) sys.exit(-1) - with app.initialized(elements, session_name="Fetch", except_=except_, rewritable=track_, - track_elements=elements if track_ else None, - track_cross_junctions=track_cross_junctions, - fetch_subprojects=True): - dependencies = app.pipeline.get_selection(deps) - app.stream.fetch(app.scheduler, dependencies) + with app.initialized(session_name="Fetch", fetch_subprojects=True): + app.stream.fetch(elements, + selection=deps, + except_targets=except_, + track_targets=track_, + track_cross_junctions=track_cross_junctions) ################################################################## @@ -318,12 +315,11 @@ def track(app, elements, deps, except_, cross_junctions): none: No dependencies, just the specified elements all: All dependencies of all specified elements """ - with app.initialized(elements, session_name="Track", except_=except_, rewritable=True, - track_elements=elements, - track_cross_junctions=cross_junctions, - track_selection=deps, - fetch_subprojects=True): - app.stream.track(app.scheduler) + with app.initialized(session_name="Track", fetch_subprojects=True): + app.stream.track(elements, + selection=deps, + except_targets=except_, + cross_junctions=cross_junctions) ################################################################## @@ -351,10 +347,8 @@ def pull(app, elements, deps, remote): none: No dependencies, just the element itself all: All dependencies """ - with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None), - add_remote_cache=remote, fetch_subprojects=True): - to_pull = app.pipeline.get_selection(deps) - app.stream.pull(app.scheduler, to_pull) + with app.initialized(session_name="Pull", fetch_subprojects=True): + app.stream.pull(elements, selection=deps, remote=remote) ################################################################## @@ -381,11 +375,8 @@ def push(app, elements, deps, remote): none: No dependencies, just the element itself all: All dependencies """ - with app.initialized(elements, session_name="Push", - use_configured_remote_caches=(remote is None), - add_remote_cache=remote, fetch_subprojects=True): - to_push = app.pipeline.get_selection(deps) - app.stream.push(app.scheduler, to_push) + with app.initialized(session_name="Push", fetch_subprojects=True): + app.stream.push(elements, selection=deps, remote=remote) ################################################################## @@ -456,9 +447,12 @@ def show(app, elements, deps, except_, order, format_, downloadable): bst show target.bst --format \\ $'---------- %{name} ----------\\n%{vars}' """ - with app.initialized(elements, except_=except_, use_configured_remote_caches=downloadable): + with app.initialized(): + dependencies = app.stream.load_selection(elements, + selection=deps, + except_targets=except_, + downloadable=downloadable) - dependencies = app.pipeline.get_selection(deps) if order == "alpha": dependencies = sorted(dependencies) @@ -466,8 +460,7 @@ def show(app, elements, deps, except_, order, format_, downloadable): format_ = app.context.log_element_format report = app.logger.show_pipeline(dependencies, format_) - - click.echo(report, color=app.colors) + click.echo(report, color=app.colors) ################################################################## @@ -507,43 +500,32 @@ def shell(app, element, sysroot, mount, isolate, build_, command): """ from ..element import Scope from .._project import HostMount + from .._pipeline import PipelineSelection + if build_: scope = Scope.BUILD else: scope = Scope.RUN - with app.initialized((element,)): - pass - - # Assert we have everything we need built. - missing_deps = [] - if scope is not None: - for dep in app.pipeline.dependencies(scope): - if not dep._cached(): - missing_deps.append(dep) - - if missing_deps: - click.echo("", err=True) - click.echo("Missing elements for staging an environment for a shell:", err=True) - for dep in missing_deps: - click.echo(" {}".format(dep.name), err=True) - click.echo("", err=True) - click.echo("Try building them first", err=True) - sys.exit(-1) - - mounts = [ - HostMount(path, host_path) - for host_path, path in mount - ] + with app.initialized(): + dependencies = app.stream.load_selection((element,), selection=PipelineSelection.NONE) + element = dependencies[0] + prompt = app.shell_prompt(element) + mounts = [ + HostMount(path, host_path) + for host_path, path in mount + ] + try: + exitcode = app.stream.shell(element, scope, prompt, + directory=sysroot, + mounts=mounts, + isolate=isolate, + command=command) + except BstError as e: + raise AppError("Error launching shell: {}".format(e), detail=e.detail) from e - try: - element = app.pipeline.targets[0] - exitcode = app.shell(element, scope, sysroot, mounts=mounts, isolate=isolate, command=command) - sys.exit(exitcode) - except BstError as e: - click.echo("", err=True) - click.echo("Errors shelling into this pipeline: {}".format(e), err=True) - sys.exit(-1) + # If there were no errors, we return the shell's exit code here. + sys.exit(exitcode) ################################################################## @@ -563,37 +545,12 @@ def shell(app, element, sysroot, mount, isolate, build_, command): def checkout(app, element, directory, force, integrate, hardlinks): """Checkout a built artifact to the specified directory """ - with app.initialized((element,)): - app.stream.checkout(directory, force, integrate, hardlinks) - - -################################################################## -# Source Bundle Command # -################################################################## -@cli.command(name="source-bundle", short_help="Produce a build bundle to be manually executed") -@click.option('--except', 'except_', multiple=True, - type=click.Path(dir_okay=False, readable=True), - help="Elements to except from the tarball") -@click.option('--compression', default='gz', - type=click.Choice(['none', 'gz', 'bz2', 'xz']), - help="Compress the tar file using the given algorithm.") -@click.option('--track', 'track_', default=False, is_flag=True, - help="Track new source references before building") -@click.option('--force', '-f', default=False, is_flag=True, - help="Overwrite files existing in checkout directory") -@click.option('--directory', default=os.getcwd(), - help="The directory to write the tarball to") -@click.argument('target', - type=click.Path(dir_okay=False, readable=True)) -@click.pass_obj -def source_bundle(app, target, force, directory, - track_, compression, except_): - """Produce a source bundle to be manually executed - """ - with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None): - dependencies = app.pipeline.get_selection('all') - app.stream.source_bundle(app.scheduler, dependencies, force, track_, - compression, directory) + with app.initialized(): + app.stream.checkout(element, + directory=directory, + force=force, + integrate=integrate, + hardlinks=hardlinks) ################################################################## @@ -632,10 +589,11 @@ def workspace_open(app, no_checkout, force, track_, element, directory): click.echo("Checkout directory is not empty: {}".format(directory), err=True) sys.exit(-1) - with app.initialized((element,), rewritable=track_, track_elements=[element] if track_ else None): - # This command supports only one target - target = app.pipeline.targets[0] - app.stream.workspace_open(target, directory, no_checkout, track_, force) + with app.initialized(): + app.stream.workspace_open(element, directory, + no_checkout=no_checkout, + track_first=track_, + force=force) ################################################################## @@ -656,7 +614,7 @@ def workspace_close(app, remove_dir, all_, elements): click.echo('ERROR: no elements specified', err=True) sys.exit(-1) - with app.partially_initialized(): + with app.initialized(): # Early exit if we specified `all` and there are no workspaces if all_ and not app.stream.workspace_exists(): @@ -679,7 +637,7 @@ def workspace_close(app, remove_dir, all_, elements): if all_: elements = [element_name for element_name, _ in app.project.workspaces.list()] for element_name in elements: - app.stream.workspace_close(element_name, remove_dir) + app.stream.workspace_close(element_name, remove_dir=remove_dir) ################################################################## @@ -696,22 +654,31 @@ def workspace_close(app, remove_dir, all_, elements): def workspace_reset(app, track_, all_, elements): """Reset a workspace to its original state""" - if not (all_ or elements): - click.echo('ERROR: no elements specified', err=True) - sys.exit(-1) + # Check that the workspaces in question exist + with app.initialized(): - if app.interactive: - if not click.confirm('This will remove all your changes, are you sure?'): - click.echo('Aborting', err=True) - sys.exit(-1) + if not (all_ or elements): + raise AppError('No elements specified to reset') + + if all_ and not app.stream.workspace_exists(): + raise AppError("No open workspaces to reset") + + nonexisting = [] + for element_name in elements: + if not app.stream.workspace_exists(element_name): + nonexisting.append(element_name) + if nonexisting: + raise AppError("Workspace does not exist", detail="\n".join(nonexisting)) + + if app.interactive: + if not click.confirm('This will remove all your changes, are you sure?'): + click.echo('Aborting', err=True) + sys.exit(-1) - with app.partially_initialized(): if all_: elements = tuple(element_name for element_name, _ in app.project.workspaces.list()) - with app.initialized(elements): - for target in app.pipeline.targets: - app.stream.workspace_reset(target, track_) + app.stream.workspace_reset(elements, track_first=track_) ################################################################## @@ -722,5 +689,35 @@ def workspace_reset(app, track_, all_, elements): def workspace_list(app): """List open workspaces""" - with app.partially_initialized(): + with app.initialized(): app.stream.workspace_list() + + +################################################################## +# Source Bundle Command # +################################################################## +@cli.command(name="source-bundle", short_help="Produce a build bundle to be manually executed") +@click.option('--except', 'except_', multiple=True, + type=click.Path(dir_okay=False, readable=True), + help="Elements to except from the tarball") +@click.option('--compression', default='gz', + type=click.Choice(['none', 'gz', 'bz2', 'xz']), + help="Compress the tar file using the given algorithm.") +@click.option('--track', 'track_', default=False, is_flag=True, + help="Track new source references before bundling") +@click.option('--force', '-f', default=False, is_flag=True, + help="Overwrite an existing tarball") +@click.option('--directory', default=os.getcwd(), + help="The directory to write the tarball to") +@click.argument('element', + type=click.Path(dir_okay=False, readable=True)) +@click.pass_obj +def source_bundle(app, element, force, directory, + track_, compression, except_): + """Produce a source bundle to be manually executed + """ + with app.initialized(fetch_subprojects=True): + app.stream.source_bundle(element, directory, + track_first=track_, + force=force, + compression=compression) diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 52bae2e5c..7f3c657fa 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -27,7 +27,6 @@ from ._message import Message, MessageType from ._loader import Loader from .element import Element from . import Scope, Consistency -from ._platform import Platform from ._project import ProjectRefStorage from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs @@ -87,7 +86,7 @@ class PipelineSelection(): # class Pipeline(): - def __init__(self, context, project, targets, except_, rewritable=False): + def __init__(self, context, project, artifacts, targets, except_, rewritable=False): self.context = context # The Context self.project = project # The toplevel project @@ -96,7 +95,7 @@ class Pipeline(): # # Private members # - self._artifacts = None + self._artifacts = artifacts self._loader = None self._exceptions = None self._track_cross_junctions = False @@ -106,10 +105,6 @@ class Pipeline(): # Early initialization # - # Load selected platform - Platform.create_instance(context, project) - platform = Platform.get_platform() - self._artifacts = platform.artifactcache self._loader = Loader(self.context, self.project, targets + except_) with self.context.timed_activity("Loading pipeline", silent_nested=True): diff --git a/buildstream/_stream.py b/buildstream/_stream.py index 93a12f630..09ad51d1b 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -27,6 +27,9 @@ from tempfile import TemporaryDirectory from ._exceptions import StreamError, ImplError, BstError from ._message import Message, MessageType from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue +from ._pipeline import Pipeline, PipelineSelection +from ._platform import Platform +from ._profile import Topics, profile_start, profile_end from . import utils, _yaml, _site from . import Scope, Consistency @@ -37,90 +40,120 @@ from . import Scope, Consistency # # Args: # context (Context): The Context object +# project (Project): The Project object +# loaded_callback (callable): A callback to invoke when the pipeline is loaded # class Stream(): - def __init__(self, context): + def __init__(self, context, project, loaded_callback): self.session_elements = 0 # Number of elements to process in this session self.total_elements = 0 # Number of total potential elements for this pipeline self._context = context + self._project = project self._scheduler = None self._pipeline = None - # track() + self._loaded_cb = loaded_callback + + # Load selected platform + Platform.create_instance(context, project) + self._platform = Platform.get_platform() + self._artifacts = self._platform.artifactcache + + # cleanup() # - # Trackes all the sources of all the elements in the pipeline, - # i.e. all of the elements which the target somehow depends on. + # Cleans up application state # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on + def cleanup(self): + if self._pipeline: + self._pipeline.cleanup() + + # load_selection() # - # If no error is encountered while tracking, then the project files - # are rewritten inline. + # An all purpose method for loading a selection of elements, this + # is primarily useful for the frontend to implement `bst show` + # and `bst shell`. # - def track(self, scheduler): - track = TrackQueue(self._scheduler) - track.enqueue(self._pipeline._track_elements) - self.session_elements = len(self._pipeline._track_elements) - - _, status = self._scheduler.run([track]) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + # Args: + # targets (list of str): Targets to pull + # selection (PipelineSelection): The selection mode for the specified targets + # except_targets (list of str): Specified targets to except from fetching + # downloadable (bool): Whether the downloadable state of elements should be resolved + # + def load_selection(self, targets, *, + selection=PipelineSelection.NONE, + except_targets=(), + downloadable=False): + self.init_pipeline(targets, except_=except_targets, + use_configured_remote_caches=downloadable) + return self._pipeline.get_selection(selection) - # fetch() + # shell() # - # Fetches sources on the pipeline. + # Run a shell # # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # dependencies (list): List of elements to fetch + # element (Element): An Element object to run the shell for + # scope (Scope): The scope for the shell (Scope.BUILD or Scope.RUN) + # prompt (str): The prompt to display in the shell + # directory (str): A directory where an existing prestaged sysroot is expected, or None + # mounts (list of HostMount): Additional directories to mount into the sandbox + # isolate (bool): Whether to isolate the environment like we do in builds + # command (list): An argv to launch in the sandbox, or None # - def fetch(self, scheduler, dependencies): - fetch_plan = dependencies - - # Subtract the track elements from the fetch elements, they will be added separately - if self._pipeline._track_elements: - track_elements = set(self._pipeline._track_elements) - fetch_plan = [e for e in fetch_plan if e not in track_elements] - - # Assert consistency for the fetch elements - self._pipeline._assert_consistent(fetch_plan) - - # Filter out elements with cached sources, only from the fetch plan - # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] - fetch_plan = [elt for elt in fetch_plan if elt not in cached] - - self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan) - - fetch = FetchQueue(self._scheduler) - fetch.enqueue(fetch_plan) - if self._pipeline._track_elements: - track = TrackQueue(self._scheduler) - track.enqueue(self._pipeline._track_elements) - queues = [track, fetch] - else: - queues = [fetch] - - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + # Returns: + # (int): The exit code of the launched shell + # + def shell(self, element, scope, prompt, *, + directory=None, + mounts=None, + isolate=False, + command=None): + + # Assert we have everything we need built, unless the directory is specified + # in which case we just blindly trust the directory, using the element + # definitions to control the execution environment only. + if directory is None: + missing_deps = [ + dep._get_full_name() + for dep in self._pipeline.dependencies(scope) + if not dep._cached() + ] + if missing_deps: + raise StreamError("Elements need to be built or downloaded before staging a shell environment", + detail="\n".join(missing_deps)) + + return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command) # build() # # Builds (assembles) elements in the pipeline. # # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on + # targets (list of str): Targets to build + # track_targets (list of str): Specified targets for tracking + # track_except (list of str): Specified targets to except from tracking + # track_cross_junctions (bool): Whether tracking should cross junction boundaries # build_all (bool): Whether to build all elements, or only those # which are required to build the target. # - def build(self, scheduler, *, build_all=False): + def build(self, targets, *, + track_targets=None, + track_except=None, + track_cross_junctions=False, + build_all=False): + + rewritable = False + if track_targets: + rewritable = True + + self.init_pipeline(targets, + except_=track_except, + rewritable=rewritable, + use_configured_remote_caches=True, + track_elements=track_targets, + track_cross_junctions=track_cross_junctions) if build_all: plan = self._pipeline.dependencies(Scope.ALL) @@ -171,63 +204,101 @@ class Stream(): elif status == SchedStatus.TERMINATED: raise StreamError(terminated=True) - # checkout() + # fetch() # - # Checkout the pipeline target artifact to the specified directory + # Fetches sources on the pipeline. # # Args: - # directory (str): The directory to checkout the artifact to - # force (bool): Force overwrite files which exist in `directory` - # integrate (bool): Whether to run integration commands - # hardlinks (bool): Whether checking out files hardlinked to - # their artifacts is acceptable + # targets (list of str): Targets to fetch + # selection (PipelineSelection): The selection mode for the specified targets + # except_targets (list of str): Specified targets to except from fetching + # track_targets (bool): Whether to track selected targets in addition to fetching + # track_cross_junctions (bool): Whether tracking should cross junction boundaries # - def checkout(self, directory, force, integrate, hardlinks): - # We only have one target in a checkout command - target = self._pipeline.targets[0] + def fetch(self, targets, *, + selection=PipelineSelection.PLAN, + except_targets=None, + track_targets=False, + track_cross_junctions=False): - try: - os.makedirs(directory, exist_ok=True) - except OSError as e: - raise StreamError("Failed to create checkout directory: {}".format(e)) from e + rewritable = False + if track_targets: + rewritable = True - if not os.access(directory, os.W_OK): - raise StreamError("Directory {} not writable".format(directory)) + self.init_pipeline(targets, + except_=except_targets, + rewritable=rewritable, + track_elements=targets if track_targets else None, + track_cross_junctions=track_cross_junctions) - if not force and os.listdir(directory): - raise StreamError("Checkout directory is not empty: {}" - .format(directory)) + fetch_plan = self._pipeline.get_selection(selection) - # Stage deps into a temporary sandbox first - try: - with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox: + # Delegated to a shared method for now + self._do_fetch(fetch_plan) - # Copy or move the sandbox to the target directory - sandbox_root = sandbox.get_directory() - with target.timed_activity("Checking out files in {}".format(directory)): - try: - if hardlinks: - self._checkout_hardlinks(sandbox_root, directory) - else: - utils.copy_files(sandbox_root, directory) - except OSError as e: - raise StreamError("Failed to checkout files: {}".format(e)) from e - except BstError as e: - raise StreamError("Error while staging dependencies into a sandbox: {}".format(e), - reason=e.reason) from e + # track() + # + # Tracks all the sources of the selected elements. + # + # Args: + # targets (list of str): Targets to track + # selection (PipelineSelection): The selection mode for the specified targets + # except_targets (list of str): Specified targets to except from tracking + # cross_junctions (bool): Whether tracking should cross junction boundaries + # + # If no error is encountered while tracking, then the project files + # are rewritten inline. + # + def track(self, targets, *, + selection=PipelineSelection.NONE, + except_targets=None, + track_targets=False, + cross_junctions=False): + + self.init_pipeline(targets, + except_=except_targets, + rewritable=True, + track_elements=targets, + track_cross_junctions=cross_junctions, + track_selection=selection) + + track = TrackQueue(self._scheduler) + track.enqueue(self._pipeline._track_elements) + self.session_elements = len(self._pipeline._track_elements) + + _, status = self._scheduler.run([track]) + if status == SchedStatus.ERROR: + raise StreamError() + elif status == SchedStatus.TERMINATED: + raise StreamError(terminated=True) # pull() # - # Pulls elements from the pipeline + # Pulls artifacts from remote artifact server(s) # # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # elements (list): List of elements to pull + # targets (list of str): Targets to pull + # selection (PipelineSelection): The selection mode for the specified targets + # remote (str): The URL of a specific remote server to pull from, or None + # + # If `remote` specified as None, then regular configuration will be used + # to determine where to pull artifacts from. # - def pull(self, scheduler, elements): + def pull(self, targets, *, + selection=PipelineSelection.NONE, + remote=None): + + use_configured_remote_caches = True + if remote is not None: + use_configured_remote_caches = False + + self.init_pipeline(targets, + use_configured_remote_caches=use_configured_remote_caches, + add_remote_cache=remote) + elements = self._pipeline.get_selection(selection) if not self._pipeline._artifacts.has_fetch_remotes(): - raise StreamError("Not artifact caches available for pulling artifacts") + raise StreamError("No artifact caches available for pulling artifacts") plan = elements self._pipeline._assert_consistent(plan) @@ -245,13 +316,28 @@ class Stream(): # push() # - # Pushes elements in the pipeline + # Pulls artifacts to remote artifact server(s) # # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # elements (list): List of elements to push + # targets (list of str): Targets to push + # selection (PipelineSelection): The selection mode for the specified targets + # remote (str): The URL of a specific remote server to push to, or None + # + # If `remote` specified as None, then regular configuration will be used + # to determine where to push artifacts to. # - def push(self, scheduler, elements): + def push(self, targets, *, + selection=PipelineSelection.NONE, + remote=None): + + use_configured_remote_caches = True + if remote is not None: + use_configured_remote_caches = False + + self.init_pipeline(targets, + use_configured_remote_caches=use_configured_remote_caches, + add_remote_cache=remote) + elements = self._pipeline.get_selection(selection) if not self._pipeline._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") @@ -270,19 +356,81 @@ class Stream(): elif status == SchedStatus.TERMINATED: raise StreamError(terminated=True) + # checkout() + # + # Checkout the pipeline target artifact to the specified directory + # + # Args: + # target (str): Target to checkout + # directory (str): The directory to checkout the artifact to + # force (bool): Force overwrite files which exist in `directory` + # integrate (bool): Whether to run integration commands + # hardlinks (bool): Whether checking out files hardlinked to + # their artifacts is acceptable + # + def checkout(self, target, *, + directory=None, + force=False, + integrate=True, + hardlinks=False): + + self.init_pipeline((target,)) + + # We only have one target in a checkout command + target = self._pipeline.targets[0] + + try: + os.makedirs(directory, exist_ok=True) + except OSError as e: + raise StreamError("Failed to create checkout directory: {}".format(e)) from e + + if not os.access(directory, os.W_OK): + raise StreamError("Directory {} not writable".format(directory)) + + if not force and os.listdir(directory): + raise StreamError("Checkout directory is not empty: {}" + .format(directory)) + + # Stage deps into a temporary sandbox first + try: + with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox: + + # Copy or move the sandbox to the target directory + sandbox_root = sandbox.get_directory() + with target.timed_activity("Checking out files in {}".format(directory)): + try: + if hardlinks: + self._checkout_hardlinks(sandbox_root, directory) + else: + utils.copy_files(sandbox_root, directory) + except OSError as e: + raise StreamError("Failed to checkout files: {}".format(e)) from e + except BstError as e: + raise StreamError("Error while staging dependencies into a sandbox: {}".format(e), + reason=e.reason) from e + # workspace_open # # Open a project workspace # # Args: - # target (Element): The element to open the workspace for + # target (str): The target element to open the workspace for # directory (str): The directory to stage the source in # no_checkout (bool): Whether to skip checking out the source # track_first (bool): Whether to track and fetch first # force (bool): Whether to ignore contents in an existing directory # - def workspace_open(self, target, directory, no_checkout, track_first, force): - project = self._context.get_toplevel_project() + def workspace_open(self, target, directory, *, + no_checkout, + track_first, + force): + + self.init_pipeline((target,), + track_elements=[target] if track_first else None, + track_selection=PipelineSelection.NONE, + rewritable=track_first) + + target = self._pipeline.targets[0] workdir = os.path.abspath(directory) if not list(target.sources()): @@ -294,15 +442,19 @@ class Stream(): raise StreamError("The given element has no sources", detail=detail) # Check for workspace config - workspace = project.workspaces.get_workspace(target.name) + workspace = self._project.workspaces.get_workspace(target.name) if workspace: raise StreamError("Workspace '{}' is already defined at: {}" .format(target.name, workspace.path)) # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. + # + # For now, tracking is handled by _do_fetch() automatically + # by virtue of our screwed up pipeline initialization stuff. + # if not no_checkout or track_first: - self.fetch(self._scheduler, [target]) + self._do_fetch([target]) if not no_checkout and target._get_consistency() != Consistency.CACHED: raise StreamError("Could not stage uncached source. " + @@ -315,13 +467,13 @@ class Stream(): except OSError as e: raise StreamError("Failed to create workspace directory: {}".format(e)) from e - project.workspaces.create_workspace(target.name, workdir) + self._project.workspaces.create_workspace(target.name, workdir) if not no_checkout: with target.timed_activity("Staging sources to {}".format(directory)): target._open_workspace() - project.workspaces.save_config() + self._project.workspaces.save_config() self._message(MessageType.INFO, "Saved workspace configuration") # workspace_close @@ -332,9 +484,8 @@ class Stream(): # element_name (str): The element name to close the workspace for # remove_dir (bool): Whether to remove the associated directory # - def workspace_close(self, element_name, remove_dir): - project = self._context.get_toplevel_project() - workspace = project.workspaces.get_workspace(element_name) + def workspace_close(self, element_name, *, remove_dir): + workspace = self._project.workspaces.get_workspace(element_name) # Remove workspace directory if prompted if remove_dir: @@ -347,8 +498,8 @@ class Stream(): .format(workspace.path, e)) from e # Delete the workspace and save the configuration - project.workspaces.delete_workspace(element_name) - project.workspaces.save_config() + self._project.workspaces.delete_workspace(element_name) + self._project.workspaces.save_config() self._message(MessageType.INFO, "Closed workspace for {}".format(element_name)) # workspace_reset @@ -357,19 +508,40 @@ class Stream(): # changes. # # Args: - # target (Element): The element to reset the workspace for - # track (bool): Whether to also track the source + # targets (list of str): The target elements to reset the workspace for + # track_first (bool): Whether to also track the sources first # - def workspace_reset(self, target, track): - project = self._context.get_toplevel_project() - workspace = project.workspaces.get_workspace(target.name) + def workspace_reset(self, targets, *, track_first): - if workspace is None: - raise StreamError("Workspace '{}' is currently not defined" - .format(target.name)) + self.init_pipeline(targets, + track_elements=targets if track_first else None, + track_selection=PipelineSelection.NONE, + rewritable=track_first) - self.workspace_close(target.name, True) - self.workspace_open(target, workspace.path, False, track, False) + # Do the tracking first + if track_first: + self._do_fetch(self._pipeline.targets) + + for target in self._pipeline.targets: + workspace = self._project.workspaces.get_workspace(target.name) + + with target.timed_activity("Removing workspace directory {}" + .format(workspace.path)): + try: + shutil.rmtree(workspace.path) + except OSError as e: + raise StreamError("Could not remove '{}': {}" + .format(workspace.path, e)) from e + + self._project.workspaces.delete_workspace(target.name) + self._project.workspaces.create_workspace(target.name, workspace.path) + + with target.timed_activity("Staging sources to {}".format(workspace.path)): + target._open_workspace() + + self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path)) + + self._project.workspaces.save_config() # workspace_exists # @@ -385,13 +557,11 @@ class Stream(): # True if there are any existing workspaces. # def workspace_exists(self, element_name=None): - project = self._context.get_toplevel_project() - if element_name: - workspace = project.workspaces.get_workspace(element_name) + workspace = self._project.workspaces.get_workspace(element_name) if workspace: return True - elif any(project.workspaces.list()): + elif any(self._project.workspaces.list()): return True return False @@ -401,9 +571,8 @@ class Stream(): # Serializes the workspaces and dumps them in YAML to stdout. # def workspace_list(self): - project = self._context.get_toplevel_project() workspaces = [] - for element_name, workspace_ in project.workspaces.list(): + for element_name, workspace_ in self._project.workspaces.list(): workspace_detail = { 'element': element_name, 'directory': workspace_.path, @@ -416,16 +585,28 @@ class Stream(): # source_bundle() # - # Create a build bundle for the given artifact. + # Create a host buildable tarball bundle for the given target. # # Args: - # directory (str): The directory to checkout the artifact to - # - def source_bundle(self, scheduler, dependencies, force, - track_first, compression, directory): + # target (str): The target element to bundle + # directory (str): The directory to output the tarball + # track_first (bool): Track new source references before bundling + # compression (str): The compression type to use + # force (bool): Overwrite an existing tarball + # + def source_bundle(self, target, directory, *, + track_first=False, + force=False, + compression="gz"): + + self.init_pipeline((target,), + track_elements=[target] if track_first else None, + track_selection=PipelineSelection.NONE, + rewritable=track_first) # source-bundle only supports one target target = self._pipeline.targets[0] + dependencies = self._pipeline.get_selection(PipelineSelection.ALL) # Find the correct filename for the compression algorithm tar_location = os.path.join(directory, target.normal_name + ".tar") @@ -444,7 +625,7 @@ class Stream(): .format(tar_location, e)) from e plan = list(dependencies) - self.fetch(self._scheduler, plan) + self._do_fetch(plan) # We don't use the scheduler for this as it is almost entirely IO # bound. @@ -484,6 +665,48 @@ class Stream(): self._context.message( Message(None, message_type, message, **args)) + # _do_fetch() + # + # Performs the fetch job, the body of this function is here because + # it is shared between a few internals. + # + # Args: + # elements (list of Element): Elements to fetch + # + def _do_fetch(self, elements): + + fetch_plan = elements + + # Subtract the track elements from the fetch elements, they will be added separately + if self._pipeline._track_elements: + track_elements = set(self._pipeline._track_elements) + fetch_plan = [e for e in fetch_plan if e not in track_elements] + + # Assert consistency for the fetch elements + self._pipeline._assert_consistent(fetch_plan) + + # Filter out elements with cached sources, only from the fetch plan + # let the track plan resolve new refs. + cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] + fetch_plan = [elt for elt in fetch_plan if elt not in cached] + + self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan) + + fetch = FetchQueue(self._scheduler) + fetch.enqueue(fetch_plan) + if self._pipeline._track_elements: + track = TrackQueue(self._scheduler) + track.enqueue(self._pipeline._track_elements) + queues = [track, fetch] + else: + queues = [fetch] + + _, status = self._scheduler.run(queues) + if status == SchedStatus.ERROR: + raise StreamError() + elif status == SchedStatus.TERMINATED: + raise StreamError(terminated=True) + # Helper function for checkout() # def _checkout_hardlinks(self, sandbox_root, directory): @@ -546,3 +769,56 @@ class Stream(): with tarfile.open(tar_name, permissions) as tar: tar.add(directory, arcname=element_name) + + ############################################################# + # TEMPORARY CRAP # + ############################################################# + + # init_pipeline() + # + # Initialize the pipeline for a given activity + # + # Args: + # elements (list of elements): The elements to load recursively + # except_ (list of elements): The elements to except + # rewritable (bool): Whether we should load the YAML files for roundtripping + # use_configured_remote_caches (bool): Whether we should contact remotes + # add_remote_cache (str): The URL for an explicitly mentioned remote cache + # track_elements (list of elements): Elements which are to be tracked + # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries + # track_selection (PipelineSelection): The selection algorithm for track elements + # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the + # loading process, if they are not yet locally cached + # + # Note that the except_ argument may have a subtly different meaning depending + # on the activity performed on the Pipeline. In normal circumstances the except_ + # argument excludes elements from the `elements` list. In a build session, the + # except_ elements are excluded from the tracking plan. + # + def init_pipeline(self, elements, *, + except_=tuple(), + rewritable=False, + use_configured_remote_caches=False, + add_remote_cache=None, + track_elements=None, + track_cross_junctions=False, + track_selection=PipelineSelection.ALL): + + profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) + + self._pipeline = Pipeline(self._context, self._project, self._artifacts, + elements, except_, rewritable=rewritable) + + self._pipeline.initialize(use_configured_remote_caches=use_configured_remote_caches, + add_remote_cache=add_remote_cache, + track_elements=track_elements, + track_cross_junctions=track_cross_junctions, + track_selection=track_selection) + + profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) + + # Get the total + self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL))) + + if self._loaded_cb is not None: + self._loaded_cb(self._pipeline) diff --git a/tests/plugins/pipeline.py b/tests/plugins/pipeline.py index 4c0e5c397..db683094b 100644 --- a/tests/plugins/pipeline.py +++ b/tests/plugins/pipeline.py @@ -23,7 +23,7 @@ def create_pipeline(tmpdir, basedir, target): context.set_message_handler(dummy_handler) - return Pipeline(context, project, [target], []) + return Pipeline(context, project, None, [target], []) @pytest.mark.datafiles(os.path.join(DATA_DIR, 'customsource')) |