diff options
-rw-r--r-- | buildstream/_frontend/app.py | 13 | ||||
-rw-r--r-- | buildstream/_frontend/cli.py | 16 | ||||
-rw-r--r-- | buildstream/_frontend/status.py | 14 | ||||
-rw-r--r-- | buildstream/_frontend/widget.py | 8 | ||||
-rw-r--r-- | buildstream/_pipeline.py | 360 | ||||
-rw-r--r-- | buildstream/_stream.py | 396 |
6 files changed, 427 insertions, 380 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 3bcb0d962..84c33a385 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -39,6 +39,7 @@ from .._context import Context from .._project import Project from .._exceptions import BstError, PipelineError, LoadError, LoadErrorReason, AppError from .._message import Message, MessageType, unconditional_messages +from .._stream import Stream from .._pipeline import Pipeline, PipelineSelection from .._scheduler import Scheduler from .._profile import Topics, profile_start, profile_end @@ -69,6 +70,7 @@ class App(): # Public members # self.context = None # The Context object + self.stream = None # The Stream object self.project = None # The toplevel Project object self.scheduler = None # The Scheduler self.pipeline = None # The Pipeline @@ -255,11 +257,12 @@ class App(): track_cross_junctions=False, track_selection=PipelineSelection.ALL, fetch_subprojects=False): - profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) # Start with the early stage init, this enables logging right away with self.partially_initialized(fetch_subprojects=fetch_subprojects): + profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) + # Mark the beginning of the session if session_name: self._message(MessageType.START, session_name) @@ -280,7 +283,7 @@ class App(): # Create our status printer, only available in interactive self._status = Status(self._content_profile, self._format_profile, self._success_profile, self._error_profile, - self.pipeline, self.scheduler, + self.stream, self.pipeline, self.scheduler, colors=self.colors) # Initialize pipeline @@ -293,6 +296,8 @@ class App(): except BstError as e: self._error_exit(e, "Error initializing pipeline") + self.stream = Stream(self.context, self.scheduler, self.pipeline) + # Pipeline is loaded, now we can tell the logger about it self.logger.size_request(self.pipeline) @@ -476,7 +481,7 @@ class App(): # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. if not no_checkout or track_first: - self.pipeline.fetch(self.scheduler, [target]) + self.stream.fetch(self.scheduler, [target]) if not no_checkout and target._get_consistency() != Consistency.CACHED: raise PipelineError("Could not stage uncached source. " + @@ -751,7 +756,7 @@ class App(): # def _print_summary(self): click.echo("", err=True) - self.logger.print_summary(self.pipeline, self.scheduler, + self.logger.print_summary(self.stream, self.scheduler, self._main_options['log_file'], styling=self.colors) diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 5f9e2751e..11a0ca2cc 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -243,7 +243,7 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac use_configured_remote_caches=True, track_elements=track_, track_cross_junctions=track_cross_junctions, fetch_subprojects=True): - app.pipeline.build(app.scheduler, build_all=all_) + app.stream.build(app.scheduler, build_all=all_) ################################################################## @@ -287,7 +287,7 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions): track_cross_junctions=track_cross_junctions, fetch_subprojects=True): dependencies = app.pipeline.get_selection(deps) - app.pipeline.fetch(app.scheduler, dependencies) + app.stream.fetch(app.scheduler, dependencies) ################################################################## @@ -323,7 +323,7 @@ def track(app, elements, deps, except_, cross_junctions): track_cross_junctions=cross_junctions, track_selection=deps, fetch_subprojects=True): - app.pipeline.track(app.scheduler) + app.stream.track(app.scheduler) ################################################################## @@ -354,7 +354,7 @@ def pull(app, elements, deps, remote): with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None), add_remote_cache=remote, fetch_subprojects=True): to_pull = app.pipeline.get_selection(deps) - app.pipeline.pull(app.scheduler, to_pull) + app.stream.pull(app.scheduler, to_pull) ################################################################## @@ -385,7 +385,7 @@ def push(app, elements, deps, remote): use_configured_remote_caches=(remote is None), add_remote_cache=remote, fetch_subprojects=True): to_push = app.pipeline.get_selection(deps) - app.pipeline.push(app.scheduler, to_push) + app.stream.push(app.scheduler, to_push) ################################################################## @@ -564,7 +564,7 @@ def checkout(app, element, directory, force, integrate, hardlinks): """Checkout a built artifact to the specified directory """ with app.initialized((element,)): - app.pipeline.checkout(directory, force, integrate, hardlinks) + app.stream.checkout(directory, force, integrate, hardlinks) ################################################################## @@ -592,8 +592,8 @@ def source_bundle(app, target, force, directory, """ with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None): dependencies = app.pipeline.get_selection('all') - app.pipeline.source_bundle(app.scheduler, dependencies, force, track_, - compression, directory) + app.stream.source_bundle(app.scheduler, dependencies, force, track_, + compression, directory) ################################################################## diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py index 9bb2b644f..4f3eed0f5 100644 --- a/buildstream/_frontend/status.py +++ b/buildstream/_frontend/status.py @@ -37,6 +37,7 @@ from .widget import TimeCode # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text # error_profile (Profile): Formatting profile for error text +# stream (Stream): The Stream # pipeline (Pipeline): The Pipeline # scheduler (Scheduler): The Scheduler # colors (bool): Whether to print the ANSI color codes in the output @@ -45,13 +46,12 @@ class Status(): def __init__(self, content_profile, format_profile, success_profile, error_profile, - pipeline, scheduler, colors=False): + stream, pipeline, scheduler, colors=False): self._content_profile = content_profile self._format_profile = format_profile self._success_profile = success_profile self._error_profile = error_profile - self._pipeline = pipeline self._scheduler = scheduler self._jobs = [] self._last_lines = 0 # Number of status lines we last printed to console @@ -60,7 +60,7 @@ class Status(): self._colors = colors self._header = _StatusHeader(content_profile, format_profile, success_profile, error_profile, - pipeline, scheduler) + stream, pipeline, scheduler) self._term_width, _ = click.get_terminal_size() self._alloc_lines = 0 @@ -246,6 +246,7 @@ class Status(): # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text # error_profile (Profile): Formatting profile for error text +# stream (Stream): The Stream # pipeline (Pipeline): The Pipeline # scheduler (Scheduler): The Scheduler # @@ -253,7 +254,7 @@ class _StatusHeader(): def __init__(self, content_profile, format_profile, success_profile, error_profile, - pipeline, scheduler): + stream, pipeline, scheduler): # # Public members @@ -267,6 +268,7 @@ class _StatusHeader(): self._format_profile = format_profile self._success_profile = success_profile self._error_profile = error_profile + self._stream = stream self._pipeline = pipeline self._scheduler = scheduler self._time_code = TimeCode(content_profile, format_profile) @@ -276,8 +278,8 @@ class _StatusHeader(): size = 0 text = '' - session = str(self._pipeline.session_elements) - total = str(self._pipeline.total_elements) + session = str(self._stream.session_elements) + total = str(self._stream.total_elements) # Format and calculate size for pipeline target and overall time code size += len(total) + len(session) + 4 # Size for (N/N) with a leading space diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index b5942b91e..a72293b04 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -530,12 +530,12 @@ class LogLine(Widget): # Print a summary of activities at the end of a session # # Args: - # pipeline (Pipeline): The Pipeline + # stream (Stream): The Stream # scheduler (Scheduler): The Scheduler # log_file (file): An optional file handle for additional logging # styling (bool): Whether to enable ansi escape codes in the output # - def print_summary(self, pipeline, scheduler, log_file, styling=False): + def print_summary(self, stream, scheduler, log_file, styling=False): # Early silent return if there are no queues, can happen # only in the case that the pipeline early returned due to @@ -563,8 +563,8 @@ class LogLine(Widget): text += self.content_profile.fmt("Pipeline Summary\n", bold=True) values = OrderedDict() - values['Total'] = self.content_profile.fmt(str(pipeline.total_elements)) - values['Session'] = self.content_profile.fmt(str(pipeline.session_elements)) + values['Total'] = self.content_profile.fmt(str(stream.total_elements)) + values['Session'] = self.content_profile.fmt(str(stream.session_elements)) processed_maxlen = 1 skipped_maxlen = 1 diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 682329e6c..52bae2e5c 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -19,28 +19,18 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> -import os -import stat -import shlex -import tarfile import itertools from operator import itemgetter -from tempfile import TemporaryDirectory -from ._exceptions import PipelineError, ImplError, BstError +from ._exceptions import PipelineError from ._message import Message, MessageType from ._loader import Loader from .element import Element -from . import Consistency -from . import Scope -from . import _site -from . import utils +from . import Scope, Consistency from ._platform import Platform from ._project import ProjectRefStorage from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs -from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue - # PipelineSelection() # @@ -101,8 +91,6 @@ class Pipeline(): self.context = context # The Context self.project = project # The toplevel project - self.session_elements = 0 # Number of elements to process in this session - self.total_elements = 0 # Number of total potential elements for this pipeline self.targets = [] # List of toplevel target Element objects # @@ -170,8 +158,6 @@ class Pipeline(): # Preflight directly, before ever interrogating caches or anything. self._preflight() - self.total_elements = len(list(self.dependencies(Scope.ALL))) - # Initialize remote artifact caches. We allow the commandline to override # the user config in some cases (for example `bst push --remote=...`). has_remote_caches = False @@ -260,247 +246,6 @@ class Pipeline(): # Commands # ############################################################# - # track() - # - # Trackes all the sources of all the elements in the pipeline, - # i.e. all of the elements which the target somehow depends on. - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # - # If no error is encountered while tracking, then the project files - # are rewritten inline. - # - def track(self, scheduler): - track = TrackQueue(scheduler) - track.enqueue(self._track_elements) - self.session_elements = len(self._track_elements) - - _, status = scheduler.run([track]) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # fetch() - # - # Fetches sources on the pipeline. - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # dependencies (list): List of elements to fetch - # - def fetch(self, scheduler, dependencies): - fetch_plan = dependencies - - # Subtract the track elements from the fetch elements, they will be added separately - if self._track_elements: - track_elements = set(self._track_elements) - fetch_plan = [e for e in fetch_plan if e not in track_elements] - - # Assert consistency for the fetch elements - self._assert_consistent(fetch_plan) - - # Filter out elements with cached sources, only from the fetch plan - # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] - fetch_plan = [elt for elt in fetch_plan if elt not in cached] - - self.session_elements = len(self._track_elements) + len(fetch_plan) - - fetch = FetchQueue(scheduler) - fetch.enqueue(fetch_plan) - if self._track_elements: - track = TrackQueue(scheduler) - track.enqueue(self._track_elements) - queues = [track, fetch] - else: - queues = [fetch] - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # build() - # - # Builds (assembles) elements in the pipeline. - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # build_all (bool): Whether to build all elements, or only those - # which are required to build the target. - # - def build(self, scheduler, *, build_all=False): - - if build_all: - plan = self.dependencies(Scope.ALL) - else: - plan = self._plan(except_=False) - - # We want to start the build queue with any elements that are - # not being tracked first - track_elements = set(self._track_elements) - plan = [e for e in plan if e not in track_elements] - - # Assert that we have a consistent pipeline now (elements in - # track_plan will be made consistent) - self._assert_consistent(plan) - - fetch = FetchQueue(scheduler, skip_cached=True) - build = BuildQueue(scheduler) - track = None - pull = None - push = None - queues = [] - if self._track_elements: - track = TrackQueue(scheduler) - queues.append(track) - if self._artifacts.has_fetch_remotes(): - pull = PullQueue(scheduler) - queues.append(pull) - queues.append(fetch) - queues.append(build) - if self._artifacts.has_push_remotes(): - push = PushQueue(scheduler) - queues.append(push) - - # If we're going to track, tracking elements go into the first queue - # which is the tracking queue, the rest of the plan goes into the next - # queue (whatever that happens to be) - if track: - queues[0].enqueue(self._track_elements) - queues[1].enqueue(plan) - else: - queues[0].enqueue(plan) - - self.session_elements = len(self._track_elements) + len(plan) - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # checkout() - # - # Checkout the pipeline target artifact to the specified directory - # - # Args: - # directory (str): The directory to checkout the artifact to - # force (bool): Force overwrite files which exist in `directory` - # integrate (bool): Whether to run integration commands - # hardlinks (bool): Whether checking out files hardlinked to - # their artifacts is acceptable - # - def checkout(self, directory, force, integrate, hardlinks): - # We only have one target in a checkout command - target = self.targets[0] - - try: - os.makedirs(directory, exist_ok=True) - except OSError as e: - raise PipelineError("Failed to create checkout directory: {}".format(e)) from e - - if not os.access(directory, os.W_OK): - raise PipelineError("Directory {} not writable".format(directory)) - - if not force and os.listdir(directory): - raise PipelineError("Checkout directory is not empty: {}" - .format(directory)) - - # Stage deps into a temporary sandbox first - try: - with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox: - - # Copy or move the sandbox to the target directory - sandbox_root = sandbox.get_directory() - with target.timed_activity("Checking out files in {}".format(directory)): - try: - if hardlinks: - self.checkout_hardlinks(sandbox_root, directory) - else: - utils.copy_files(sandbox_root, directory) - except OSError as e: - raise PipelineError("Failed to checkout files: {}".format(e)) from e - except BstError as e: - raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e), - reason=e.reason) from e - - # Helper function for checkout() - # - def checkout_hardlinks(self, sandbox_root, directory): - try: - removed = utils.safe_remove(directory) - except OSError as e: - raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e - - if removed: - # Try a simple rename of the sandbox root; if that - # doesnt cut it, then do the regular link files code path - try: - os.rename(sandbox_root, directory) - except OSError: - os.makedirs(directory, exist_ok=True) - utils.link_files(sandbox_root, directory) - else: - utils.link_files(sandbox_root, directory) - - # pull() - # - # Pulls elements from the pipeline - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # elements (list): List of elements to pull - # - def pull(self, scheduler, elements): - - if not self._artifacts.has_fetch_remotes(): - raise PipelineError("Not artifact caches available for pulling artifacts") - - plan = elements - self._assert_consistent(plan) - self.session_elements = len(plan) - - pull = PullQueue(scheduler) - pull.enqueue(plan) - queues = [pull] - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # push() - # - # Pushes elements in the pipeline - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # elements (list): List of elements to push - # - def push(self, scheduler, elements): - - if not self._artifacts.has_push_remotes(): - raise PipelineError("No artifact caches available for pushing artifacts") - - plan = elements - self._assert_consistent(plan) - self.session_elements = len(plan) - - push = PushQueue(scheduler) - push.enqueue(plan) - queues = [push] - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - # remove_elements(): # # Internal function @@ -560,63 +305,6 @@ class Pipeline(): # in before. return [element for element in elements if element in visited] - # source_bundle() - # - # Create a build bundle for the given artifact. - # - # Args: - # directory (str): The directory to checkout the artifact to - # - def source_bundle(self, scheduler, dependencies, force, - track_first, compression, directory): - - # source-bundle only supports one target - target = self.targets[0] - - # Find the correct filename for the compression algorithm - tar_location = os.path.join(directory, target.normal_name + ".tar") - if compression != "none": - tar_location += "." + compression - - # Attempt writing a file to generate a good error message - # early - # - # FIXME: A bit hackish - try: - open(tar_location, mode="x") - os.remove(tar_location) - except IOError as e: - raise PipelineError("Cannot write to {0}: {1}" - .format(tar_location, e)) from e - - plan = list(dependencies) - self.fetch(scheduler, plan) - - # We don't use the scheduler for this as it is almost entirely IO - # bound. - - # Create a temporary directory to build the source tree in - builddir = target._get_context().builddir - prefix = "{}-".format(target.normal_name) - - with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir: - source_directory = os.path.join(tempdir, 'source') - try: - os.makedirs(source_directory) - except OSError as e: - raise PipelineError("Failed to create directory: {}" - .format(e)) from e - - # Any elements that don't implement _write_script - # should not be included in the later stages. - plan = [element for element in plan - if self._write_element_script(source_directory, element)] - - self._write_element_sources(tempdir, plan) - self._write_build_script(tempdir, plan) - self._collect_sources(tempdir, tar_location, - target.normal_name, compression) - ############################################################# # Private Methods # ############################################################# @@ -794,50 +482,6 @@ class Pipeline(): self.context.message( Message(None, message_type, message, **args)) - # Write the element build script to the given directory - def _write_element_script(self, directory, element): - try: - element._write_script(directory) - except ImplError: - return False - return True - - # Write all source elements to the given directory - def _write_element_sources(self, directory, elements): - for element in elements: - source_dir = os.path.join(directory, "source") - element_source_dir = os.path.join(source_dir, element.normal_name) - - element._stage_sources_at(element_source_dir) - - # Write a master build script to the sandbox - def _write_build_script(self, directory, elements): - - module_string = "" - for element in elements: - module_string += shlex.quote(element.normal_name) + " " - - script_path = os.path.join(directory, "build.sh") - - with open(_site.build_all_template, "r") as f: - script_template = f.read() - - with utils.save_file_atomic(script_path, "w") as script: - script.write(script_template.format(modules=module_string)) - - os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD) - - # Collect the sources in the given sandbox into a tarfile - def _collect_sources(self, directory, tar_name, element_name, compression): - with self.targets[0].timed_activity("Creating tarball {}".format(tar_name)): - if compression == "none": - permissions = "w:" - else: - permissions = "w:" + compression - - with tarfile.open(tar_name, permissions) as tar: - tar.add(directory, arcname=element_name) - # _Planner() # diff --git a/buildstream/_stream.py b/buildstream/_stream.py new file mode 100644 index 000000000..d80b19f34 --- /dev/null +++ b/buildstream/_stream.py @@ -0,0 +1,396 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +import os +import stat +import shlex +import tarfile +from tempfile import TemporaryDirectory + +from ._exceptions import PipelineError, ImplError, BstError +from . import _site +from . import utils +from . import Scope, Consistency +from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue + + +# Stream() +# +# This is the main, toplevel calling interface in BuildStream core. +# +# Args: +# context (Context): The Context object +# +class Stream(): + + def __init__(self, context, scheduler, pipeline): + self.session_elements = 0 # Number of elements to process in this session + self.total_elements = 0 # Number of total potential elements for this pipeline + + self._context = context + self._scheduler = scheduler + self._pipeline = pipeline + + self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL))) + + # track() + # + # Trackes all the sources of all the elements in the pipeline, + # i.e. all of the elements which the target somehow depends on. + # + # Args: + # scheduler (Scheduler): The scheduler to run this pipeline on + # + # If no error is encountered while tracking, then the project files + # are rewritten inline. + # + def track(self, scheduler): + track = TrackQueue(self._scheduler) + track.enqueue(self._pipeline._track_elements) + self.session_elements = len(self._pipeline._track_elements) + + _, status = self._scheduler.run([track]) + if status == SchedStatus.ERROR: + raise PipelineError() + elif status == SchedStatus.TERMINATED: + raise PipelineError(terminated=True) + + # fetch() + # + # Fetches sources on the pipeline. + # + # Args: + # scheduler (Scheduler): The scheduler to run this pipeline on + # dependencies (list): List of elements to fetch + # + def fetch(self, scheduler, dependencies): + fetch_plan = dependencies + + # Subtract the track elements from the fetch elements, they will be added separately + if self._pipeline._track_elements: + track_elements = set(self._pipeline._track_elements) + fetch_plan = [e for e in fetch_plan if e not in track_elements] + + # Assert consistency for the fetch elements + self._pipeline._assert_consistent(fetch_plan) + + # Filter out elements with cached sources, only from the fetch plan + # let the track plan resolve new refs. + cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] + fetch_plan = [elt for elt in fetch_plan if elt not in cached] + + self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan) + + fetch = FetchQueue(self._scheduler) + fetch.enqueue(fetch_plan) + if self._pipeline._track_elements: + track = TrackQueue(self._scheduler) + track.enqueue(self._pipeline._track_elements) + queues = [track, fetch] + else: + queues = [fetch] + + _, status = self._scheduler.run(queues) + if status == SchedStatus.ERROR: + raise PipelineError() + elif status == SchedStatus.TERMINATED: + raise PipelineError(terminated=True) + + # build() + # + # Builds (assembles) elements in the pipeline. + # + # Args: + # scheduler (Scheduler): The scheduler to run this pipeline on + # build_all (bool): Whether to build all elements, or only those + # which are required to build the target. + # + def build(self, scheduler, *, build_all=False): + + if build_all: + plan = self._pipeline.dependencies(Scope.ALL) + else: + plan = self._pipeline._plan(except_=False) + + # We want to start the build queue with any elements that are + # not being tracked first + track_elements = set(self._pipeline._track_elements) + plan = [e for e in plan if e not in track_elements] + + # Assert that we have a consistent pipeline now (elements in + # track_plan will be made consistent) + self._pipeline._assert_consistent(plan) + + fetch = FetchQueue(self._scheduler, skip_cached=True) + build = BuildQueue(self._scheduler) + track = None + pull = None + push = None + queues = [] + if self._pipeline._track_elements: + track = TrackQueue(self._scheduler) + queues.append(track) + if self._pipeline._artifacts.has_fetch_remotes(): + pull = PullQueue(self._scheduler) + queues.append(pull) + queues.append(fetch) + queues.append(build) + if self._pipeline._artifacts.has_push_remotes(): + push = PushQueue(self._scheduler) + queues.append(push) + + # If we're going to track, tracking elements go into the first queue + # which is the tracking queue, the rest of the plan goes into the next + # queue (whatever that happens to be) + if track: + queues[0].enqueue(self._pipeline._track_elements) + queues[1].enqueue(plan) + else: + queues[0].enqueue(plan) + + self.session_elements = len(self._pipeline._track_elements) + len(plan) + + _, status = self._scheduler.run(queues) + if status == SchedStatus.ERROR: + raise PipelineError() + elif status == SchedStatus.TERMINATED: + raise PipelineError(terminated=True) + + # checkout() + # + # Checkout the pipeline target artifact to the specified directory + # + # Args: + # directory (str): The directory to checkout the artifact to + # force (bool): Force overwrite files which exist in `directory` + # integrate (bool): Whether to run integration commands + # hardlinks (bool): Whether checking out files hardlinked to + # their artifacts is acceptable + # + def checkout(self, directory, force, integrate, hardlinks): + # We only have one target in a checkout command + target = self._pipeline.targets[0] + + try: + os.makedirs(directory, exist_ok=True) + except OSError as e: + raise PipelineError("Failed to create checkout directory: {}".format(e)) from e + + if not os.access(directory, os.W_OK): + raise PipelineError("Directory {} not writable".format(directory)) + + if not force and os.listdir(directory): + raise PipelineError("Checkout directory is not empty: {}" + .format(directory)) + + # Stage deps into a temporary sandbox first + try: + with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox: + + # Copy or move the sandbox to the target directory + sandbox_root = sandbox.get_directory() + with target.timed_activity("Checking out files in {}".format(directory)): + try: + if hardlinks: + self.checkout_hardlinks(sandbox_root, directory) + else: + utils.copy_files(sandbox_root, directory) + except OSError as e: + raise PipelineError("Failed to checkout files: {}".format(e)) from e + except BstError as e: + raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e), + reason=e.reason) from e + + # Helper function for checkout() + # + def checkout_hardlinks(self, sandbox_root, directory): + try: + removed = utils.safe_remove(directory) + except OSError as e: + raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e + + if removed: + # Try a simple rename of the sandbox root; if that + # doesnt cut it, then do the regular link files code path + try: + os.rename(sandbox_root, directory) + except OSError: + os.makedirs(directory, exist_ok=True) + utils.link_files(sandbox_root, directory) + else: + utils.link_files(sandbox_root, directory) + + # pull() + # + # Pulls elements from the pipeline + # + # Args: + # scheduler (Scheduler): The scheduler to run this pipeline on + # elements (list): List of elements to pull + # + def pull(self, scheduler, elements): + + if not self._pipeline._artifacts.has_fetch_remotes(): + raise PipelineError("Not artifact caches available for pulling artifacts") + + plan = elements + self._pipeline._assert_consistent(plan) + self._pipeline.session_elements = len(plan) + + pull = PullQueue(self._scheduler) + pull.enqueue(plan) + queues = [pull] + + _, status = self._scheduler.run(queues) + if status == SchedStatus.ERROR: + raise PipelineError() + elif status == SchedStatus.TERMINATED: + raise PipelineError(terminated=True) + + # push() + # + # Pushes elements in the pipeline + # + # Args: + # scheduler (Scheduler): The scheduler to run this pipeline on + # elements (list): List of elements to push + # + def push(self, scheduler, elements): + + if not self._pipeline._artifacts.has_push_remotes(): + raise PipelineError("No artifact caches available for pushing artifacts") + + plan = elements + self._pipeline._assert_consistent(plan) + self._pipeline.session_elements = len(plan) + + push = PushQueue(self._scheduler) + push.enqueue(plan) + queues = [push] + + _, status = self._scheduler.run(queues) + if status == SchedStatus.ERROR: + raise PipelineError() + elif status == SchedStatus.TERMINATED: + raise PipelineError(terminated=True) + + # source_bundle() + # + # Create a build bundle for the given artifact. + # + # Args: + # directory (str): The directory to checkout the artifact to + # + def source_bundle(self, scheduler, dependencies, force, + track_first, compression, directory): + + # source-bundle only supports one target + target = self._pipeline.targets[0] + + # Find the correct filename for the compression algorithm + tar_location = os.path.join(directory, target.normal_name + ".tar") + if compression != "none": + tar_location += "." + compression + + # Attempt writing a file to generate a good error message + # early + # + # FIXME: A bit hackish + try: + open(tar_location, mode="x") + os.remove(tar_location) + except IOError as e: + raise PipelineError("Cannot write to {0}: {1}" + .format(tar_location, e)) from e + + plan = list(dependencies) + self.fetch(self._scheduler, plan) + + # We don't use the scheduler for this as it is almost entirely IO + # bound. + + # Create a temporary directory to build the source tree in + builddir = target._get_context().builddir + prefix = "{}-".format(target.normal_name) + + with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir: + source_directory = os.path.join(tempdir, 'source') + try: + os.makedirs(source_directory) + except OSError as e: + raise PipelineError("Failed to create directory: {}" + .format(e)) from e + + # Any elements that don't implement _write_script + # should not be included in the later stages. + plan = [element for element in plan + if self._write_element_script(source_directory, element)] + + self._write_element_sources(tempdir, plan) + self._write_build_script(tempdir, plan) + self._collect_sources(tempdir, tar_location, + target.normal_name, compression) + + ############################################################# + # Private Methods # + ############################################################# + + # Write the element build script to the given directory + def _write_element_script(self, directory, element): + try: + element._write_script(directory) + except ImplError: + return False + return True + + # Write all source elements to the given directory + def _write_element_sources(self, directory, elements): + for element in elements: + source_dir = os.path.join(directory, "source") + element_source_dir = os.path.join(source_dir, element.normal_name) + + element._stage_sources_at(element_source_dir) + + # Write a master build script to the sandbox + def _write_build_script(self, directory, elements): + + module_string = "" + for element in elements: + module_string += shlex.quote(element.normal_name) + " " + + script_path = os.path.join(directory, "build.sh") + + with open(_site.build_all_template, "r") as f: + script_template = f.read() + + with utils.save_file_atomic(script_path, "w") as script: + script.write(script_template.format(modules=module_string)) + + os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD) + + # Collect the sources in the given sandbox into a tarfile + def _collect_sources(self, directory, tar_name, element_name, compression): + with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)): + if compression == "none": + permissions = "w:" + else: + permissions = "w:" + compression + + with tarfile.open(tar_name, permissions) as tar: + tar.add(directory, arcname=element_name) |