summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildstream/_frontend/app.py13
-rw-r--r--buildstream/_frontend/cli.py16
-rw-r--r--buildstream/_frontend/status.py14
-rw-r--r--buildstream/_frontend/widget.py8
-rw-r--r--buildstream/_pipeline.py360
-rw-r--r--buildstream/_stream.py396
6 files changed, 427 insertions, 380 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 3bcb0d962..84c33a385 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,6 +39,7 @@ from .._context import Context
from .._project import Project
from .._exceptions import BstError, PipelineError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
+from .._stream import Stream
from .._pipeline import Pipeline, PipelineSelection
from .._scheduler import Scheduler
from .._profile import Topics, profile_start, profile_end
@@ -69,6 +70,7 @@ class App():
# Public members
#
self.context = None # The Context object
+ self.stream = None # The Stream object
self.project = None # The toplevel Project object
self.scheduler = None # The Scheduler
self.pipeline = None # The Pipeline
@@ -255,11 +257,12 @@ class App():
track_cross_junctions=False,
track_selection=PipelineSelection.ALL,
fetch_subprojects=False):
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
# Start with the early stage init, this enables logging right away
with self.partially_initialized(fetch_subprojects=fetch_subprojects):
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
# Mark the beginning of the session
if session_name:
self._message(MessageType.START, session_name)
@@ -280,7 +283,7 @@ class App():
# Create our status printer, only available in interactive
self._status = Status(self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
- self.pipeline, self.scheduler,
+ self.stream, self.pipeline, self.scheduler,
colors=self.colors)
# Initialize pipeline
@@ -293,6 +296,8 @@ class App():
except BstError as e:
self._error_exit(e, "Error initializing pipeline")
+ self.stream = Stream(self.context, self.scheduler, self.pipeline)
+
# Pipeline is loaded, now we can tell the logger about it
self.logger.size_request(self.pipeline)
@@ -476,7 +481,7 @@ class App():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
if not no_checkout or track_first:
- self.pipeline.fetch(self.scheduler, [target])
+ self.stream.fetch(self.scheduler, [target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise PipelineError("Could not stage uncached source. " +
@@ -751,7 +756,7 @@ class App():
#
def _print_summary(self):
click.echo("", err=True)
- self.logger.print_summary(self.pipeline, self.scheduler,
+ self.logger.print_summary(self.stream, self.scheduler,
self._main_options['log_file'],
styling=self.colors)
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 5f9e2751e..11a0ca2cc 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -243,7 +243,7 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
use_configured_remote_caches=True, track_elements=track_,
track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
- app.pipeline.build(app.scheduler, build_all=all_)
+ app.stream.build(app.scheduler, build_all=all_)
##################################################################
@@ -287,7 +287,7 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
dependencies = app.pipeline.get_selection(deps)
- app.pipeline.fetch(app.scheduler, dependencies)
+ app.stream.fetch(app.scheduler, dependencies)
##################################################################
@@ -323,7 +323,7 @@ def track(app, elements, deps, except_, cross_junctions):
track_cross_junctions=cross_junctions,
track_selection=deps,
fetch_subprojects=True):
- app.pipeline.track(app.scheduler)
+ app.stream.track(app.scheduler)
##################################################################
@@ -354,7 +354,7 @@ def pull(app, elements, deps, remote):
with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None),
add_remote_cache=remote, fetch_subprojects=True):
to_pull = app.pipeline.get_selection(deps)
- app.pipeline.pull(app.scheduler, to_pull)
+ app.stream.pull(app.scheduler, to_pull)
##################################################################
@@ -385,7 +385,7 @@ def push(app, elements, deps, remote):
use_configured_remote_caches=(remote is None),
add_remote_cache=remote, fetch_subprojects=True):
to_push = app.pipeline.get_selection(deps)
- app.pipeline.push(app.scheduler, to_push)
+ app.stream.push(app.scheduler, to_push)
##################################################################
@@ -564,7 +564,7 @@ def checkout(app, element, directory, force, integrate, hardlinks):
"""Checkout a built artifact to the specified directory
"""
with app.initialized((element,)):
- app.pipeline.checkout(directory, force, integrate, hardlinks)
+ app.stream.checkout(directory, force, integrate, hardlinks)
##################################################################
@@ -592,8 +592,8 @@ def source_bundle(app, target, force, directory,
"""
with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None):
dependencies = app.pipeline.get_selection('all')
- app.pipeline.source_bundle(app.scheduler, dependencies, force, track_,
- compression, directory)
+ app.stream.source_bundle(app.scheduler, dependencies, force, track_,
+ compression, directory)
##################################################################
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 9bb2b644f..4f3eed0f5 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -37,6 +37,7 @@ from .widget import TimeCode
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
+# stream (Stream): The Stream
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
# colors (bool): Whether to print the ANSI color codes in the output
@@ -45,13 +46,12 @@ class Status():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler, colors=False):
+ stream, pipeline, scheduler, colors=False):
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
- self._pipeline = pipeline
self._scheduler = scheduler
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
@@ -60,7 +60,7 @@ class Status():
self._colors = colors
self._header = _StatusHeader(content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler)
+ stream, pipeline, scheduler)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
@@ -246,6 +246,7 @@ class Status():
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
+# stream (Stream): The Stream
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
#
@@ -253,7 +254,7 @@ class _StatusHeader():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler):
+ stream, pipeline, scheduler):
#
# Public members
@@ -267,6 +268,7 @@ class _StatusHeader():
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
+ self._stream = stream
self._pipeline = pipeline
self._scheduler = scheduler
self._time_code = TimeCode(content_profile, format_profile)
@@ -276,8 +278,8 @@ class _StatusHeader():
size = 0
text = ''
- session = str(self._pipeline.session_elements)
- total = str(self._pipeline.total_elements)
+ session = str(self._stream.session_elements)
+ total = str(self._stream.total_elements)
# Format and calculate size for pipeline target and overall time code
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index b5942b91e..a72293b04 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -530,12 +530,12 @@ class LogLine(Widget):
# Print a summary of activities at the end of a session
#
# Args:
- # pipeline (Pipeline): The Pipeline
+ # stream (Stream): The Stream
# scheduler (Scheduler): The Scheduler
# log_file (file): An optional file handle for additional logging
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_summary(self, pipeline, scheduler, log_file, styling=False):
+ def print_summary(self, stream, scheduler, log_file, styling=False):
# Early silent return if there are no queues, can happen
# only in the case that the pipeline early returned due to
@@ -563,8 +563,8 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(pipeline.total_elements))
- values['Session'] = self.content_profile.fmt(str(pipeline.session_elements))
+ values['Total'] = self.content_profile.fmt(str(stream.total_elements))
+ values['Session'] = self.content_profile.fmt(str(stream.session_elements))
processed_maxlen = 1
skipped_maxlen = 1
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 682329e6c..52bae2e5c 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -19,28 +19,18 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
-import os
-import stat
-import shlex
-import tarfile
import itertools
from operator import itemgetter
-from tempfile import TemporaryDirectory
-from ._exceptions import PipelineError, ImplError, BstError
+from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._loader import Loader
from .element import Element
-from . import Consistency
-from . import Scope
-from . import _site
-from . import utils
+from . import Scope, Consistency
from ._platform import Platform
from ._project import ProjectRefStorage
from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
-
# PipelineSelection()
#
@@ -101,8 +91,6 @@ class Pipeline():
self.context = context # The Context
self.project = project # The toplevel project
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
self.targets = [] # List of toplevel target Element objects
#
@@ -170,8 +158,6 @@ class Pipeline():
# Preflight directly, before ever interrogating caches or anything.
self._preflight()
- self.total_elements = len(list(self.dependencies(Scope.ALL)))
-
# Initialize remote artifact caches. We allow the commandline to override
# the user config in some cases (for example `bst push --remote=...`).
has_remote_caches = False
@@ -260,247 +246,6 @@ class Pipeline():
# Commands #
#############################################################
- # track()
- #
- # Trackes all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- #
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
- #
- def track(self, scheduler):
- track = TrackQueue(scheduler)
- track.enqueue(self._track_elements)
- self.session_elements = len(self._track_elements)
-
- _, status = scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # fetch()
- #
- # Fetches sources on the pipeline.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
- #
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._track_elements:
- track_elements = set(self._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(scheduler)
- fetch.enqueue(fetch_plan)
- if self._track_elements:
- track = TrackQueue(scheduler)
- track.enqueue(self._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # build()
- #
- # Builds (assembles) elements in the pipeline.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # build_all (bool): Whether to build all elements, or only those
- # which are required to build the target.
- #
- def build(self, scheduler, *, build_all=False):
-
- if build_all:
- plan = self.dependencies(Scope.ALL)
- else:
- plan = self._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._assert_consistent(plan)
-
- fetch = FetchQueue(scheduler, skip_cached=True)
- build = BuildQueue(scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._track_elements:
- track = TrackQueue(scheduler)
- queues.append(track)
- if self._artifacts.has_fetch_remotes():
- pull = PullQueue(scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._artifacts.has_push_remotes():
- push = PushQueue(scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
-
- self.session_elements = len(self._track_elements) + len(plan)
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # checkout()
- #
- # Checkout the pipeline target artifact to the specified directory
- #
- # Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
- #
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self.targets[0]
-
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
-
- if not os.access(directory, os.W_OK):
- raise PipelineError("Directory {} not writable".format(directory))
-
- if not force and os.listdir(directory):
- raise PipelineError("Checkout directory is not empty: {}"
- .format(directory))
-
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
-
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self.checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise PipelineError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
-
- # Helper function for checkout()
- #
- def checkout_hardlinks(self, sandbox_root, directory):
- try:
- removed = utils.safe_remove(directory)
- except OSError as e:
- raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
-
- if removed:
- # Try a simple rename of the sandbox root; if that
- # doesnt cut it, then do the regular link files code path
- try:
- os.rename(sandbox_root, directory)
- except OSError:
- os.makedirs(directory, exist_ok=True)
- utils.link_files(sandbox_root, directory)
- else:
- utils.link_files(sandbox_root, directory)
-
- # pull()
- #
- # Pulls elements from the pipeline
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
- #
- def pull(self, scheduler, elements):
-
- if not self._artifacts.has_fetch_remotes():
- raise PipelineError("Not artifact caches available for pulling artifacts")
-
- plan = elements
- self._assert_consistent(plan)
- self.session_elements = len(plan)
-
- pull = PullQueue(scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # push()
- #
- # Pushes elements in the pipeline
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
- #
- def push(self, scheduler, elements):
-
- if not self._artifacts.has_push_remotes():
- raise PipelineError("No artifact caches available for pushing artifacts")
-
- plan = elements
- self._assert_consistent(plan)
- self.session_elements = len(plan)
-
- push = PushQueue(scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
# remove_elements():
#
# Internal function
@@ -560,63 +305,6 @@ class Pipeline():
# in before.
return [element for element in elements if element in visited]
- # source_bundle()
- #
- # Create a build bundle for the given artifact.
- #
- # Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
-
- # source-bundle only supports one target
- target = self.targets[0]
-
- # Find the correct filename for the compression algorithm
- tar_location = os.path.join(directory, target.normal_name + ".tar")
- if compression != "none":
- tar_location += "." + compression
-
- # Attempt writing a file to generate a good error message
- # early
- #
- # FIXME: A bit hackish
- try:
- open(tar_location, mode="x")
- os.remove(tar_location)
- except IOError as e:
- raise PipelineError("Cannot write to {0}: {1}"
- .format(tar_location, e)) from e
-
- plan = list(dependencies)
- self.fetch(scheduler, plan)
-
- # We don't use the scheduler for this as it is almost entirely IO
- # bound.
-
- # Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
- prefix = "{}-".format(target.normal_name)
-
- with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
- source_directory = os.path.join(tempdir, 'source')
- try:
- os.makedirs(source_directory)
- except OSError as e:
- raise PipelineError("Failed to create directory: {}"
- .format(e)) from e
-
- # Any elements that don't implement _write_script
- # should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
-
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
- self._collect_sources(tempdir, tar_location,
- target.normal_name, compression)
-
#############################################################
# Private Methods #
#############################################################
@@ -794,50 +482,6 @@ class Pipeline():
self.context.message(
Message(None, message_type, message, **args))
- # Write the element build script to the given directory
- def _write_element_script(self, directory, element):
- try:
- element._write_script(directory)
- except ImplError:
- return False
- return True
-
- # Write all source elements to the given directory
- def _write_element_sources(self, directory, elements):
- for element in elements:
- source_dir = os.path.join(directory, "source")
- element_source_dir = os.path.join(source_dir, element.normal_name)
-
- element._stage_sources_at(element_source_dir)
-
- # Write a master build script to the sandbox
- def _write_build_script(self, directory, elements):
-
- module_string = ""
- for element in elements:
- module_string += shlex.quote(element.normal_name) + " "
-
- script_path = os.path.join(directory, "build.sh")
-
- with open(_site.build_all_template, "r") as f:
- script_template = f.read()
-
- with utils.save_file_atomic(script_path, "w") as script:
- script.write(script_template.format(modules=module_string))
-
- os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
-
- # Collect the sources in the given sandbox into a tarfile
- def _collect_sources(self, directory, tar_name, element_name, compression):
- with self.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
- if compression == "none":
- permissions = "w:"
- else:
- permissions = "w:" + compression
-
- with tarfile.open(tar_name, permissions) as tar:
- tar.add(directory, arcname=element_name)
-
# _Planner()
#
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
new file mode 100644
index 000000000..d80b19f34
--- /dev/null
+++ b/buildstream/_stream.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+import os
+import stat
+import shlex
+import tarfile
+from tempfile import TemporaryDirectory
+
+from ._exceptions import PipelineError, ImplError, BstError
+from . import _site
+from . import utils
+from . import Scope, Consistency
+from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+
+
+# Stream()
+#
+# This is the main, toplevel calling interface in BuildStream core.
+#
+# Args:
+# context (Context): The Context object
+#
+class Stream():
+
+ def __init__(self, context, scheduler, pipeline):
+ self.session_elements = 0 # Number of elements to process in this session
+ self.total_elements = 0 # Number of total potential elements for this pipeline
+
+ self._context = context
+ self._scheduler = scheduler
+ self._pipeline = pipeline
+
+ self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+ # track()
+ #
+ # Trackes all the sources of all the elements in the pipeline,
+ # i.e. all of the elements which the target somehow depends on.
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ #
+ # If no error is encountered while tracking, then the project files
+ # are rewritten inline.
+ #
+ def track(self, scheduler):
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ self.session_elements = len(self._pipeline._track_elements)
+
+ _, status = self._scheduler.run([track])
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # fetch()
+ #
+ # Fetches sources on the pipeline.
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # dependencies (list): List of elements to fetch
+ #
+ def fetch(self, scheduler, dependencies):
+ fetch_plan = dependencies
+
+ # Subtract the track elements from the fetch elements, they will be added separately
+ if self._pipeline._track_elements:
+ track_elements = set(self._pipeline._track_elements)
+ fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+ # Assert consistency for the fetch elements
+ self._pipeline._assert_consistent(fetch_plan)
+
+ # Filter out elements with cached sources, only from the fetch plan
+ # let the track plan resolve new refs.
+ cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+ fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+ self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+ fetch = FetchQueue(self._scheduler)
+ fetch.enqueue(fetch_plan)
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ queues = [track, fetch]
+ else:
+ queues = [fetch]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # build()
+ #
+ # Builds (assembles) elements in the pipeline.
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # build_all (bool): Whether to build all elements, or only those
+ # which are required to build the target.
+ #
+ def build(self, scheduler, *, build_all=False):
+
+ if build_all:
+ plan = self._pipeline.dependencies(Scope.ALL)
+ else:
+ plan = self._pipeline._plan(except_=False)
+
+ # We want to start the build queue with any elements that are
+ # not being tracked first
+ track_elements = set(self._pipeline._track_elements)
+ plan = [e for e in plan if e not in track_elements]
+
+ # Assert that we have a consistent pipeline now (elements in
+ # track_plan will be made consistent)
+ self._pipeline._assert_consistent(plan)
+
+ fetch = FetchQueue(self._scheduler, skip_cached=True)
+ build = BuildQueue(self._scheduler)
+ track = None
+ pull = None
+ push = None
+ queues = []
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ queues.append(track)
+ if self._pipeline._artifacts.has_fetch_remotes():
+ pull = PullQueue(self._scheduler)
+ queues.append(pull)
+ queues.append(fetch)
+ queues.append(build)
+ if self._pipeline._artifacts.has_push_remotes():
+ push = PushQueue(self._scheduler)
+ queues.append(push)
+
+ # If we're going to track, tracking elements go into the first queue
+ # which is the tracking queue, the rest of the plan goes into the next
+ # queue (whatever that happens to be)
+ if track:
+ queues[0].enqueue(self._pipeline._track_elements)
+ queues[1].enqueue(plan)
+ else:
+ queues[0].enqueue(plan)
+
+ self.session_elements = len(self._pipeline._track_elements) + len(plan)
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # checkout()
+ #
+ # Checkout the pipeline target artifact to the specified directory
+ #
+ # Args:
+ # directory (str): The directory to checkout the artifact to
+ # force (bool): Force overwrite files which exist in `directory`
+ # integrate (bool): Whether to run integration commands
+ # hardlinks (bool): Whether checking out files hardlinked to
+ # their artifacts is acceptable
+ #
+ def checkout(self, directory, force, integrate, hardlinks):
+ # We only have one target in a checkout command
+ target = self._pipeline.targets[0]
+
+ try:
+ os.makedirs(directory, exist_ok=True)
+ except OSError as e:
+ raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
+
+ if not os.access(directory, os.W_OK):
+ raise PipelineError("Directory {} not writable".format(directory))
+
+ if not force and os.listdir(directory):
+ raise PipelineError("Checkout directory is not empty: {}"
+ .format(directory))
+
+ # Stage deps into a temporary sandbox first
+ try:
+ with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+ # Copy or move the sandbox to the target directory
+ sandbox_root = sandbox.get_directory()
+ with target.timed_activity("Checking out files in {}".format(directory)):
+ try:
+ if hardlinks:
+ self.checkout_hardlinks(sandbox_root, directory)
+ else:
+ utils.copy_files(sandbox_root, directory)
+ except OSError as e:
+ raise PipelineError("Failed to checkout files: {}".format(e)) from e
+ except BstError as e:
+ raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
+ reason=e.reason) from e
+
+ # Helper function for checkout()
+ #
+ def checkout_hardlinks(self, sandbox_root, directory):
+ try:
+ removed = utils.safe_remove(directory)
+ except OSError as e:
+ raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
+
+ if removed:
+ # Try a simple rename of the sandbox root; if that
+ # doesnt cut it, then do the regular link files code path
+ try:
+ os.rename(sandbox_root, directory)
+ except OSError:
+ os.makedirs(directory, exist_ok=True)
+ utils.link_files(sandbox_root, directory)
+ else:
+ utils.link_files(sandbox_root, directory)
+
+ # pull()
+ #
+ # Pulls elements from the pipeline
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # elements (list): List of elements to pull
+ #
+ def pull(self, scheduler, elements):
+
+ if not self._pipeline._artifacts.has_fetch_remotes():
+ raise PipelineError("Not artifact caches available for pulling artifacts")
+
+ plan = elements
+ self._pipeline._assert_consistent(plan)
+ self._pipeline.session_elements = len(plan)
+
+ pull = PullQueue(self._scheduler)
+ pull.enqueue(plan)
+ queues = [pull]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # push()
+ #
+ # Pushes elements in the pipeline
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # elements (list): List of elements to push
+ #
+ def push(self, scheduler, elements):
+
+ if not self._pipeline._artifacts.has_push_remotes():
+ raise PipelineError("No artifact caches available for pushing artifacts")
+
+ plan = elements
+ self._pipeline._assert_consistent(plan)
+ self._pipeline.session_elements = len(plan)
+
+ push = PushQueue(self._scheduler)
+ push.enqueue(plan)
+ queues = [push]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # source_bundle()
+ #
+ # Create a build bundle for the given artifact.
+ #
+ # Args:
+ # directory (str): The directory to checkout the artifact to
+ #
+ def source_bundle(self, scheduler, dependencies, force,
+ track_first, compression, directory):
+
+ # source-bundle only supports one target
+ target = self._pipeline.targets[0]
+
+ # Find the correct filename for the compression algorithm
+ tar_location = os.path.join(directory, target.normal_name + ".tar")
+ if compression != "none":
+ tar_location += "." + compression
+
+ # Attempt writing a file to generate a good error message
+ # early
+ #
+ # FIXME: A bit hackish
+ try:
+ open(tar_location, mode="x")
+ os.remove(tar_location)
+ except IOError as e:
+ raise PipelineError("Cannot write to {0}: {1}"
+ .format(tar_location, e)) from e
+
+ plan = list(dependencies)
+ self.fetch(self._scheduler, plan)
+
+ # We don't use the scheduler for this as it is almost entirely IO
+ # bound.
+
+ # Create a temporary directory to build the source tree in
+ builddir = target._get_context().builddir
+ prefix = "{}-".format(target.normal_name)
+
+ with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
+ source_directory = os.path.join(tempdir, 'source')
+ try:
+ os.makedirs(source_directory)
+ except OSError as e:
+ raise PipelineError("Failed to create directory: {}"
+ .format(e)) from e
+
+ # Any elements that don't implement _write_script
+ # should not be included in the later stages.
+ plan = [element for element in plan
+ if self._write_element_script(source_directory, element)]
+
+ self._write_element_sources(tempdir, plan)
+ self._write_build_script(tempdir, plan)
+ self._collect_sources(tempdir, tar_location,
+ target.normal_name, compression)
+
+ #############################################################
+ # Private Methods #
+ #############################################################
+
+ # Write the element build script to the given directory
+ def _write_element_script(self, directory, element):
+ try:
+ element._write_script(directory)
+ except ImplError:
+ return False
+ return True
+
+ # Write all source elements to the given directory
+ def _write_element_sources(self, directory, elements):
+ for element in elements:
+ source_dir = os.path.join(directory, "source")
+ element_source_dir = os.path.join(source_dir, element.normal_name)
+
+ element._stage_sources_at(element_source_dir)
+
+ # Write a master build script to the sandbox
+ def _write_build_script(self, directory, elements):
+
+ module_string = ""
+ for element in elements:
+ module_string += shlex.quote(element.normal_name) + " "
+
+ script_path = os.path.join(directory, "build.sh")
+
+ with open(_site.build_all_template, "r") as f:
+ script_template = f.read()
+
+ with utils.save_file_atomic(script_path, "w") as script:
+ script.write(script_template.format(modules=module_string))
+
+ os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
+
+ # Collect the sources in the given sandbox into a tarfile
+ def _collect_sources(self, directory, tar_name, element_name, compression):
+ with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+ if compression == "none":
+ permissions = "w:"
+ else:
+ permissions = "w:" + compression
+
+ with tarfile.open(tar_name, permissions) as tar:
+ tar.add(directory, arcname=element_name)