summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-29 15:20:09 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-05-08 03:59:38 +0900
commit2390c81411aee2019cec891ea0be5247e779bc2e (patch)
treebb065bef7e66507e65f0b7564a18ddf1002f44b9
parentbbb894bdd921f39e4440e74351b5f478f65555f3 (diff)
downloadbuildstream-2390c81411aee2019cec891ea0be5247e779bc2e.tar.gz
_stream.py: New Stream object, main calling interface for BuildStream core
This is the first part of the pipeline refactor, at this stage all calling interfaces remain the same, except that invocation of the scheduler has been moved from Pipline to Stream.
-rw-r--r--buildstream/_frontend/app.py13
-rw-r--r--buildstream/_frontend/cli.py16
-rw-r--r--buildstream/_frontend/status.py14
-rw-r--r--buildstream/_frontend/widget.py8
-rw-r--r--buildstream/_pipeline.py360
-rw-r--r--buildstream/_stream.py396
6 files changed, 427 insertions, 380 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 3bcb0d962..84c33a385 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,6 +39,7 @@ from .._context import Context
from .._project import Project
from .._exceptions import BstError, PipelineError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
+from .._stream import Stream
from .._pipeline import Pipeline, PipelineSelection
from .._scheduler import Scheduler
from .._profile import Topics, profile_start, profile_end
@@ -69,6 +70,7 @@ class App():
# Public members
#
self.context = None # The Context object
+ self.stream = None # The Stream object
self.project = None # The toplevel Project object
self.scheduler = None # The Scheduler
self.pipeline = None # The Pipeline
@@ -255,11 +257,12 @@ class App():
track_cross_junctions=False,
track_selection=PipelineSelection.ALL,
fetch_subprojects=False):
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
# Start with the early stage init, this enables logging right away
with self.partially_initialized(fetch_subprojects=fetch_subprojects):
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
+
# Mark the beginning of the session
if session_name:
self._message(MessageType.START, session_name)
@@ -280,7 +283,7 @@ class App():
# Create our status printer, only available in interactive
self._status = Status(self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
- self.pipeline, self.scheduler,
+ self.stream, self.pipeline, self.scheduler,
colors=self.colors)
# Initialize pipeline
@@ -293,6 +296,8 @@ class App():
except BstError as e:
self._error_exit(e, "Error initializing pipeline")
+ self.stream = Stream(self.context, self.scheduler, self.pipeline)
+
# Pipeline is loaded, now we can tell the logger about it
self.logger.size_request(self.pipeline)
@@ -476,7 +481,7 @@ class App():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
if not no_checkout or track_first:
- self.pipeline.fetch(self.scheduler, [target])
+ self.stream.fetch(self.scheduler, [target])
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise PipelineError("Could not stage uncached source. " +
@@ -751,7 +756,7 @@ class App():
#
def _print_summary(self):
click.echo("", err=True)
- self.logger.print_summary(self.pipeline, self.scheduler,
+ self.logger.print_summary(self.stream, self.scheduler,
self._main_options['log_file'],
styling=self.colors)
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 5f9e2751e..11a0ca2cc 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -243,7 +243,7 @@ def build(app, elements, all_, track_, track_save, track_all, track_except, trac
use_configured_remote_caches=True, track_elements=track_,
track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
- app.pipeline.build(app.scheduler, build_all=all_)
+ app.stream.build(app.scheduler, build_all=all_)
##################################################################
@@ -287,7 +287,7 @@ def fetch(app, elements, deps, track_, except_, track_cross_junctions):
track_cross_junctions=track_cross_junctions,
fetch_subprojects=True):
dependencies = app.pipeline.get_selection(deps)
- app.pipeline.fetch(app.scheduler, dependencies)
+ app.stream.fetch(app.scheduler, dependencies)
##################################################################
@@ -323,7 +323,7 @@ def track(app, elements, deps, except_, cross_junctions):
track_cross_junctions=cross_junctions,
track_selection=deps,
fetch_subprojects=True):
- app.pipeline.track(app.scheduler)
+ app.stream.track(app.scheduler)
##################################################################
@@ -354,7 +354,7 @@ def pull(app, elements, deps, remote):
with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None),
add_remote_cache=remote, fetch_subprojects=True):
to_pull = app.pipeline.get_selection(deps)
- app.pipeline.pull(app.scheduler, to_pull)
+ app.stream.pull(app.scheduler, to_pull)
##################################################################
@@ -385,7 +385,7 @@ def push(app, elements, deps, remote):
use_configured_remote_caches=(remote is None),
add_remote_cache=remote, fetch_subprojects=True):
to_push = app.pipeline.get_selection(deps)
- app.pipeline.push(app.scheduler, to_push)
+ app.stream.push(app.scheduler, to_push)
##################################################################
@@ -564,7 +564,7 @@ def checkout(app, element, directory, force, integrate, hardlinks):
"""Checkout a built artifact to the specified directory
"""
with app.initialized((element,)):
- app.pipeline.checkout(directory, force, integrate, hardlinks)
+ app.stream.checkout(directory, force, integrate, hardlinks)
##################################################################
@@ -592,8 +592,8 @@ def source_bundle(app, target, force, directory,
"""
with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None):
dependencies = app.pipeline.get_selection('all')
- app.pipeline.source_bundle(app.scheduler, dependencies, force, track_,
- compression, directory)
+ app.stream.source_bundle(app.scheduler, dependencies, force, track_,
+ compression, directory)
##################################################################
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 9bb2b644f..4f3eed0f5 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -37,6 +37,7 @@ from .widget import TimeCode
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
+# stream (Stream): The Stream
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
# colors (bool): Whether to print the ANSI color codes in the output
@@ -45,13 +46,12 @@ class Status():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler, colors=False):
+ stream, pipeline, scheduler, colors=False):
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
- self._pipeline = pipeline
self._scheduler = scheduler
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
@@ -60,7 +60,7 @@ class Status():
self._colors = colors
self._header = _StatusHeader(content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler)
+ stream, pipeline, scheduler)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
@@ -246,6 +246,7 @@ class Status():
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
+# stream (Stream): The Stream
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
#
@@ -253,7 +254,7 @@ class _StatusHeader():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
- pipeline, scheduler):
+ stream, pipeline, scheduler):
#
# Public members
@@ -267,6 +268,7 @@ class _StatusHeader():
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
+ self._stream = stream
self._pipeline = pipeline
self._scheduler = scheduler
self._time_code = TimeCode(content_profile, format_profile)
@@ -276,8 +278,8 @@ class _StatusHeader():
size = 0
text = ''
- session = str(self._pipeline.session_elements)
- total = str(self._pipeline.total_elements)
+ session = str(self._stream.session_elements)
+ total = str(self._stream.total_elements)
# Format and calculate size for pipeline target and overall time code
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index b5942b91e..a72293b04 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -530,12 +530,12 @@ class LogLine(Widget):
# Print a summary of activities at the end of a session
#
# Args:
- # pipeline (Pipeline): The Pipeline
+ # stream (Stream): The Stream
# scheduler (Scheduler): The Scheduler
# log_file (file): An optional file handle for additional logging
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_summary(self, pipeline, scheduler, log_file, styling=False):
+ def print_summary(self, stream, scheduler, log_file, styling=False):
# Early silent return if there are no queues, can happen
# only in the case that the pipeline early returned due to
@@ -563,8 +563,8 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(pipeline.total_elements))
- values['Session'] = self.content_profile.fmt(str(pipeline.session_elements))
+ values['Total'] = self.content_profile.fmt(str(stream.total_elements))
+ values['Session'] = self.content_profile.fmt(str(stream.session_elements))
processed_maxlen = 1
skipped_maxlen = 1
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 682329e6c..52bae2e5c 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -19,28 +19,18 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
-import os
-import stat
-import shlex
-import tarfile
import itertools
from operator import itemgetter
-from tempfile import TemporaryDirectory
-from ._exceptions import PipelineError, ImplError, BstError
+from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._loader import Loader
from .element import Element
-from . import Consistency
-from . import Scope
-from . import _site
-from . import utils
+from . import Scope, Consistency
from ._platform import Platform
from ._project import ProjectRefStorage
from ._artifactcache.artifactcache import ArtifactCacheSpec, configured_remote_artifact_cache_specs
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
-
# PipelineSelection()
#
@@ -101,8 +91,6 @@ class Pipeline():
self.context = context # The Context
self.project = project # The toplevel project
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
self.targets = [] # List of toplevel target Element objects
#
@@ -170,8 +158,6 @@ class Pipeline():
# Preflight directly, before ever interrogating caches or anything.
self._preflight()
- self.total_elements = len(list(self.dependencies(Scope.ALL)))
-
# Initialize remote artifact caches. We allow the commandline to override
# the user config in some cases (for example `bst push --remote=...`).
has_remote_caches = False
@@ -260,247 +246,6 @@ class Pipeline():
# Commands #
#############################################################
- # track()
- #
- # Trackes all the sources of all the elements in the pipeline,
- # i.e. all of the elements which the target somehow depends on.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- #
- # If no error is encountered while tracking, then the project files
- # are rewritten inline.
- #
- def track(self, scheduler):
- track = TrackQueue(scheduler)
- track.enqueue(self._track_elements)
- self.session_elements = len(self._track_elements)
-
- _, status = scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # fetch()
- #
- # Fetches sources on the pipeline.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # dependencies (list): List of elements to fetch
- #
- def fetch(self, scheduler, dependencies):
- fetch_plan = dependencies
-
- # Subtract the track elements from the fetch elements, they will be added separately
- if self._track_elements:
- track_elements = set(self._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
-
- # Assert consistency for the fetch elements
- self._assert_consistent(fetch_plan)
-
- # Filter out elements with cached sources, only from the fetch plan
- # let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._track_elements) + len(fetch_plan)
-
- fetch = FetchQueue(scheduler)
- fetch.enqueue(fetch_plan)
- if self._track_elements:
- track = TrackQueue(scheduler)
- track.enqueue(self._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # build()
- #
- # Builds (assembles) elements in the pipeline.
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # build_all (bool): Whether to build all elements, or only those
- # which are required to build the target.
- #
- def build(self, scheduler, *, build_all=False):
-
- if build_all:
- plan = self.dependencies(Scope.ALL)
- else:
- plan = self._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._assert_consistent(plan)
-
- fetch = FetchQueue(scheduler, skip_cached=True)
- build = BuildQueue(scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._track_elements:
- track = TrackQueue(scheduler)
- queues.append(track)
- if self._artifacts.has_fetch_remotes():
- pull = PullQueue(scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._artifacts.has_push_remotes():
- push = PushQueue(scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
-
- self.session_elements = len(self._track_elements) + len(plan)
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # checkout()
- #
- # Checkout the pipeline target artifact to the specified directory
- #
- # Args:
- # directory (str): The directory to checkout the artifact to
- # force (bool): Force overwrite files which exist in `directory`
- # integrate (bool): Whether to run integration commands
- # hardlinks (bool): Whether checking out files hardlinked to
- # their artifacts is acceptable
- #
- def checkout(self, directory, force, integrate, hardlinks):
- # We only have one target in a checkout command
- target = self.targets[0]
-
- try:
- os.makedirs(directory, exist_ok=True)
- except OSError as e:
- raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
-
- if not os.access(directory, os.W_OK):
- raise PipelineError("Directory {} not writable".format(directory))
-
- if not force and os.listdir(directory):
- raise PipelineError("Checkout directory is not empty: {}"
- .format(directory))
-
- # Stage deps into a temporary sandbox first
- try:
- with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
-
- # Copy or move the sandbox to the target directory
- sandbox_root = sandbox.get_directory()
- with target.timed_activity("Checking out files in {}".format(directory)):
- try:
- if hardlinks:
- self.checkout_hardlinks(sandbox_root, directory)
- else:
- utils.copy_files(sandbox_root, directory)
- except OSError as e:
- raise PipelineError("Failed to checkout files: {}".format(e)) from e
- except BstError as e:
- raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
- reason=e.reason) from e
-
- # Helper function for checkout()
- #
- def checkout_hardlinks(self, sandbox_root, directory):
- try:
- removed = utils.safe_remove(directory)
- except OSError as e:
- raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
-
- if removed:
- # Try a simple rename of the sandbox root; if that
- # doesnt cut it, then do the regular link files code path
- try:
- os.rename(sandbox_root, directory)
- except OSError:
- os.makedirs(directory, exist_ok=True)
- utils.link_files(sandbox_root, directory)
- else:
- utils.link_files(sandbox_root, directory)
-
- # pull()
- #
- # Pulls elements from the pipeline
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to pull
- #
- def pull(self, scheduler, elements):
-
- if not self._artifacts.has_fetch_remotes():
- raise PipelineError("Not artifact caches available for pulling artifacts")
-
- plan = elements
- self._assert_consistent(plan)
- self.session_elements = len(plan)
-
- pull = PullQueue(scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
- # push()
- #
- # Pushes elements in the pipeline
- #
- # Args:
- # scheduler (Scheduler): The scheduler to run this pipeline on
- # elements (list): List of elements to push
- #
- def push(self, scheduler, elements):
-
- if not self._artifacts.has_push_remotes():
- raise PipelineError("No artifact caches available for pushing artifacts")
-
- plan = elements
- self._assert_consistent(plan)
- self.session_elements = len(plan)
-
- push = PushQueue(scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise PipelineError()
- elif status == SchedStatus.TERMINATED:
- raise PipelineError(terminated=True)
-
# remove_elements():
#
# Internal function
@@ -560,63 +305,6 @@ class Pipeline():
# in before.
return [element for element in elements if element in visited]
- # source_bundle()
- #
- # Create a build bundle for the given artifact.
- #
- # Args:
- # directory (str): The directory to checkout the artifact to
- #
- def source_bundle(self, scheduler, dependencies, force,
- track_first, compression, directory):
-
- # source-bundle only supports one target
- target = self.targets[0]
-
- # Find the correct filename for the compression algorithm
- tar_location = os.path.join(directory, target.normal_name + ".tar")
- if compression != "none":
- tar_location += "." + compression
-
- # Attempt writing a file to generate a good error message
- # early
- #
- # FIXME: A bit hackish
- try:
- open(tar_location, mode="x")
- os.remove(tar_location)
- except IOError as e:
- raise PipelineError("Cannot write to {0}: {1}"
- .format(tar_location, e)) from e
-
- plan = list(dependencies)
- self.fetch(scheduler, plan)
-
- # We don't use the scheduler for this as it is almost entirely IO
- # bound.
-
- # Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
- prefix = "{}-".format(target.normal_name)
-
- with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
- source_directory = os.path.join(tempdir, 'source')
- try:
- os.makedirs(source_directory)
- except OSError as e:
- raise PipelineError("Failed to create directory: {}"
- .format(e)) from e
-
- # Any elements that don't implement _write_script
- # should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
-
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
- self._collect_sources(tempdir, tar_location,
- target.normal_name, compression)
-
#############################################################
# Private Methods #
#############################################################
@@ -794,50 +482,6 @@ class Pipeline():
self.context.message(
Message(None, message_type, message, **args))
- # Write the element build script to the given directory
- def _write_element_script(self, directory, element):
- try:
- element._write_script(directory)
- except ImplError:
- return False
- return True
-
- # Write all source elements to the given directory
- def _write_element_sources(self, directory, elements):
- for element in elements:
- source_dir = os.path.join(directory, "source")
- element_source_dir = os.path.join(source_dir, element.normal_name)
-
- element._stage_sources_at(element_source_dir)
-
- # Write a master build script to the sandbox
- def _write_build_script(self, directory, elements):
-
- module_string = ""
- for element in elements:
- module_string += shlex.quote(element.normal_name) + " "
-
- script_path = os.path.join(directory, "build.sh")
-
- with open(_site.build_all_template, "r") as f:
- script_template = f.read()
-
- with utils.save_file_atomic(script_path, "w") as script:
- script.write(script_template.format(modules=module_string))
-
- os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
-
- # Collect the sources in the given sandbox into a tarfile
- def _collect_sources(self, directory, tar_name, element_name, compression):
- with self.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
- if compression == "none":
- permissions = "w:"
- else:
- permissions = "w:" + compression
-
- with tarfile.open(tar_name, permissions) as tar:
- tar.add(directory, arcname=element_name)
-
# _Planner()
#
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
new file mode 100644
index 000000000..d80b19f34
--- /dev/null
+++ b/buildstream/_stream.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+import os
+import stat
+import shlex
+import tarfile
+from tempfile import TemporaryDirectory
+
+from ._exceptions import PipelineError, ImplError, BstError
+from . import _site
+from . import utils
+from . import Scope, Consistency
+from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+
+
+# Stream()
+#
+# This is the main, toplevel calling interface in BuildStream core.
+#
+# Args:
+# context (Context): The Context object
+#
+class Stream():
+
+ def __init__(self, context, scheduler, pipeline):
+ self.session_elements = 0 # Number of elements to process in this session
+ self.total_elements = 0 # Number of total potential elements for this pipeline
+
+ self._context = context
+ self._scheduler = scheduler
+ self._pipeline = pipeline
+
+ self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
+
+ # track()
+ #
+ # Trackes all the sources of all the elements in the pipeline,
+ # i.e. all of the elements which the target somehow depends on.
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ #
+ # If no error is encountered while tracking, then the project files
+ # are rewritten inline.
+ #
+ def track(self, scheduler):
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ self.session_elements = len(self._pipeline._track_elements)
+
+ _, status = self._scheduler.run([track])
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # fetch()
+ #
+ # Fetches sources on the pipeline.
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # dependencies (list): List of elements to fetch
+ #
+ def fetch(self, scheduler, dependencies):
+ fetch_plan = dependencies
+
+ # Subtract the track elements from the fetch elements, they will be added separately
+ if self._pipeline._track_elements:
+ track_elements = set(self._pipeline._track_elements)
+ fetch_plan = [e for e in fetch_plan if e not in track_elements]
+
+ # Assert consistency for the fetch elements
+ self._pipeline._assert_consistent(fetch_plan)
+
+ # Filter out elements with cached sources, only from the fetch plan
+ # let the track plan resolve new refs.
+ cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
+ fetch_plan = [elt for elt in fetch_plan if elt not in cached]
+
+ self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+
+ fetch = FetchQueue(self._scheduler)
+ fetch.enqueue(fetch_plan)
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ track.enqueue(self._pipeline._track_elements)
+ queues = [track, fetch]
+ else:
+ queues = [fetch]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # build()
+ #
+ # Builds (assembles) elements in the pipeline.
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # build_all (bool): Whether to build all elements, or only those
+ # which are required to build the target.
+ #
+ def build(self, scheduler, *, build_all=False):
+
+ if build_all:
+ plan = self._pipeline.dependencies(Scope.ALL)
+ else:
+ plan = self._pipeline._plan(except_=False)
+
+ # We want to start the build queue with any elements that are
+ # not being tracked first
+ track_elements = set(self._pipeline._track_elements)
+ plan = [e for e in plan if e not in track_elements]
+
+ # Assert that we have a consistent pipeline now (elements in
+ # track_plan will be made consistent)
+ self._pipeline._assert_consistent(plan)
+
+ fetch = FetchQueue(self._scheduler, skip_cached=True)
+ build = BuildQueue(self._scheduler)
+ track = None
+ pull = None
+ push = None
+ queues = []
+ if self._pipeline._track_elements:
+ track = TrackQueue(self._scheduler)
+ queues.append(track)
+ if self._pipeline._artifacts.has_fetch_remotes():
+ pull = PullQueue(self._scheduler)
+ queues.append(pull)
+ queues.append(fetch)
+ queues.append(build)
+ if self._pipeline._artifacts.has_push_remotes():
+ push = PushQueue(self._scheduler)
+ queues.append(push)
+
+ # If we're going to track, tracking elements go into the first queue
+ # which is the tracking queue, the rest of the plan goes into the next
+ # queue (whatever that happens to be)
+ if track:
+ queues[0].enqueue(self._pipeline._track_elements)
+ queues[1].enqueue(plan)
+ else:
+ queues[0].enqueue(plan)
+
+ self.session_elements = len(self._pipeline._track_elements) + len(plan)
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # checkout()
+ #
+ # Checkout the pipeline target artifact to the specified directory
+ #
+ # Args:
+ # directory (str): The directory to checkout the artifact to
+ # force (bool): Force overwrite files which exist in `directory`
+ # integrate (bool): Whether to run integration commands
+ # hardlinks (bool): Whether checking out files hardlinked to
+ # their artifacts is acceptable
+ #
+ def checkout(self, directory, force, integrate, hardlinks):
+ # We only have one target in a checkout command
+ target = self._pipeline.targets[0]
+
+ try:
+ os.makedirs(directory, exist_ok=True)
+ except OSError as e:
+ raise PipelineError("Failed to create checkout directory: {}".format(e)) from e
+
+ if not os.access(directory, os.W_OK):
+ raise PipelineError("Directory {} not writable".format(directory))
+
+ if not force and os.listdir(directory):
+ raise PipelineError("Checkout directory is not empty: {}"
+ .format(directory))
+
+ # Stage deps into a temporary sandbox first
+ try:
+ with target._prepare_sandbox(Scope.RUN, None, integrate=integrate) as sandbox:
+
+ # Copy or move the sandbox to the target directory
+ sandbox_root = sandbox.get_directory()
+ with target.timed_activity("Checking out files in {}".format(directory)):
+ try:
+ if hardlinks:
+ self.checkout_hardlinks(sandbox_root, directory)
+ else:
+ utils.copy_files(sandbox_root, directory)
+ except OSError as e:
+ raise PipelineError("Failed to checkout files: {}".format(e)) from e
+ except BstError as e:
+ raise PipelineError("Error while staging dependencies into a sandbox: {}".format(e),
+ reason=e.reason) from e
+
+ # Helper function for checkout()
+ #
+ def checkout_hardlinks(self, sandbox_root, directory):
+ try:
+ removed = utils.safe_remove(directory)
+ except OSError as e:
+ raise PipelineError("Failed to remove checkout directory: {}".format(e)) from e
+
+ if removed:
+ # Try a simple rename of the sandbox root; if that
+ # doesnt cut it, then do the regular link files code path
+ try:
+ os.rename(sandbox_root, directory)
+ except OSError:
+ os.makedirs(directory, exist_ok=True)
+ utils.link_files(sandbox_root, directory)
+ else:
+ utils.link_files(sandbox_root, directory)
+
+ # pull()
+ #
+ # Pulls elements from the pipeline
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # elements (list): List of elements to pull
+ #
+ def pull(self, scheduler, elements):
+
+ if not self._pipeline._artifacts.has_fetch_remotes():
+ raise PipelineError("Not artifact caches available for pulling artifacts")
+
+ plan = elements
+ self._pipeline._assert_consistent(plan)
+ self._pipeline.session_elements = len(plan)
+
+ pull = PullQueue(self._scheduler)
+ pull.enqueue(plan)
+ queues = [pull]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # push()
+ #
+ # Pushes elements in the pipeline
+ #
+ # Args:
+ # scheduler (Scheduler): The scheduler to run this pipeline on
+ # elements (list): List of elements to push
+ #
+ def push(self, scheduler, elements):
+
+ if not self._pipeline._artifacts.has_push_remotes():
+ raise PipelineError("No artifact caches available for pushing artifacts")
+
+ plan = elements
+ self._pipeline._assert_consistent(plan)
+ self._pipeline.session_elements = len(plan)
+
+ push = PushQueue(self._scheduler)
+ push.enqueue(plan)
+ queues = [push]
+
+ _, status = self._scheduler.run(queues)
+ if status == SchedStatus.ERROR:
+ raise PipelineError()
+ elif status == SchedStatus.TERMINATED:
+ raise PipelineError(terminated=True)
+
+ # source_bundle()
+ #
+ # Create a build bundle for the given artifact.
+ #
+ # Args:
+ # directory (str): The directory to checkout the artifact to
+ #
+ def source_bundle(self, scheduler, dependencies, force,
+ track_first, compression, directory):
+
+ # source-bundle only supports one target
+ target = self._pipeline.targets[0]
+
+ # Find the correct filename for the compression algorithm
+ tar_location = os.path.join(directory, target.normal_name + ".tar")
+ if compression != "none":
+ tar_location += "." + compression
+
+ # Attempt writing a file to generate a good error message
+ # early
+ #
+ # FIXME: A bit hackish
+ try:
+ open(tar_location, mode="x")
+ os.remove(tar_location)
+ except IOError as e:
+ raise PipelineError("Cannot write to {0}: {1}"
+ .format(tar_location, e)) from e
+
+ plan = list(dependencies)
+ self.fetch(self._scheduler, plan)
+
+ # We don't use the scheduler for this as it is almost entirely IO
+ # bound.
+
+ # Create a temporary directory to build the source tree in
+ builddir = target._get_context().builddir
+ prefix = "{}-".format(target.normal_name)
+
+ with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
+ source_directory = os.path.join(tempdir, 'source')
+ try:
+ os.makedirs(source_directory)
+ except OSError as e:
+ raise PipelineError("Failed to create directory: {}"
+ .format(e)) from e
+
+ # Any elements that don't implement _write_script
+ # should not be included in the later stages.
+ plan = [element for element in plan
+ if self._write_element_script(source_directory, element)]
+
+ self._write_element_sources(tempdir, plan)
+ self._write_build_script(tempdir, plan)
+ self._collect_sources(tempdir, tar_location,
+ target.normal_name, compression)
+
+ #############################################################
+ # Private Methods #
+ #############################################################
+
+ # Write the element build script to the given directory
+ def _write_element_script(self, directory, element):
+ try:
+ element._write_script(directory)
+ except ImplError:
+ return False
+ return True
+
+ # Write all source elements to the given directory
+ def _write_element_sources(self, directory, elements):
+ for element in elements:
+ source_dir = os.path.join(directory, "source")
+ element_source_dir = os.path.join(source_dir, element.normal_name)
+
+ element._stage_sources_at(element_source_dir)
+
+ # Write a master build script to the sandbox
+ def _write_build_script(self, directory, elements):
+
+ module_string = ""
+ for element in elements:
+ module_string += shlex.quote(element.normal_name) + " "
+
+ script_path = os.path.join(directory, "build.sh")
+
+ with open(_site.build_all_template, "r") as f:
+ script_template = f.read()
+
+ with utils.save_file_atomic(script_path, "w") as script:
+ script.write(script_template.format(modules=module_string))
+
+ os.chmod(script_path, stat.S_IEXEC | stat.S_IREAD)
+
+ # Collect the sources in the given sandbox into a tarfile
+ def _collect_sources(self, directory, tar_name, element_name, compression):
+ with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+ if compression == "none":
+ permissions = "w:"
+ else:
+ permissions = "w:" + compression
+
+ with tarfile.open(tar_name, permissions) as tar:
+ tar.add(directory, arcname=element_name)