diff options
Diffstat (limited to 'buildstream/_pipeline.py')
-rw-r--r-- | buildstream/_pipeline.py | 360 |
1 files changed, 2 insertions, 358 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 682329e6c..52bae2e5c 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -19,28 +19,18 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> -import os -import stat -import shlex -import tarfile import itertools from operator import itemgetter -from tempfile import TemporaryDirectory -from ._exceptions import PipelineError, ImplError, BstError +from ._exceptions import PipelineError from ._message import Message, MessageType from ._loader import Loader from .element import Element -from . import Consistency -from . import Scope -from . import _site -from . import utils +from . import Scope, Consistency from ._platform import Platform from ._project import ProjectRefStorage from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs -from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue - # PipelineSelection() # @@ -101,8 +91,6 @@ class Pipeline(): self.context = context # The Context self.project = project # The toplevel project - self.session_elements = 0 # Number of elements to process in this session - self.total_elements = 0 # Number of total potential elements for this pipeline self.targets = [] # List of toplevel target Element objects # @@ -170,8 +158,6 @@ class Pipeline(): # Preflight directly, before ever interrogating caches or anything. self._preflight() - self.total_elements = len(list(self.dependencies(Scope.ALL))) - # Initialize remote artifact caches. We allow the commandline to override # the user config in some cases (for example `bst push --remote=...`). has_remote_caches = False @@ -260,247 +246,6 @@ class Pipeline(): # Commands # ############################################################# - # track() - # - # Trackes all the sources of all the elements in the pipeline, - # i.e. all of the elements which the target somehow depends on. - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # - # If no error is encountered while tracking, then the project files - # are rewritten inline. - # - def track(self, scheduler): - track = TrackQueue(scheduler) - track.enqueue(self._track_elements) - self.session_elements = len(self._track_elements) - - _, status = scheduler.run([track]) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # fetch() - # - # Fetches sources on the pipeline. - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # dependencies (list): List of elements to fetch - # - def fetch(self, scheduler, dependencies): - fetch_plan = dependencies - - # Subtract the track elements from the fetch elements, they will be added separately - if self._track_elements: - track_elements = set(self._track_elements) - fetch_plan = [e for e in fetch_plan if e not in track_elements] - - # Assert consistency for the fetch elements - self._assert_consistent(fetch_plan) - - # Filter out elements with cached sources, only from the fetch plan - # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] - fetch_plan = [elt for elt in fetch_plan if elt not in cached] - - self.session_elements = len(self._track_elements) + len(fetch_plan) - - fetch = FetchQueue(scheduler) - fetch.enqueue(fetch_plan) - if self._track_elements: - track = TrackQueue(scheduler) - track.enqueue(self._track_elements) - queues = [track, fetch] - else: - queues = [fetch] - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # build() - # - # Builds (assembles) elements in the pipeline. - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # build_all (bool): Whether to build all elements, or only those - # which are required to build the target. - # - def build(self, scheduler, *, build_all=False): - - if build_all: - plan = self.dependencies(Scope.ALL) - else: - plan = self._plan(except_=False) - - # We want to start the build queue with any elements that are - # not being tracked first - track_elements = set(self._track_elements) - plan = [e for e in plan if e not in track_elements] - - # Assert that we have a consistent pipeline now (elements in - # track_plan will be made consistent) - self._assert_consistent(plan) - - fetch = FetchQueue(scheduler, skip_cached=True) - build = BuildQueue(scheduler) - track = None - pull = None - push = None - queues = [] - if self._track_elements: - track = TrackQueue(scheduler) - queues.append(track) - if self._artifacts.has_fetch_remotes(): - pull = PullQueue(scheduler) - queues.append(pull) - queues.append(fetch) - queues.append(build) - if self._artifacts.has_push_remotes(): - push = PushQueue(scheduler) - queues.append(push) - - # If we're going to track, tracking elements go into the first queue - # which is the tracking queue, the rest of the plan goes into the next - # queue (whatever that happens to be) - if track: - queues[0].enqueue(self._track_elements) - queues[1].enqueue(plan) - else: - queues[0].enqueue(plan) - - self.session_elements = len(self._track_elements) + len(plan) - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # checkout() - # - # Checkout the pipeline target artifact to the specified directory - # - # Args: - # directory (str): The directory to checkout the artifact to - # force (bool): Force overwrite files which exist in `directory` - # integrate (bool): Whether to run integration commands - # hardlinks (bool): Whether checking out files hardlinked to - # their artifacts is acceptable - # - def checkout(self, directory, force, integrate, hardlinks): - # We only have one target in a checkout command - target = self.targets[0] - - try: - os.makedirs(directory, exist_ok=True) - except OSError as e: - raise PipelineError("Failed to create checkout directory: {}".format(e)) from e - - if not os.access(directory, os.W_OK): - raise PipelineError("Directory {} not writable".format(directory)) - - if not force and os.listdir(directory): - raise PipelineError("Checkout directory is not empty: {}" - .format(directory)) - - # Stage deps into a temporary sandbox first - try: - with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox: - - # Copy or move the sandbox to the target directory - sandbox_root = sandbox.get_directory() - with target.timed_activity("Checking out files in {}".format(directory)): - try: - if hardlinks: - self.checkout_hardlinks(sandbox_root, directory) - else: - utils.copy_files(sandbox_root, directory) - except OSError as e: - raise PipelineError("Failed to checkout files: {}".format(e)) from e - except BstError as e: - raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e), - reason=e.reason) from e - - # Helper function for checkout() - # - def checkout_hardlinks(self, sandbox_root, directory): - try: - removed = utils.safe_remove(directory) - except OSError as e: - raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e - - if removed: - # Try a simple rename of the sandbox root; if that - # doesnt cut it, then do the regular link files code path - try: - os.rename(sandbox_root, directory) - except OSError: - os.makedirs(directory, exist_ok=True) - utils.link_files(sandbox_root, directory) - else: - utils.link_files(sandbox_root, directory) - - # pull() - # - # Pulls elements from the pipeline - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # elements (list): List of elements to pull - # - def pull(self, scheduler, elements): - - if not self._artifacts.has_fetch_remotes(): - raise PipelineError("Not artifact caches available for pulling artifacts") - - plan = elements - self._assert_consistent(plan) - self.session_elements = len(plan) - - pull = PullQueue(scheduler) - pull.enqueue(plan) - queues = [pull] - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - - # push() - # - # Pushes elements in the pipeline - # - # Args: - # scheduler (Scheduler): The scheduler to run this pipeline on - # elements (list): List of elements to push - # - def push(self, scheduler, elements): - - if not self._artifacts.has_push_remotes(): - raise PipelineError("No artifact caches available for pushing artifacts") - - plan = elements - self._assert_consistent(plan) - self.session_elements = len(plan) - - push = PushQueue(scheduler) - push.enqueue(plan) - queues = [push] - - _, status = scheduler.run(queues) - if status == SchedStatus.ERROR: - raise PipelineError() - elif status == SchedStatus.TERMINATED: - raise PipelineError(terminated=True) - # remove_elements(): # # Internal function @@ -560,63 +305,6 @@ class Pipeline(): # in before. return [element for element in elements if element in visited] - # source_bundle() - # - # Create a build bundle for the given artifact. - # - # Args: - # directory (str): The directory to checkout the artifact to - # - def source_bundle(self, scheduler, dependencies, force, - track_first, compression, directory): - - # source-bundle only supports one target - target = self.targets[0] - - # Find the correct filename for the compression algorithm - tar_location = os.path.join(directory, target.normal_name + ".tar") - if compression != "none": - tar_location += "." + compression - - # Attempt writing a file to generate a good error message - # early - # - # FIXME: A bit hackish - try: - open(tar_location, mode="x") - os.remove(tar_location) - except IOError as e: - raise PipelineError("Cannot write to {0}: {1}" - .format(tar_location, e)) from e - - plan = list(dependencies) - self.fetch(scheduler, plan) - - # We don't use the scheduler for this as it is almost entirely IO - # bound. - - # Create a temporary directory to build the source tree in - builddir = target._get_context().builddir - prefix = "{}-".format(target.normal_name) - - with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir: - source_directory = os.path.join(tempdir, 'source') - try: - os.makedirs(source_directory) - except OSError as e: - raise PipelineError("Failed to create directory: {}" - .format(e)) from e - - # Any elements that don't implement _write_script - # should not be included in the later stages. - plan = [element for element in plan - if self._write_element_script(source_directory, element)] - - self._write_element_sources(tempdir, plan) - self._write_build_script(tempdir, plan) - self._collect_sources(tempdir, tar_location, - target.normal_name, compression) - ############################################################# # Private Methods # ############################################################# @@ -794,50 +482,6 @@ class Pipeline(): self.context.message( Message(None, message_type, message, **args)) - # Write the element build script to the given directory - def _write_element_script(self, directory, element): - try: - element._write_script(directory) - except ImplError: - return False - return True - - # Write all source elements to the given directory - def _write_element_sources(self, directory, elements): - for element in elements: - source_dir = os.path.join(directory, "source") - element_source_dir = os.path.join(source_dir, element.normal_name) - - element._stage_sources_at(element_source_dir) - - # Write a master build script to the sandbox - def _write_build_script(self, directory, elements): - - module_string = "" - for element in elements: - module_string += shlex.quote(element.normal_name) + " " - - script_path = os.path.join(directory, "build.sh") - - with open(_site.build_all_template, "r") as f: - script_template = f.read() - - with utils.save_file_atomic(script_path, "w") as script: - script.write(script_template.format(modules=module_string)) - - os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD) - - # Collect the sources in the given sandbox into a tarfile - def _collect_sources(self, directory, tar_name, element_name, compression): - with self.targets[0].timed_activity("Creating tarball {}".format(tar_name)): - if compression == "none": - permissions = "w:" - else: - permissions = "w:" + compression - - with tarfile.open(tar_name, permissions) as tar: - tar.add(directory, arcname=element_name) - # _Planner() # |