Diffstat (limited to 'buildstream/_stream.py')
-rw-r--r-- | buildstream/_stream.py | 396 |
1 file changed, 396 insertions, 0 deletions
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
new file mode 100644
index 000000000..d80b19f34
--- /dev/null
+++ b/buildstream/_stream.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+#
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+import os
+import stat
+import shlex
+import tarfile
+from tempfile import TemporaryDirectory
+
+from ._exceptions import PipelineError, ImplError, BstError
+from . import _site
+from . import utils
+from . import Scope, Consistency
+from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+
+
+# Stream()
+#
+# This is the main, toplevel calling interface in BuildStream core.
+#
+# Args:
+#    context (Context): The Context object
+#
+class Stream():
+
+    def __init__(self, context, scheduler, pipeline):
+        self.session_elements = 0  # Number of elements to process in this session
+        self.total_elements = 0    # Number of total potential elements for this pipeline
+
+        self._context = context
+        self._scheduler = scheduler
+        self._pipeline = pipeline
+
+        self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+    # track()
+    #
+    # Tracks all the sources of all the elements in the pipeline,
+    # i.e. all of the elements which the target somehow depends on.
+    #
+    # Args:
+    #    scheduler (Scheduler): The scheduler to run this pipeline on
+    #
+    # If no error is encountered while tracking, then the project files
+    # are rewritten inline.
+    #
+    def track(self, scheduler):
+        track = TrackQueue(self._scheduler)
+        track.enqueue(self._pipeline._track_elements)
+        self.session_elements = len(self._pipeline._track_elements)
+
+        _, status = self._scheduler.run([track])
+        if status == SchedStatus.ERROR:
+            raise PipelineError()
+        elif status == SchedStatus.TERMINATED:
+            raise PipelineError(terminated=True)
+
+    # fetch()
+    #
+    # Fetches sources on the pipeline.
+    #
+    # Args:
+    #    scheduler (Scheduler): The scheduler to run this pipeline on
+    #    dependencies (list): List of elements to fetch
+    #
+    def fetch(self, scheduler, dependencies):
+        fetch_plan = dependencies
+
+        # Subtract the track elements from the fetch elements; they
+        # will be added separately
+        if self._pipeline._track_elements:
+            track_elements = set(self._pipeline._track_elements)
+            fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+        # Assert consistency for the fetch elements
+        self._pipeline._assert_consistent(fetch_plan)
+
+        # Filter out elements with cached sources, only from the fetch plan;
+        # let the track plan resolve new refs.
+        cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+        fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+        self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+        fetch = FetchQueue(self._scheduler)
+        fetch.enqueue(fetch_plan)
+        if self._pipeline._track_elements:
+            track = TrackQueue(self._scheduler)
+            track.enqueue(self._pipeline._track_elements)
+            queues = [track, fetch]
+        else:
+            queues = [fetch]
+
+        _, status = self._scheduler.run(queues)
+        if status == SchedStatus.ERROR:
+            raise PipelineError()
+        elif status == SchedStatus.TERMINATED:
+            raise PipelineError(terminated=True)
+
+    # build()
+    #
+    # Builds (assembles) elements in the pipeline.
+    #
+    # Args:
+    #    scheduler (Scheduler): The scheduler to run this pipeline on
+    #    build_all (bool): Whether to build all elements, or only those
+    #                      which are required to build the target.
+    #
+    def build(self, scheduler, *, build_all=False):
+
+        if build_all:
+            plan = self._pipeline.dependencies(Scope.ALL)
+        else:
+            plan = self._pipeline._plan(except_=False)
+
+        # We want to start the build queue with any elements that are
+        # not being tracked first
+        track_elements = set(self._pipeline._track_elements)
+        plan = [e for e in plan if e not in track_elements]
+
+        # Assert that we have a consistent pipeline now (elements in
+        # track_plan will be made consistent)
+        self._pipeline._assert_consistent(plan)
+
+        fetch = FetchQueue(self._scheduler, skip_cached=True)
+        build = BuildQueue(self._scheduler)
+        track = None
+        pull = None
+        push = None
+        queues = []
+        if self._pipeline._track_elements:
+            track = TrackQueue(self._scheduler)
+            queues.append(track)
+        if self._pipeline._artifacts.has_fetch_remotes():
+            pull = PullQueue(self._scheduler)
+            queues.append(pull)
+        queues.append(fetch)
+        queues.append(build)
+        if self._pipeline._artifacts.has_push_remotes():
+            push = PushQueue(self._scheduler)
+            queues.append(push)
+
+        # If we're going to track, tracking elements go into the first queue,
+        # which is the tracking queue; the rest of the plan goes into the next
+        # queue (whatever that happens to be)
+        if track:
+            queues[0].enqueue(self._pipeline._track_elements)
+            queues[1].enqueue(plan)
+        else:
+            queues[0].enqueue(plan)
+
+        self.session_elements = len(self._pipeline._track_elements) + len(plan)
+
+        _, status = self._scheduler.run(queues)
+        if status == SchedStatus.ERROR:
+            raise PipelineError()
+        elif status == SchedStatus.TERMINATED:
+            raise PipelineError(terminated=True)
+
+    # checkout()
+    #
+    # Checkout the pipeline target artifact to the specified directory
+    #
+    # Args:
+    #    directory (str): The directory to checkout the artifact to
+    #    force (bool): Force overwrite files which exist in `directory`
+    #    integrate (bool): Whether to run integration commands
+    #    hardlinks (bool): Whether checking out files hardlinked to
+    #                      their artifacts is acceptable
+    #
+    def checkout(self, directory, force, integrate, hardlinks):
+        # We only have one target in a checkout command
+        target = self._pipeline.targets[0]
+
+        try:
+            os.makedirs(directory, exist_ok=True)
+        except OSError as e:
+            raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
+
+        if not os.access(directory, os.W_OK):
+            raise PipelineError("Directory {} not writable".format(directory))
+
+        if not force and os.listdir(directory):
+            raise PipelineError("Checkout directory is not empty: {}"
+                                .format(directory))
+
+        # Stage deps into a temporary sandbox first
+        try:
+            with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+                # Copy or move the sandbox to the target directory
+                sandbox_root = sandbox.get_directory()
+                with target.timed_activity("Checking out files in {}".format(directory)):
+                    try:
+                        if hardlinks:
+                            self.checkout_hardlinks(sandbox_root, directory)
+                        else:
+                            utils.copy_files(sandbox_root, directory)
+                    except OSError as e:
+                        raise PipelineError("Failed to checkout files: {}".format(e)) from e
+        except BstError as e:
+            raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
+                                reason=e.reason) from e
+
+    # Helper function for checkout()
+    #
+    def checkout_hardlinks(self, sandbox_root, directory):
+        try:
+            removed = utils.safe_remove(directory)
+        except OSError as e:
+            raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
+
+        if removed:
+            # Try a simple rename of the sandbox root; if that
+            # doesn't cut it, then do the regular link files code path
+            try:
+                os.rename(sandbox_root, directory)
+            except OSError:
+                os.makedirs(directory, exist_ok=True)
+                utils.link_files(sandbox_root, directory)
+        else:
+            utils.link_files(sandbox_root, directory)
+
+    # pull()
+    #
+    # Pulls elements from the pipeline
+    #
+    # Args:
+    #    scheduler (Scheduler): The scheduler to run this pipeline on
+    #    elements (list): List of elements to pull
+    #
+    def pull(self, scheduler, elements):
+
+        if not self._pipeline._artifacts.has_fetch_remotes():
+            raise PipelineError("No artifact caches available for pulling artifacts")
+
+        plan = elements
+        self._pipeline._assert_consistent(plan)
+        self.session_elements = len(plan)
+
+        pull = PullQueue(self._scheduler)
+        pull.enqueue(plan)
+        queues = [pull]
+
+        _, status = self._scheduler.run(queues)
+        if status == SchedStatus.ERROR:
+            raise PipelineError()
+        elif status == SchedStatus.TERMINATED:
+            raise PipelineError(terminated=True)
+
+    # push()
+    #
+    # Pushes elements in the pipeline
+    #
+    # Args:
+    #    scheduler (Scheduler): The scheduler to run this pipeline on
+    #    elements (list): List of elements to push
+    #
+    def push(self, scheduler, elements):
+
+        if not self._pipeline._artifacts.has_push_remotes():
+            raise PipelineError("No artifact caches available for pushing artifacts")
+
+        plan = elements
+        self._pipeline._assert_consistent(plan)
+        self.session_elements = len(plan)
+
+        push = PushQueue(self._scheduler)
+        push.enqueue(plan)
+        queues = [push]
+
+        _, status = self._scheduler.run(queues)
+        if status == SchedStatus.ERROR:
+            raise PipelineError()
+        elif status == SchedStatus.TERMINATED:
+            raise PipelineError(terminated=True)
+
+    # source_bundle()
+    #
+    # Create a build bundle for the given artifact.
+    #
+    # Args:
+    #    directory (str): The directory to checkout the artifact to
+    #
+    def source_bundle(self, scheduler, dependencies, force,
+                      track_first, compression, directory):
+
+        # source-bundle only supports one target
+        target = self._pipeline.targets[0]
+
+        # Find the correct filename for the compression algorithm
+        tar_location = os.path.join(directory, target.normal_name + ".tar")
+        if compression != "none":
+            tar_location += "." + compression
+
+        # Attempt writing a file to generate a good error message
+        # early
+        #
+        # FIXME: A bit hackish
+        try:
+            open(tar_location, mode="x")
+            os.remove(tar_location)
+        except IOError as e:
+            raise PipelineError("Cannot write to {0}: {1}"
+                                .format(tar_location, e)) from e
+
+        plan = list(dependencies)
+        self.fetch(self._scheduler, plan)
+
+        # We don't use the scheduler for this as it is almost entirely IO
+        # bound.
+
+        # Create a temporary directory to build the source tree in
+        builddir = target._get_context().builddir
+        prefix = "{}-".format(target.normal_name)
+
+        with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
+            source_directory = os.path.join(tempdir, 'source')
+            try:
+                os.makedirs(source_directory)
+            except OSError as e:
+                raise PipelineError("Failed to create directory: {}"
+                                    .format(e)) from e
+
+            # Any elements that don't implement _write_script
+            # should not be included in the later stages.
+            plan = [element for element in plan
+                    if self._write_element_script(source_directory, element)]
+
+            self._write_element_sources(tempdir, plan)
+            self._write_build_script(tempdir, plan)
+            self._collect_sources(tempdir, tar_location,
+                                  target.normal_name, compression)
+
+    #############################################################
+    #                     Private Methods                       #
+    #############################################################
+
+    # Write the element build script to the given directory
+    def _write_element_script(self, directory, element):
+        try:
+            element._write_script(directory)
+        except ImplError:
+            return False
+        return True
+
+    # Write all source elements to the given directory
+    def _write_element_sources(self, directory, elements):
+        for element in elements:
+            source_dir = os.path.join(directory, "source")
+            element_source_dir = os.path.join(source_dir, element.normal_name)
+
+            element._stage_sources_at(element_source_dir)
+
+    # Write a master build script to the sandbox
+    def _write_build_script(self, directory, elements):
+
+        module_string = ""
+        for element in elements:
+            module_string += shlex.quote(element.normal_name) + " "
+
+        script_path = os.path.join(directory, "build.sh")
+
+        with open(_site.build_all_template, "r") as f:
+            script_template = f.read()
+
+        with utils.save_file_atomic(script_path, "w") as script:
+            script.write(script_template.format(modules=module_string))
+
+        os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
+
+    # Collect the sources in the given sandbox into a tarfile
+    def _collect_sources(self, directory, tar_name, element_name, compression):
+        with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+            if compression == "none":
+                permissions = "w:"
+            else:
+                permissions = "w:" + compression
+
+            with tarfile.open(tar_name, permissions) as tar:
+                tar.add(directory, arcname=element_name)
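
To make the calling convention concrete, here is a minimal sketch of how a frontend might drive the Stream API added above. Only Stream itself is part of this commit; the construction of Context, Scheduler and Pipeline (import paths and constructor signatures alike) is assumed for illustration and is not shown in this diff.

    # Hypothetical driver for Stream; the Context/Scheduler/Pipeline wiring
    # below is an assumption, not taken from this commit.
    from buildstream._context import Context        # assumed import path
    from buildstream._scheduler import Scheduler    # assumed import path
    from buildstream._pipeline import Pipeline      # assumed import path
    from buildstream._stream import Stream
    from buildstream._exceptions import PipelineError

    context = Context()                  # assumed: user/project config loaded here
    scheduler = Scheduler(context)       # assumed constructor signature
    pipeline = Pipeline(context, ...)    # assumed: targets already resolved

    stream = Stream(context, scheduler, pipeline)
    try:
        # Stream assembles the queues internally: track/pull/fetch/build/push
        stream.build(scheduler, build_all=False)
    except PipelineError:
        # Failure details were already reported by the scheduler queues
        raise

Every public entry point follows the same shape: assemble queues, enqueue a plan, run the scheduler, and translate a non-OK SchedStatus into a PipelineError.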
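
The checkout_hardlinks() helper tries a whole-tree os.rename() first and only falls back to linking file by file: a rename is atomic and essentially free when the staged tree and the destination live on the same filesystem, but fails (typically with EXDEV) across filesystems. The stdlib-only sketch below illustrates the same strategy in isolation; checkout_tree() and link_or_copy_tree() are hypothetical names standing in for the utils.link_files() machinery, which this diff does not include.

    import errno
    import os
    import shutil

    # Hypothetical stand-in for utils.link_files(): hardlink where possible,
    # copy where the link would cross a filesystem boundary.
    def link_or_copy_tree(src, dest):
        for root, _dirs, files in os.walk(src):
            rel = os.path.relpath(root, src)
            target_root = dest if rel == "." else os.path.join(dest, rel)
            os.makedirs(target_root, exist_ok=True)
            for name in files:
                src_file = os.path.join(root, name)
                dest_file = os.path.join(target_root, name)
                try:
                    os.link(src_file, dest_file)       # same filesystem: free
                except OSError as e:
                    if e.errno != errno.EXDEV:
                        raise
                    shutil.copy2(src_file, dest_file)  # cross-device: copy instead

    # Fast path first: move the staged tree into place with one atomic rename,
    # falling back to per-file linking/copying when the rename cannot work.
    def checkout_tree(staged_root, directory):
        try:
            os.rename(staged_root, directory)
        except OSError:
            os.makedirs(directory, exist_ok=True)
            link_or_copy_tree(staged_root, directory)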