author     bst-marge-bot <marge-bot@buildstream.build>  2019-04-15 08:45:34 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2019-04-15 08:45:34 +0000
commit     4aafa38e0d3c73b2fb2663984ce170e7d7ed8e95 (patch)
tree       cc946f5b1d045d719d6cbbc886f3b288ad378e97
parent     b3817226286a0c60b7ca955686b767bc40fb1051 (diff)
parent     2db11763d50b2295ab49b44c5ed0adee38ad67fe (diff)
download   buildstream-4aafa38e0d3c73b2fb2663984ce170e7d7ed8e95.tar.gz
Merge branch 'tristan/platform-cache-quota-1.2' into 'bst-1.2'
Cache quota related backports

See merge request BuildStream/buildstream!1288
-rw-r--r--  buildstream/_artifactcache/__init__.py       |   1
-rw-r--r--  buildstream/_artifactcache/artifactcache.py  | 167
-rw-r--r--  buildstream/_artifactcache/cascache.py       |  13
-rw-r--r--  buildstream/_artifactcache/casserver.py      |  12
-rw-r--r--  buildstream/_context.py                      |  21
-rw-r--r--  buildstream/_frontend/app.py                 |   9
-rw-r--r--  buildstream/_frontend/status.py              |  42
-rw-r--r--  buildstream/_frontend/widget.py              |   1
-rw-r--r--  buildstream/_loader/loader.py                |   4
-rw-r--r--  buildstream/_platform/linux.py               |  32
-rw-r--r--  buildstream/_platform/platform.py            |  28
-rw-r--r--  buildstream/_platform/unix.py                |  20
-rw-r--r--  buildstream/_scheduler/jobs/cachesizejob.py  |   5
-rw-r--r--  buildstream/_scheduler/jobs/cleanupjob.py    |  20
-rw-r--r--  buildstream/_scheduler/jobs/job.py           |  69
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py  |   5
-rw-r--r--  buildstream/_scheduler/scheduler.py          |   6
-rw-r--r--  buildstream/_stream.py                       |   4
-rw-r--r--  buildstream/element.py                       |  18
-rw-r--r--  buildstream/utils.py                         |  24
-rw-r--r--  tests/artifactcache/cache_size.py            |  28
-rw-r--r--  tests/artifactcache/expiry.py                |  65
-rw-r--r--  tests/testutils/artifactshare.py             |  12
23 files changed, 466 insertions(+), 140 deletions(-)
diff --git a/buildstream/_artifactcache/__init__.py b/buildstream/_artifactcache/__init__.py
index fad483a57..76435e06f 100644
--- a/buildstream/_artifactcache/__init__.py
+++ b/buildstream/_artifactcache/__init__.py
@@ -18,3 +18,4 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
from .artifactcache import ArtifactCache, ArtifactCacheSpec, CACHE_SIZE_FILE
+from .artifactcache import ArtifactCacheUsage
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 965d6e132..0beab5537 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -83,6 +83,39 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push server_cert cl
ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
+# ArtifactCacheUsage
+#
+# A simple object to report the current artifact cache
+# usage details.
+#
+# Note that this uses the user configured cache quota
+# rather than the internal quota with protective headroom
+# removed, to provide a more sensible value to display to
+# the user.
+#
+# Args:
+# artifacts (ArtifactCache): The artifact cache to get the status of
+#
+class ArtifactCacheUsage():
+
+ def __init__(self, artifacts):
+ context = artifacts.context
+ self.quota_config = context.config_cache_quota # Configured quota
+ self.quota_size = artifacts._cache_quota_original # Resolved cache quota in bytes
+ self.used_size = artifacts.get_cache_size() # Size used by artifacts in bytes
+ self.used_percent = 0 # Percentage of the quota used
+ if self.quota_size is not None:
+ self.used_percent = int(self.used_size * 100 / self.quota_size)
+
+ # Formattable into a human readable string
+ #
+ def __str__(self):
+ return "{} / {} ({}%)" \
+ .format(utils._pretty_size(self.used_size, dec_places=1),
+ self.quota_config,
+ self.used_percent)
+
+
# An ArtifactCache manages artifacts.
#
# Args:
@@ -100,6 +133,7 @@ class ArtifactCache():
self._required_elements = set() # The elements required for this session
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
+ self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_lower_threshold = None # The target cache size for a cleanup
os.makedirs(self.extractdir, exist_ok=True)
@@ -242,11 +276,33 @@ class ArtifactCache():
#
# Clean the artifact cache as much as possible.
#
+ # Args:
+ # progress (callable): A callback to call when a ref is removed
+ #
# Returns:
# (int): The size of the cache after having cleaned up
#
- def clean(self):
+ def clean(self, progress=None):
artifacts = self.list_artifacts()
+ context = self.context
+
+ # Some cumulative statistics
+ removed_ref_count = 0
+ space_saved = 0
+
+ # Start off with an announcement with as much info as possible
+ volume_size, volume_avail = self._get_cache_volume_size()
+ self._message(MessageType.STATUS, "Starting cache cleanup",
+ detail=("Elements required by the current build plan: {}\n" +
+ "User specified quota: {} ({})\n" +
+ "Cache usage: {}\n" +
+ "Cache volume: {} total, {} available")
+ .format(len(self._required_elements),
+ context.config_cache_quota,
+ utils._pretty_size(self._cache_quota_original, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ utils._pretty_size(volume_size, dec_places=2),
+ utils._pretty_size(volume_avail, dec_places=2)))
# Build a set of the cache keys which are required
# based on the required elements at cleanup time
@@ -271,11 +327,18 @@ class ArtifactCache():
# can't remove them, we have to abort the build.
#
# FIXME: Asking the user what to do may be neater
+ #
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
- detail = ("There is not enough space to complete the build.\n"
- "Please increase the cache-quota in {}."
- .format(self.context.config_origin or default_conf))
+ detail = ("Aborted after removing {} refs and saving {} disk space.\n"
+ "The remaining {} in the cache is required by the {} elements in your build plan\n\n"
+ "There is not enough space to complete the build.\n"
+ "Please increase the cache-quota in {} and/or make more disk space."
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ len(self._required_elements),
+ (context.config_origin or default_conf)))
if self.has_quota_exceeded():
raise ArtifactError("Cache too full. Aborting.",
@@ -290,10 +353,33 @@ class ArtifactCache():
# Remove the actual artifact, if it's not required.
size = self.remove(to_remove)
+ removed_ref_count += 1
+ space_saved += size
+
+ self._message(MessageType.STATUS,
+ "Freed {: <7} {}".format(
+ utils._pretty_size(size, dec_places=2),
+ to_remove))
+
# Remove the size from the removed size
self.set_cache_size(self._cache_size - size)
- # This should be O(1) if implemented correctly
+ # User callback
+ #
+ # Currently this process is fairly slow, but we should
+ # think about throttling this progress() callback if this
+ # becomes too intense.
+ if progress:
+ progress()
+
+ # Informational message about the side effects of the cleanup
+ self._message(MessageType.INFO, "Cleanup completed",
+ detail=("Removed {} refs and saving {} disk space.\n" +
+ "Cache usage is now: {}")
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2)))
+
return self.get_cache_size()
# compute_cache_size()
@@ -305,7 +391,14 @@ class ArtifactCache():
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
- self._cache_size = self.calculate_cache_size()
+ old_cache_size = self._cache_size
+ new_cache_size = self.calculate_cache_size()
+
+ if old_cache_size != new_cache_size:
+ self._cache_size = new_cache_size
+
+ usage = ArtifactCacheUsage(self)
+ self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage))
return self._cache_size
@@ -333,7 +426,7 @@ class ArtifactCache():
# it is greater than the actual cache size.
#
# Returns:
- # (int) An approximation of the artifact cache size.
+ # (int) An approximation of the artifact cache size, in bytes.
#
def get_cache_size(self):
@@ -378,6 +471,13 @@ class ArtifactCache():
# Abstract methods for subclasses to implement #
################################################
+ # preflight():
+ #
+ # Preflight check.
+ #
+ def preflight(self):
+ pass
+
# update_atime()
#
# Update the atime of an artifact.
@@ -667,21 +767,16 @@ class ArtifactCache():
else:
headroom = 2e9
- artifactdir_volume = self.context.artifactdir
- while not os.path.exists(artifactdir_volume):
- artifactdir_volume = os.path.dirname(artifactdir_volume)
-
try:
- cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
+ cache_quota = utils._parse_size(self.context.config_cache_quota,
+ self.context.artifactdir)
except utils.UtilError as e:
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}\nPlease specify the value in bytes or as a % of full disk space.\n"
"\nValid values are, for example: 800M 10G 1T 50%\n"
.format(str(e))) from e
- stat = os.statvfs(artifactdir_volume)
- available_space = (stat.f_bsize * stat.f_bavail)
-
+ total_size, available_space = self._get_cache_volume_size()
cache_size = self.get_cache_size()
# Ensure system has enough storage for the cache_quota
@@ -697,15 +792,22 @@ class ArtifactCache():
"Invalid cache quota ({}): ".format(utils._pretty_size(cache_quota)) +
"BuildStream requires a minimum cache quota of 2G.")
elif cache_quota > cache_size + available_space: # Check maximum
- raise LoadError(LoadErrorReason.INVALID_DATA,
- ("Your system does not have enough available " +
- "space to support the cache quota specified.\n" +
- "You currently have:\n" +
- "- {used} of cache in use at {local_cache_path}\n" +
- "- {available} of available system storage").format(
- used=utils._pretty_size(cache_size),
- local_cache_path=self.context.artifactdir,
- available=utils._pretty_size(available_space)))
+ if '%' in self.context.config_cache_quota:
+ available = (available_space / total_size) * 100
+ available = '{}% of total disk space'.format(round(available, 1))
+ else:
+ available = utils._pretty_size(available_space)
+
+ raise ArtifactError("Your system does not have enough available " +
+ "space to support the cache quota specified.",
+ detail=("You have specified a quota of {quota} total disk space.\n" +
+ "The filesystem containing {local_cache_path} only " +
+ "has {available_size} available.")
+ .format(
+ quota=self.context.config_cache_quota,
+ local_cache_path=self.context.artifactdir,
+ available_size=available),
+ reason='insufficient-storage-for-quota')
# Place a slight headroom of 2e9 (2GB) below the configured
# cache_quota to try and avoid exceptions.
@@ -714,9 +816,26 @@ class ArtifactCache():
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
+ self._cache_quota_original = cache_quota
self._cache_quota = cache_quota - headroom
self._cache_lower_threshold = self._cache_quota / 2
+ # _get_cache_volume_size()
+ #
+ # Get the available space and total space for the volume on
+ # which the artifact cache is located.
+ #
+ # Returns:
+ # (int): The total number of bytes on the volume
+ # (int): The number of available bytes on the volume
+ #
+ # NOTE: We use this stub to allow the test cases
+ # to override what an artifact cache thinks
+ # about its disk size and available bytes.
+ #
+ def _get_cache_volume_size(self):
+ return utils._get_volume_size(self.context.artifactdir)
+
# _configured_remote_artifact_cache_specs():
#
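As a quick illustration of the usage arithmetic introduced above, here is a minimal, self-contained sketch; the StubUsage class and pretty_size helper are illustrative stand-ins, not the real ArtifactCache API:

    def pretty_size(size, dec_places=1):
        # Simplified stand-in for utils._pretty_size()
        for unit in ('B', 'K', 'M', 'G', 'T'):
            if size < 1024 or unit == 'T':
                return "{:.{p}f}{}".format(size, unit, p=dec_places)
            size /= 1024

    class StubUsage:
        # Mirrors the ArtifactCacheUsage fields: configured quota string,
        # resolved quota in bytes, and bytes currently used by artifacts.
        def __init__(self, quota_config, quota_size, used_size):
            self.quota_config = quota_config
            self.quota_size = quota_size
            self.used_size = used_size
            self.used_percent = 0
            if self.quota_size is not None:
                self.used_percent = int(self.used_size * 100 / self.quota_size)

        def __str__(self):
            return "{} / {} ({}%)".format(
                pretty_size(self.used_size), self.quota_config, self.used_percent)

    print(StubUsage("10G", 10 * 1024 ** 3, 4 * 1024 ** 3))  # 4.0G / 10G (40%)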
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 20fc9847d..9bad0df5e 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -61,7 +61,6 @@ class BlobNotFound(ArtifactError):
#
# Args:
# context (Context): The BuildStream context
-# enable_push (bool): Whether pushing is allowed by the platform
#
# Pushing is explicitly disabled by the platform in some cases,
# like when we are falling back to functioning without using
@@ -69,7 +68,7 @@ class BlobNotFound(ArtifactError):
#
class CASCache(ArtifactCache):
- def __init__(self, context, *, enable_push=True):
+ def __init__(self, context):
super().__init__(context)
self.casdir = os.path.join(context.artifactdir, 'cas')
@@ -78,8 +77,6 @@ class CASCache(ArtifactCache):
self._calculate_cache_quota()
- self._enable_push = enable_push
-
# Per-project list of _CASRemote instances.
self._remotes = {}
@@ -90,6 +87,12 @@ class CASCache(ArtifactCache):
# Implementation of abstract methods #
################################################
+ def preflight(self):
+ if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or
+ not os.path.isdir(os.path.join(self.casdir, 'objects'))):
+ raise ArtifactError("CAS repository check failed for '{}'"
+ .format(self.casdir))
+
def contains(self, element, key):
refpath = self._refpath(self.get_artifact_fullname(element, key))
@@ -221,7 +224,7 @@ class CASCache(ArtifactCache):
return bool(remotes_for_project)
def has_push_remotes(self, *, element=None):
- if not self._has_push_remotes or not self._enable_push:
+ if not self._has_push_remotes:
# No project has push remotes
return False
elif element is None:
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index e18689c78..4a9d5191a 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -37,8 +37,6 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .._exceptions import ArtifactError
from .._context import Context
-from .cascache import CASCache
-
# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
@@ -50,6 +48,13 @@ class ArtifactTooLargeException(Exception):
pass
+# We need a message handler because this will own an ArtifactCache
+# which can in turn fire messages.
+def message_handler(message, context):
+ logging.info(message.message)
+ logging.info(message.detail)
+
+
# create_server():
#
# Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -63,8 +68,9 @@ def create_server(repo, *, enable_push,
min_head_size=int(2e9)):
context = Context()
context.artifactdir = os.path.abspath(repo)
+ context.set_message_handler(message_handler)
- artifactcache = CASCache(context)
+ artifactcache = context.artifactcache
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
diff --git a/buildstream/_context.py b/buildstream/_context.py
index 8dde091d3..997b82959 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -29,7 +29,8 @@ from . import _yaml
from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
-from ._artifactcache import ArtifactCache
+from ._artifactcache import ArtifactCache, ArtifactCacheUsage
+from ._artifactcache.cascache import CASCache
from ._workspaces import Workspaces
from .plugin import Plugin
@@ -113,6 +114,7 @@ class Context():
self._cache_key = None
self._message_handler = None
self._message_depth = deque()
+ self._artifactcache = None
self._projects = []
self._project_overrides = {}
self._workspaces = None
@@ -228,6 +230,23 @@ class Context():
"{}: on-error should be one of: {}".format(
provenance, ", ".join(valid_actions)))
+ @property
+ def artifactcache(self):
+ if not self._artifactcache:
+ self._artifactcache = CASCache(self)
+
+ return self._artifactcache
+
+ # get_artifact_cache_usage()
+ #
+ # Fetches the current usage of the artifact cache
+ #
+ # Returns:
+ # (ArtifactCacheUsage): The current status
+ #
+ def get_artifact_cache_usage(self):
+ return ArtifactCacheUsage(self.artifactcache)
+
# add_project():
#
# Add a project to the context.
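A minimal sketch of the lazy-initialization pattern the new artifactcache property relies on; FakeCache is a hypothetical stand-in for CASCache:

    class FakeCache:
        # Hypothetical stand-in for CASCache; counts constructions
        instances = 0

        def __init__(self):
            FakeCache.instances += 1

    class LazyContext:
        def __init__(self):
            self._artifactcache = None

        @property
        def artifactcache(self):
            # Created on first access, then the same instance is reused
            if not self._artifactcache:
                self._artifactcache = FakeCache()
            return self._artifactcache

    ctx = LazyContext()
    assert ctx.artifactcache is ctx.artifactcache
    assert FakeCache.instances == 1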
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 312f11d0c..f48d9cbe1 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -199,7 +199,7 @@ class App():
if option_value is not None:
setattr(self.context, context_attr, option_value)
try:
- Platform.create_instance(self.context)
+ Platform.get_platform()
except BstError as e:
self._error_exit(e, "Error instantiating platform")
@@ -215,6 +215,13 @@ class App():
# Propagate pipeline feedback to the user
self.context.set_message_handler(self._message_handler)
+ # Preflight the artifact cache after initializing logging,
+ # as preflighting can cause messages to be emitted.
+ try:
+ self.context.artifactcache.preflight()
+ except BstError as e:
+ self._error_exit(e, "Error instantiating artifact cache")
+
#
# Load the Project
#
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index fd1a5acf1..9ec7d75e0 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -353,13 +353,17 @@ class _StatusHeader():
def render(self, line_length, elapsed):
project = self._context.get_toplevel_project()
line_length = max(line_length, 80)
- size = 0
- text = ''
+ #
+ # Line 1: Session time, project name, session / total elements
+ #
+ # ========= 00:00:00 project-name (143/387) =========
+ #
session = str(len(self._stream.session_elements))
total = str(len(self._stream.total_elements))
- # Format and calculate size for target and overall time code
+ size = 0
+ text = ''
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
size += 8 # Size of time code
size += len(project.name) + 1
@@ -372,6 +376,12 @@ class _StatusHeader():
self._format_profile.fmt(')')
line1 = self._centered(text, size, line_length, '=')
+
+ #
+ # Line 2: Dynamic list of queue status reports
+ #
+ # (Fetched:0 117 0)→ (Built:4 0 0)
+ #
size = 0
text = ''
@@ -389,10 +399,28 @@ class _StatusHeader():
line2 = self._centered(text, size, line_length, ' ')
- size = 24
- text = self._format_profile.fmt("~~~~~ ") + \
- self._content_profile.fmt('Active Tasks') + \
- self._format_profile.fmt(" ~~~~~")
+ #
+ # Line 3: Cache usage percentage report
+ #
+ # ~~~~~~ cache: 69% ~~~~~~
+ #
+ usage = self._context.get_artifact_cache_usage()
+ usage_percent = '{}%'.format(usage.used_percent)
+
+ size = 21
+ size += len(usage_percent)
+ if usage.used_percent >= 95:
+ formatted_usage_percent = self._error_profile.fmt(usage_percent)
+ elif usage.used_percent >= 80:
+ formatted_usage_percent = self._content_profile.fmt(usage_percent)
+ else:
+ formatted_usage_percent = self._success_profile.fmt(usage_percent)
+
+ text = self._format_profile.fmt("~~~~~~ ") + \
+ self._content_profile.fmt('cache') + \
+ self._format_profile.fmt(': ') + \
+ formatted_usage_percent + \
+ self._format_profile.fmt(' ~~~~~~')
line3 = self._centered(text, size, line_length, ' ')
return line1 + '\n' + line2 + '\n' + line3
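The three-way threshold used for coloring the percentage can be sketched as follows; raw ANSI codes stand in for the frontend's error/content/success profiles, which is an assumption for illustration:

    RED, YELLOW, GREEN, END = '\033[31m', '\033[33m', '\033[32m', '\033[0m'

    def format_usage_percent(used_percent):
        text = '{}%'.format(used_percent)
        if used_percent >= 95:
            return RED + text + END     # error profile: cache nearly full
        elif used_percent >= 80:
            return YELLOW + text + END  # content profile: getting full
        return GREEN + text + END       # success profile: healthy

    print('~~~~~~ cache: {} ~~~~~~'.format(format_usage_percent(69)))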
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index 3a41e1052..a772f3248 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -452,6 +452,7 @@ class LogLine(Widget):
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
values["Targets"] = ", ".join([t.name for t in stream.targets])
+ values["Cache Usage"] = "{}".format(context.get_artifact_cache_usage())
text += self._format_values(values)
# User configurations
diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py
index eec60b193..1d7476776 100644
--- a/buildstream/_loader/loader.py
+++ b/buildstream/_loader/loader.py
@@ -28,7 +28,6 @@ from .. import Consistency
from .. import _yaml
from ..element import Element
from .._profile import Topics, profile_start, profile_end
-from .._platform import Platform
from .._includes import Includes
from .types import Symbol, Dependency
@@ -533,8 +532,7 @@ class Loader():
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: Expected junction but element kind is {}".format(filename, meta_element.kind))
- platform = Platform.get_platform()
- element = Element._new_from_meta(meta_element, platform.artifactcache)
+ element = Element._new_from_meta(meta_element, self._context.artifactcache)
element._preflight()
for source in element.sources():
diff --git a/buildstream/_platform/linux.py b/buildstream/_platform/linux.py
index a5fd0d687..7af1a2283 100644
--- a/buildstream/_platform/linux.py
+++ b/buildstream/_platform/linux.py
@@ -17,11 +17,11 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import os
import subprocess
from .. import _site
from .. import utils
-from .._artifactcache.cascache import CASCache
from .._message import Message, MessageType
from ..sandbox import SandboxBwrap
@@ -30,17 +30,15 @@ from . import Platform
class Linux(Platform):
- def __init__(self, context):
+ def __init__(self):
- super().__init__(context)
+ super().__init__()
- self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8)
- self._user_ns_available = self._check_user_ns_available(context)
- self._artifact_cache = CASCache(context, enable_push=self._user_ns_available)
+ self._uid = os.geteuid()
+ self._gid = os.getegid()
- @property
- def artifactcache(self):
- return self._artifact_cache
+ self._die_with_parent_available = _site.check_bwrap_version(0, 1, 8)
+ self._user_ns_available = self._check_user_ns_available()
def create_sandbox(self, *args, **kwargs):
# Inform the bubblewrap sandbox as to whether it can use user namespaces or not
@@ -48,10 +46,19 @@ class Linux(Platform):
kwargs['die_with_parent_available'] = self._die_with_parent_available
return SandboxBwrap(*args, **kwargs)
+ def check_sandbox_config(self, config):
+ if self._user_ns_available:
+ # User namespace support allows arbitrary build UID/GID settings.
+ return True
+ else:
+ # Without user namespace support, the UID/GID in the sandbox
+ # will match the host UID/GID.
+ return config.build_uid == self._uid and config.build_gid == self._gid
+
################################################
# Private Methods #
################################################
- def _check_user_ns_available(self, context):
+ def _check_user_ns_available(self):
# Here, lets check if bwrap is able to create user namespaces,
# issue a warning if it's not available, and save the state
@@ -75,9 +82,4 @@ class Linux(Platform):
return True
else:
- context.message(
- Message(None, MessageType.WARN,
- "Unable to create user namespaces with bubblewrap, resorting to fallback",
- detail="Some builds may not function due to lack of uid / gid 0, " +
- "artifacts created will not be trusted for push purposes."))
return False
diff --git a/buildstream/_platform/platform.py b/buildstream/_platform/platform.py
index 8a074eb62..b37964986 100644
--- a/buildstream/_platform/platform.py
+++ b/buildstream/_platform/platform.py
@@ -29,17 +29,13 @@ class Platform():
# Platform()
#
# A class to manage platform-specific details. Currently holds the
- # sandbox factory, the artifact cache and staging operations, as
- # well as platform helpers.
+ # sandbox factory as well as platform helpers.
#
- # Args:
- # context (context): The project context
- #
- def __init__(self, context):
- self.context = context
+ def __init__(self):
+ pass
@classmethod
- def create_instance(cls, *args, **kwargs):
+ def _create_instance(cls):
if sys.platform.startswith('linux'):
backend = 'linux'
else:
@@ -58,23 +54,15 @@ class Platform():
else:
raise PlatformError("No such platform: '{}'".format(backend))
- cls._instance = PlatformImpl(*args, **kwargs)
+ cls._instance = PlatformImpl()
@classmethod
def get_platform(cls):
if not cls._instance:
- raise PlatformError("Platform needs to be initialized first")
+ cls._create_instance()
return cls._instance
##################################################################
- # Platform properties #
- ##################################################################
- @property
- def artifactcache(self):
- raise ImplError("Platform {platform} does not implement an artifactcache"
- .format(platform=type(self).__name__))
-
- ##################################################################
# Sandbox functions #
##################################################################
@@ -92,3 +80,7 @@ class Platform():
def create_sandbox(self, *args, **kwargs):
raise ImplError("Platform {platform} does not implement create_sandbox()"
.format(platform=type(self).__name__))
+
+ def check_sandbox_config(self, config):
+ raise ImplError("Platform {platform} does not implement check_sandbox_config()"
+ .format(platform=type(self).__name__))
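The shape of the new on-demand singleton can be sketched in isolation; the platform selection logic is elided here:

    class Platform:
        _instance = None

        @classmethod
        def _create_instance(cls):
            # The real code picks a Linux or Unix implementation
            # based on sys.platform; elided in this sketch.
            cls._instance = cls()

        @classmethod
        def get_platform(cls):
            # Lazily create the instance instead of raising when no
            # explicit create_instance() call was made at startup.
            if not cls._instance:
                cls._create_instance()
            return cls._instance

    assert Platform.get_platform() is Platform.get_platform()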
diff --git a/buildstream/_platform/unix.py b/buildstream/_platform/unix.py
index 0306a4ac5..7aa8cbc0d 100644
--- a/buildstream/_platform/unix.py
+++ b/buildstream/_platform/unix.py
@@ -19,7 +19,6 @@
import os
-from .._artifactcache.cascache import CASCache
from .._exceptions import PlatformError
from ..sandbox import SandboxChroot
@@ -28,18 +27,21 @@ from . import Platform
class Unix(Platform):
- def __init__(self, context):
+ def __init__(self):
- super().__init__(context)
- self._artifact_cache = CASCache(context)
+ super().__init__()
+
+ self._uid = os.geteuid()
+ self._gid = os.getegid()
# Not necessarily 100% reliable, but we want to fail early.
- if os.geteuid() != 0:
+ if self._uid != 0:
raise PlatformError("Root privileges are required to run without bubblewrap.")
- @property
- def artifactcache(self):
- return self._artifact_cache
-
def create_sandbox(self, *args, **kwargs):
return SandboxChroot(*args, **kwargs)
+
+ def check_sandbox_config(self, config):
+ # With the chroot sandbox, the UID/GID in the sandbox
+ # will match the host UID/GID (typically 0/0).
+ return config.build_uid == self._uid and config.build_gid == self._gid
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index fb56ca016..a96b92353 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -17,7 +17,6 @@
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job, JobStatus
-from ..._platform import Platform
class CacheSizeJob(Job):
@@ -25,8 +24,8 @@ class CacheSizeJob(Job):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
- platform = Platform.get_platform()
- self._artifacts = platform.artifactcache
+ context = self._scheduler.context
+ self._artifacts = context.artifactcache
def child_process(self):
return self._artifacts.compute_cache_size()
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index 97b45901f..a1d49f339 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -17,7 +17,6 @@
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job, JobStatus
-from ..._platform import Platform
class CleanupJob(Job):
@@ -25,11 +24,24 @@ class CleanupJob(Job):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
- platform = Platform.get_platform()
- self._artifacts = platform.artifactcache
+ context = self._scheduler.context
+ self._artifacts = context.artifactcache
def child_process(self):
- return self._artifacts.clean()
+ def progress():
+ self.send_message('update-cache-size',
+ self._artifacts.get_cache_size())
+ return self._artifacts.clean(progress)
+
+ def handle_message(self, message_type, message):
+
+ # Update the cache size in the main process as we go,
+ # this provides better feedback in the UI.
+ if message_type == 'update-cache-size':
+ self._artifacts.set_cache_size(message)
+ return True
+
+ return False
def parent_complete(self, status, result):
if status == JobStatus.OK:
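A single-process sketch of the progress round-trip above: the child-side progress() callback sends an 'update-cache-size' message, which the parent-side handle_message() applies. The stub classes and direct dispatch are illustrative; the real code routes these through a multiprocessing queue:

    class StubArtifacts:
        def __init__(self):
            self._size = 300

        def get_cache_size(self):
            return self._size

        def set_cache_size(self, size):
            self._size = size

        def clean(self, progress):
            for _ in range(3):
                self._size -= 100  # pretend one artifact was removed
                progress()
            return self._size

    class StubCleanupJob:
        def __init__(self, artifacts):
            self._artifacts = artifacts

        def send_message(self, message_type, message):
            # Real code enqueues an _Envelope; dispatch directly here
            assert self.handle_message(message_type, message)

        def handle_message(self, message_type, message):
            if message_type == 'update-cache-size':
                self._artifacts.set_cache_size(message)
                return True
            return False

        def child_process(self):
            def progress():
                self.send_message('update-cache-size',
                                  self._artifacts.get_cache_size())
            return self._artifacts.clean(progress)

    assert StubCleanupJob(StubArtifacts()).child_process() == 0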
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 348204750..b8b4a2c76 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -58,10 +58,10 @@ class JobStatus():
# Used to distinguish between status messages and return values
-class Envelope():
+class _Envelope():
def __init__(self, message_type, message):
- self._message_type = message_type
- self._message = message
+ self.message_type = message_type
+ self.message = message
# Process class that doesn't call waitpid on its own.
@@ -263,10 +263,37 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # send_message()
+ #
+ # To be called from inside Job.child_process() implementations
+ # to send messages to the main process during processing.
+ #
+ # These messages will be processed by the class's Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
#######################################################
# Abstract Methods #
#######################################################
+ # handle_message()
+ #
+ # Handle a custom message. This will be called in the main process in
+ # response to any messages sent to the main process using the
+ # Job.send_message() API from inside a Job.child_process() implementation.
+ #
+ # Args:
+ # message_type (str): A string to identify the message type
+ # message (any): A simple serializable object
+ #
+ # Returns:
+ # (bool): Should return a truthy value if message_type is handled.
+ #
+ def handle_message(self, message_type, message):
+ return False
+
# parent_complete()
#
# This will be executed after the job finishes, and is expected to
@@ -404,7 +431,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -430,7 +457,7 @@ class Job():
else:
# No exception occurred in the action
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -457,7 +484,7 @@ class Job():
domain = e.domain
reason = e.reason
- envelope = Envelope('error', {
+ envelope = _Envelope('error', {
'domain': domain,
'reason': reason
})
@@ -475,7 +502,7 @@ class Job():
#
def _child_send_result(self, result):
if result is not None:
- envelope = Envelope('result', result)
+ envelope = _Envelope('result', result)
self._queue.put(envelope)
# _child_shutdown()
@@ -512,7 +539,7 @@ class Job():
if message.message_type == MessageType.LOG:
return
- self._queue.put(Envelope('message', message))
+ self._queue.put(_Envelope('message', message))
# _parent_shutdown()
#
@@ -573,24 +600,28 @@ class Job():
if not self._listening:
return
- if envelope._message_type == 'message':
+ if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope._message)
- elif envelope._message_type == 'error':
+ self._scheduler.context.message(envelope.message)
+ elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope._message['domain'],
- envelope._message['reason'])
- elif envelope._message_type == 'result':
+ set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
+ elif envelope.message_type == 'result':
assert self._result is None
- self._result = envelope._message
- elif envelope._message_type == 'child_data':
+ self._result = envelope.message
+ elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
- self.child_data = envelope._message
- else:
- raise Exception()
+ self.child_data = envelope.message
+
+ # Try Job subclass specific messages now
+ elif not self.handle_message(envelope.message_type,
+ envelope.message):
+ assert 0, "Unhandled message type '{}': {}" \
+ .format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
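The dispatch order for envelopes can be sketched like this: built-in message types are consumed first, unknown types fall through to the subclass's handle_message(), and anything still unhandled trips an assertion. Stub names are illustrative:

    class _Envelope:
        def __init__(self, message_type, message):
            self.message_type = message_type
            self.message = message

    class StubJob:
        result = None

        def handle_message(self, message_type, message):
            # Subclass hook; returns truthy when it handled the message
            return message_type == 'custom'

        def dispatch(self, envelope):
            if envelope.message_type == 'result':
                self.result = envelope.message
            elif not self.handle_message(envelope.message_type,
                                         envelope.message):
                assert 0, "Unhandled message type '{}'".format(
                    envelope.message_type)

    job = StubJob()
    job.dispatch(_Envelope('result', 42))
    job.dispatch(_Envelope('custom', 'ok'))
    assert job.result == 42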
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index df8364552..49ded7e2c 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,6 @@
from . import Queue, QueueStatus
from ..jobs import JobStatus
from ..resources import ResourceType
-from ..._platform import Platform
# A queue which assembles elements
@@ -55,8 +54,8 @@ class BuildQueue(Queue):
# as returned from Element._assemble() to the estimated
# artifact cache size
#
- platform = Platform.get_platform()
- artifacts = platform.artifactcache
+ context = self._scheduler.context
+ artifacts = context.artifactcache
artifacts.add_artifact_size(artifact_size)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 30e70a4fc..68c115c1b 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -29,7 +29,6 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
from .jobs import JobStatus, CacheSizeJob, CleanupJob
-from .._platform import Platform
# A decent return code for Scheduler.run()
@@ -286,6 +285,8 @@ class Scheduler():
# Callback for the cache size job
def _cache_size_job_complete(self, status, cache_size):
+ context = self.context
+ artifacts = context.artifactcache
# Deallocate cache size job resources
self._cache_size_running = None
@@ -295,9 +296,6 @@ class Scheduler():
if status != JobStatus.OK:
return
- platform = Platform.get_platform()
- artifacts = platform.artifactcache
-
if artifacts.has_quota_exceeded():
self._cleanup_scheduled = True
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index de3ae464c..310bfbde0 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -32,7 +32,6 @@ from ._exceptions import StreamError, ImplError, BstError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
-from ._platform import Platform
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -71,8 +70,7 @@ class Stream():
#
# Private members
#
- self._platform = Platform.get_platform()
- self._artifacts = self._platform.artifactcache
+ self._artifacts = context.artifactcache
self._context = context
self._project = project
self._pipeline = Pipeline(context, project, self._artifacts)
diff --git a/buildstream/element.py b/buildstream/element.py
index 7be27cb2a..898ff0784 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -269,6 +269,16 @@ class Element(Plugin):
# Extract Sandbox config
self.__sandbox_config = self.__extract_sandbox_config(meta)
+
+ self.__sandbox_config_supported = True
+ platform = Platform.get_platform()
+ if not platform.check_sandbox_config(self.__sandbox_config):
+ # Local sandbox does not fully support specified sandbox config.
+ # This will taint the artifact, disable pushing.
+ self.__sandbox_config_supported = False
+
def __lt__(self, other):
return self.name < other.name
@@ -1483,6 +1493,11 @@ class Element(Plugin):
context = self._get_context()
with self._output_file() as output_file:
+ if not self.__sandbox_config_supported:
+ self.warn("Sandbox configuration is not supported by the platform.",
+ detail="Falling back to UID {} GID {}. Artifact will not be pushed."
+ .format(self.__sandbox_config.build_uid, self.__sandbox_config.build_gid))
+
# Explicitly clean it up, keep the build dir around if exceptions are raised
os.makedirs(context.builddir, exist_ok=True)
rootdir = tempfile.mkdtemp(prefix="{}-".format(self.normal_name), dir=context.builddir)
@@ -2042,7 +2057,8 @@ class Element(Plugin):
workspaced_dependencies = self.__get_artifact_metadata_workspaced_dependencies()
# Other conditions should be or-ed
- self.__tainted = workspaced or workspaced_dependencies
+ self.__tainted = (workspaced or workspaced_dependencies or
+ not self.__sandbox_config_supported)
return self.__tainted
diff --git a/buildstream/utils.py b/buildstream/utils.py
index e3e9dc8c0..d02777897 100644
--- a/buildstream/utils.py
+++ b/buildstream/utils.py
@@ -583,6 +583,27 @@ def _get_dir_size(path):
return get_size(path)
+# _get_volume_size():
+#
+# Gets the total size and available space of a mounted filesystem in bytes.
+#
+# Args:
+# path (str): The path to check
+#
+# Returns:
+# (int): The total number of bytes on the volume
+# (int): The number of available bytes on the volume
+#
+def _get_volume_size(path):
+ try:
+ stat_ = os.statvfs(path)
+ except OSError as e:
+ raise UtilError("Failed to retrieve stats on volume for path '{}': {}"
+ .format(path, e)) from e
+
+ return stat_.f_bsize * stat_.f_blocks, stat_.f_bsize * stat_.f_bavail
+
+
# _parse_size():
#
# Convert a string representing data size to a number of
@@ -617,8 +638,7 @@ def _parse_size(size, volume):
if num > 100:
raise UtilError("{}% is not a valid percentage value.".format(num))
- stat_ = os.statvfs(volume)
- disk_size = stat_.f_blocks * stat_.f_bsize
+ disk_size, _ = _get_volume_size(volume)
return disk_size * (num / 100)
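The statvfs arithmetic behind _get_volume_size() is worth spelling out: total bytes are the block size times the total block count, and available bytes use f_bavail (blocks available to unprivileged users) rather than f_bfree. A runnable sketch of the same math, minus the UtilError wrapping:

    import os

    def get_volume_size(path):
        # Same calculation as utils._get_volume_size()
        stat_ = os.statvfs(path)
        total = stat_.f_bsize * stat_.f_blocks
        available = stat_.f_bsize * stat_.f_bavail
        return total, available

    total, available = get_volume_size('.')
    print("{} total, {} available".format(total, available))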
diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py
index 0d12cda8c..63ab9ad07 100644
--- a/tests/artifactcache/cache_size.py
+++ b/tests/artifactcache/cache_size.py
@@ -1,8 +1,10 @@
import os
import pytest
+from unittest import mock
from buildstream import _yaml
from buildstream._artifactcache import CACHE_SIZE_FILE
+from buildstream._exceptions import ErrorDomain
from tests.testutils import cli, create_element_size
@@ -60,3 +62,29 @@ def test_cache_size_write(cli, tmpdir):
with open(sizefile, "r") as f:
size_data = f.read()
size = int(size_data)
+
+
+def test_quota_over_1024T(cli, tmpdir):
+ KiB = 1024
+ MiB = (KiB * 1024)
+ GiB = (MiB * 1024)
+ TiB = (GiB * 1024)
+
+ cli.configure({
+ 'cache': {
+ 'quota': 2048 * TiB
+ }
+ })
+ project = tmpdir.join("main")
+ os.makedirs(str(project))
+ _yaml.dump({'name': 'main'}, str(project.join("project.conf")))
+
+ volume_space_patch = mock.patch(
+ "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
+ autospec=True,
+ return_value=(1025 * TiB, 1025 * TiB)
+ )
+
+ with volume_space_patch:
+ result = cli.run(project, args=["build", "file.bst"])
+ result.assert_main_error(ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota')
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
index f8b928cbf..ce8e6c9e8 100644
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -1,6 +1,7 @@
import os
import pytest
+from unittest import mock
from buildstream import _yaml
from buildstream._exceptions import ErrorDomain, LoadErrorReason
@@ -282,18 +283,28 @@ def test_never_delete_required_track(cli, datafiles, tmpdir):
# Ensure that only valid cache quotas make it through the loading
# process.
-@pytest.mark.parametrize("quota,success", [
- ("1", True),
- ("1K", True),
- ("50%", True),
- ("infinity", True),
- ("0", True),
- ("-1", False),
- ("pony", False),
- ("200%", False)
+#
+# This test virtualizes the condition to assume a storage volume
+# has 10K total disk space, of which 6K is still available (not
+# counting any space used by the artifact cache).
+#
+@pytest.mark.parametrize("quota,err_domain,err_reason", [
+ # Valid configurations
+ ("1", 'success', None),
+ ("1K", 'success', None),
+ ("50%", 'success', None),
+ ("infinity", 'success', None),
+ ("0", 'success', None),
+ # Invalid configurations
+ ("-1", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA),
+ ("pony", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA),
+ ("200%", ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA),
+ # Not enough space for these caches
+ ("7K", ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota'),
+ ("70%", ErrorDomain.ARTIFACT, 'insufficient-storage-for-quota')
])
@pytest.mark.datafiles(DATA_DIR)
-def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
+def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, err_domain, err_reason):
project = os.path.join(datafiles.dirname, datafiles.basename)
os.makedirs(os.path.join(project, 'elements'))
@@ -303,11 +314,39 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
}
})
- res = cli.run(project=project, args=['workspace', 'list'])
- if success:
+ # We patch how we retrieve space information.
+ # Ideally we would instead create a FUSE device on which we control
+ # everything.
+ # If the value is a percentage, we fix the current values to take the
+ # block size into account, since this matters in how we compute the size.
+
+ if quota.endswith("%"): # We set the free space at 60% of total space
+ stats = os.statvfs(".")
+ free_space = 0.6 * stats.f_bsize * stats.f_blocks
+ total_space = stats.f_bsize * stats.f_blocks
+ else:
+ free_space = 6000
+ total_space = 10000
+
+ volume_space_patch = mock.patch(
+ "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
+ autospec=True,
+ return_value=(total_space, free_space),
+ )
+
+ cache_size_patch = mock.patch(
+ "buildstream._artifactcache.ArtifactCache.get_cache_size",
+ autospec=True,
+ return_value=0,
+ )
+
+ with volume_space_patch, cache_size_patch:
+ res = cli.run(project=project, args=['workspace', 'list'])
+
+ if err_domain == 'success':
res.assert_success()
else:
- res.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA)
+ res.assert_main_error(err_domain, err_reason)
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 1e417c592..c7987e02c 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -2,6 +2,7 @@ import string
import pytest
import subprocess
import os
+import sys
import shutil
import signal
from collections import namedtuple
@@ -10,7 +11,6 @@ from contextlib import contextmanager
from multiprocessing import Process, Queue
from buildstream import _yaml
-from buildstream._artifactcache.cascache import CASCache
from buildstream._artifactcache.casserver import create_server
from buildstream._context import Context
from buildstream._exceptions import ArtifactError
@@ -50,8 +50,9 @@ class ArtifactShare():
context = Context()
context.artifactdir = self.repodir
+ context.set_message_handler(self._message_handler)
- self.cas = CASCache(context)
+ self.cas = context.artifactcache
self.total_space = total_space
self.free_space = free_space
@@ -167,6 +168,13 @@ class ArtifactShare():
f_bavail=self.free_space - repo_size,
f_bsize=1)
+ def _message_handler(self, message, context):
+ # We need a message handler because this will own an ArtifactCache
+ # which can in turn fire messages.
+
+ # Just unconditionally print the messages to stderr
+ print(message.message, file=sys.stderr)
+
# create_artifact_share()
#