path: root/buildstream/_pipeline.py
Diffstat (limited to 'buildstream/_pipeline.py')
-rw-r--r--  buildstream/_pipeline.py | 360
1 file changed, 2 insertions, 358 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 682329e6c..52bae2e5c 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -19,28 +19,18 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
-import os
-import stat
-import shlex
-import tarfile
import itertools
from operator import itemgetter
-from tempfile import TemporaryDirectory
-from ._exceptions import PipelineError, ImplError, BstError
+from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._loader import Loader
from .element import Element
-from . import Consistency
-from . import Scope
-from . import _site
-from . import utils
+from . import Scope, Consistency
from ._platform import Platform
from ._project import ProjectRefStorage
from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
-
# PipelineSelection()
#
@@ -101,8 +91,6 @@ class Pipeline():
self.context = context # The Context
self.project = project # The toplevel project
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
self.targets = [] # List of toplevel target Element objects
#
@@ -170,8 +158,6 @@ class Pipeline():
# Preflight directly, before ever interrogating caches or anything.
self._preflight()
- self.total_elements = len(list(self.dependencies(Scope.ALL)))
-
# Initialize remote artifact caches. We allow the commandline to override
# the user config in some cases (for example `bst push --remote=...`).
has_remote_caches = False
@@ -260,247 +246,6 @@ class Pipeline():
# Commands #
#############################################################
- # track()
- #
- # Tracks all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- #
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
- #
- def track(self, scheduler):
- track = TrackQueue(scheduler)
- track.enqueue(self._track_elements)
- self.session_elements = len(self._track_elements)
-
- _, status = scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
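All of the removed commands below share one control-flow idiom: enqueue a plan of elements into one or more queues, run the scheduler, and translate its exit status into a PipelineError. A minimal sketch of that idiom, reusing the SchedStatus and PipelineError names from the imports this commit removes (run_queues itself is a hypothetical helper, not part of the module):

    # Hypothetical helper: run scheduler queues and convert the exit
    # status into the pipeline's error reporting convention.
    def run_queues(scheduler, queues):
        _, status = scheduler.run(queues)
        if status == SchedStatus.ERROR:
            raise PipelineError()
        elif status == SchedStatus.TERMINATED:
            raise PipelineError(terminated=True)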
- # fetch()
- #
- # Fetches sources on the pipeline.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
- #
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._track_elements:
- track_elements = set(self._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(scheduler)
- fetch.enqueue(fetch_plan)
- if self._track_elements:
- track = TrackQueue(scheduler)
- track.enqueue(self._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
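The fetch plan above is derived by subtracting tracked elements and already-cached sources from the requested dependencies. A condensed sketch of that filtering, assuming elements expose _get_consistency() and that Consistency.CACHED marks fully cached sources (build_fetch_plan is a hypothetical name):

    # Hypothetical sketch of the fetch-plan filtering performed above.
    def build_fetch_plan(dependencies, track_elements):
        tracked = set(track_elements)
        plan = [e for e in dependencies if e not in tracked]
        # Only uncached sources need fetching; tracked elements resolve
        # new refs separately in the track queue.
        return [e for e in plan if e._get_consistency() != Consistency.CACHED]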
- # build()
- #
- # Builds (assembles) elements in the pipeline.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # build_all (bool): Whether to build all elements, or only those
- # which are required to build the target.
- #
- def build(self, scheduler, *, build_all=False):
-
- if build_all:
- plan = self.dependencies(Scope.ALL)
- else:
- plan = self._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._assert_consistent(plan)
-
- fetch = FetchQueue(scheduler, skip_cached=True)
- build = BuildQueue(scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._track_elements:
- track = TrackQueue(scheduler)
- queues.append(track)
- if self._artifacts.has_fetch_remotes():
- pull = PullQueue(scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._artifacts.has_push_remotes():
- push = PushQueue(scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
-
- self.session_elements = len(self._track_elements) + len(plan)
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
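The removed build() assembles its queues in a fixed order: tracking first (when anything is tracked), then pull, fetch, build and finally push, with the pull and push queues created only when the corresponding remotes are configured. A sketch of that assembly, using the queue classes from the removed _scheduler import (assemble_build_queues is a hypothetical name):

    # Hypothetical sketch of the queue ordering used by the removed build().
    def assemble_build_queues(scheduler, artifacts, track_elements):
        queues = []
        if track_elements:
            queues.append(TrackQueue(scheduler))
        if artifacts.has_fetch_remotes():
            queues.append(PullQueue(scheduler))
        queues.append(FetchQueue(scheduler, skip_cached=True))
        queues.append(BuildQueue(scheduler))
        if artifacts.has_push_remotes():
            queues.append(PushQueue(scheduler))
        return queues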
- # checkout()
- #
- # Checkout the pipeline target artifact to the specified directory
- #
- # Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
- #
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self.targets[0]
-
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
-
- if not os.access(directory, os.W_OK):
- raise PipelineError("Directory {} not writable".format(directory))
-
- if not force and os.listdir(directory):
- raise PipelineError("Checkout directory is not empty: {}"
- .format(directory))
-
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
-
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self.checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise PipelineError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
-
- # Helper function for checkout()
- #
- def checkout_hardlinks(self, sandbox_root, directory):
- try:
- removed = utils.safe_remove(directory)
- except OSError as e:
- raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
-
- if removed:
- # Try a simple rename of the sandbox root; if that
- # doesn't cut it, then do the regular link files code path
- try:
- os.rename(sandbox_root, directory)
- except OSError:
- os.makedirs(directory, exist_ok=True)
- utils.link_files(sandbox_root, directory)
- else:
- utils.link_files(sandbox_root, directory)
-
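The hardlink path above first attempts a cheap rename of the whole sandbox root and only falls back to per-file linking when that fails. A compact sketch of the same fallback, relying on the os and utils imports that this commit removes:

    # Sketch of the rename-then-link fallback in checkout_hardlinks().
    def checkout_hardlinks_sketch(sandbox_root, directory):
        if utils.safe_remove(directory):
            try:
                os.rename(sandbox_root, directory)
                return
            except OSError:
                os.makedirs(directory, exist_ok=True)
        utils.link_files(sandbox_root, directory)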
- # pull()
- #
- # Pulls elements from the pipeline
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
- #
- def pull(self, scheduler, elements):
-
- if not self._artifacts.has_fetch_remotes():
- raise PipelineError("Not artifact caches available for pulling artifacts")
-
- plan = elements
- self._assert_consistent(plan)
- self.session_elements = len(plan)
-
- pull = PullQueue(scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # push()
- #
- # Pushes elements in the pipeline
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
- #
- def push(self, scheduler, elements):
-
- if not self._artifacts.has_push_remotes():
- raise PipelineError("No artifact caches available for pushing artifacts")
-
- plan = elements
- self._assert_consistent(plan)
- self.session_elements = len(plan)
-
- push = PushQueue(scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
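pull() and push() above differ only in the queue class and the remote capability they check, so the shared pattern can be expressed once. A hedged sketch of that shared shape (transfer is a hypothetical helper, not part of the module):

    # Hypothetical generalisation of the removed pull()/push() commands.
    # The originals also assert plan consistency and record session_elements.
    def transfer(scheduler, elements, queue_cls, has_remotes, verb):
        if not has_remotes:
            raise PipelineError("No artifact caches available for {} artifacts".format(verb))
        queue = queue_cls(scheduler)
        queue.enqueue(elements)
        _, status = scheduler.run([queue])
        if status == SchedStatus.ERROR:
            raise PipelineError()
        elif status == SchedStatus.TERMINATED:
            raise PipelineError(terminated=True)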
# remove_elements():
#
# Internal function
@@ -560,63 +305,6 @@ class Pipeline():
# in before.
return [element for element in elements if element in visited]
- # source_bundle()
- #
- # Create a build bundle for the given artifact.
- #
- # Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
-
- # source-bundle only supports one target
- target = self.targets[0]
-
- # Find the correct filename for the compression algorithm
- tar_location = os.path.join(directory, target.normal_name + ".tar")
- if compression != "none":
- tar_location += "." + compression
-
- # Attempt writing a file to generate a good error message
- # early
- #
- # FIXME: A bit hackish
- try:
- open(tar_location, mode="x")
- os.remove(tar_location)
- except IOError as e:
- raise PipelineError("Cannot write to {0}: {1}"
- .format(tar_location, e)) from e
-
- plan = list(dependencies)
- self.fetch(scheduler, plan)
-
- # We don't use the scheduler for this as it is almost entirely IO
- # bound.
-
- # Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
- prefix = "{}-".format(target.normal_name)
-
- with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
- source_directory = os.path.join(tempdir, 'source')
- try:
- os.makedirs(source_directory)
- except OSError as e:
- raise PipelineError("Failed to create directory: {}"
- .format(e)) from e
-
- # Any elements that don't implement _write_script
- # should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
-
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
- self._collect_sources(tempdir, tar_location,
- target.normal_name, compression)
-
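source_bundle() derives the tarball path from the target's normal_name and the requested compression, then probes the location early so a permissions problem fails before any sources are fetched. A minimal sketch of those two steps using only standard-library calls (bundle_path is a hypothetical name):

    import os

    # Hypothetical sketch of the tarball naming and early writability probe.
    def bundle_path(directory, normal_name, compression):
        path = os.path.join(directory, normal_name + ".tar")
        if compression != "none":
            path += "." + compression
        # Probe: creating (mode "x") and removing the file surfaces
        # permission errors before any real work is done.
        open(path, mode="x").close()
        os.remove(path)
        return path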
#############################################################
# Private Methods #
#############################################################
@@ -794,50 +482,6 @@ class Pipeline():
self.context.message(
Message(None, message_type, message, **args))
- # Write the element build script to the given directory
- def _write_element_script(self, directory, element):
- try:
- element._write_script(directory)
- except ImplError:
- return False
- return True
-
- # Write all source elements to the given directory
- def _write_element_sources(self, directory, elements):
- for element in elements:
- source_dir = os.path.join(directory, "source")
- element_source_dir = os.path.join(source_dir, element.normal_name)
-
- element._stage_sources_at(element_source_dir)
-
- # Write a master build script to the sandbox
- def _write_build_script(self, directory, elements):
-
- module_string = ""
- for element in elements:
- module_string += shlex.quote(element.normal_name) + " "
-
- script_path = os.path.join(directory, "build.sh")
-
- with open(_site.build_all_template, "r") as f:
- script_template = f.read()
-
- with utils.save_file_atomic(script_path, "w") as script:
- script.write(script_template.format(modules=module_string))
-
- os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
-
- # Collect the sources in the given sandbox into a tarfile
- def _collect_sources(self, directory, tar_name, element_name, compression):
- with self.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
- if compression == "none":
- permissions = "w:"
- else:
- permissions = "w:" + compression
-
- with tarfile.open(tar_name, permissions) as tar:
- tar.add(directory, arcname=element_name)
-
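The removed helpers shell-quote each module name for the generated build.sh and map the compression setting to a tarfile write mode ("w:" for none, otherwise e.g. "w:gz"). A small sketch of those two details, assuming elements expose normal_name as above and using only shlex and tarfile from the standard library:

    import shlex
    import tarfile

    # Sketch: shell-quote module names for the generated build.sh.
    def module_list(elements):
        return " ".join(shlex.quote(e.normal_name) for e in elements)

    # Sketch: archive a staged directory, choosing the tarfile mode
    # from the compression setting.
    def collect_sources_sketch(directory, tar_name, element_name, compression):
        mode = "w:" if compression == "none" else "w:" + compression
        with tarfile.open(tar_name, mode) as tar:
            tar.add(directory, arcname=element_name)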
# _Planner()
#