author     Tristan Van Berkom <tristan.van.berkom@gmail.com>   2019-01-24 19:44:52 +0000
committer  Tristan Van Berkom <tristan.van.berkom@gmail.com>   2019-01-24 19:44:52 +0000
commit     a2140d748ee4552034a0424b39c79ce6192695a8 (patch)
tree       f0f7aee69fccea3c9fef486a35da511283560c2a
parent     acd0bf224cb447b3c650da22d4e3b55964a87521 (diff)
parent     9c33107f48948523737ab3419a08d0fbe5b9474e (diff)
download   buildstream-a2140d748ee4552034a0424b39c79ce6192695a8.tar.gz
Merge branch 'tristan/cache-management-logging' into 'master'
Cache management logging enhancements

See merge request BuildStream/buildstream!1105
-rw-r--r--  buildstream/_artifactcache.py               142
-rw-r--r--  buildstream/_context.py                      12
-rw-r--r--  buildstream/_frontend/app.py                 12
-rw-r--r--  buildstream/_frontend/status.py              42
-rw-r--r--  buildstream/_frontend/widget.py               1
-rw-r--r--  buildstream/_scheduler/jobs/cleanupjob.py    15
-rw-r--r--  buildstream/_scheduler/jobs/job.py           69
-rw-r--r--  buildstream/utils.py                         24
-rw-r--r--  tests/artifactcache/cache_size.py             2
-rw-r--r--  tests/artifactcache/expiry.py                 4
10 files changed, 260 insertions, 63 deletions
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index b1afdf377..aa40f571a 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -46,6 +46,39 @@ class ArtifactCacheSpec(CASRemoteSpec):
pass
+# ArtifactCacheUsage
+#
+# A simple object to report the current artifact cache
+# usage details.
+#
+# Note that this reports the user-configured cache quota
+# rather than the internal quota (which has protective
+# headroom subtracted), so that the value displayed to
+# the user is more sensible.
+#
+# Args:
+# artifacts (ArtifactCache): The artifact cache to get the status of
+#
+class ArtifactCacheUsage():
+
+ def __init__(self, artifacts):
+ context = artifacts.context
+ self.quota_config = context.config_cache_quota # Configured quota
+ self.quota_size = artifacts._cache_quota_original # Resolved cache quota in bytes
+ self.used_size = artifacts.get_cache_size() # Size used by artifacts in bytes
+ self.used_percent = 0 # Percentage of the quota used
+ if self.quota_size is not None:
+ self.used_percent = int(self.used_size * 100 / self.quota_size)
+
+ # Formattable into a human readable string
+ #
+ def __str__(self):
+ return "{} / {} ({}%)" \
+ .format(utils._pretty_size(self.used_size, dec_places=1),
+ self.quota_config,
+ self.used_percent)
+
+
# An ArtifactCache manages artifacts.
#
# Args:
@@ -64,6 +97,7 @@ class ArtifactCache():
self._required_elements = set() # The elements required for this session
self._cache_size = None # The current cache size, sometimes it's an estimate
self._cache_quota = None # The cache quota
+ self._cache_quota_original = None # The cache quota as specified by the user, in bytes
self._cache_lower_threshold = None # The target cache size for a cleanup
self._remotes_setup = False # Check to prevent double-setup of remotes
@@ -216,11 +250,33 @@ class ArtifactCache():
#
# Clean the artifact cache as much as possible.
#
+ # Args:
+ # progress (callable): A callback to call when a ref is removed
+ #
# Returns:
# (int): The size of the cache after having cleaned up
#
- def clean(self):
+ def clean(self, progress=None):
artifacts = self.list_artifacts()
+ context = self.context
+
+ # Some accumulative statistics
+ removed_ref_count = 0
+ space_saved = 0
+
+ # Start off with an announcement containing as much info as possible
+ volume_size, volume_avail = self._get_cache_volume_size()
+ self._message(MessageType.STATUS, "Starting cache cleanup",
+ detail=("Elements required by the current build plan: {}\n" +
+ "User specified quota: {} ({})\n" +
+ "Cache usage: {}\n" +
+ "Cache volume: {} total, {} available")
+ .format(len(self._required_elements),
+ context.config_cache_quota,
+ utils._pretty_size(self._cache_quota_original, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ utils._pretty_size(volume_size, dec_places=2),
+ utils._pretty_size(volume_avail, dec_places=2)))
# Build a set of the cache keys which are required
# based on the required elements at cleanup time
@@ -245,11 +301,18 @@ class ArtifactCache():
# can't remove them, we have to abort the build.
#
# FIXME: Asking the user what to do may be neater
+ #
default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
'buildstream.conf')
- detail = ("There is not enough space to complete the build.\n"
- "Please increase the cache-quota in {}."
- .format(self.context.config_origin or default_conf))
+ detail = ("Aborted after removing {} refs and saving {} disk space.\n"
+ "The remaining {} in the cache is required by the {} elements in your build plan\n\n"
+ "There is not enough space to complete the build.\n"
+ "Please increase the cache-quota in {} and/or make more disk space."
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ len(self._required_elements),
+ (context.config_origin or default_conf)))
if self.has_quota_exceeded():
raise ArtifactError("Cache too full. Aborting.",
@@ -264,10 +327,33 @@ class ArtifactCache():
# Remove the actual artifact, if it's not required.
size = self.remove(to_remove)
+ removed_ref_count += 1
+ space_saved += size
+
+ self._message(MessageType.STATUS,
+ "Freed {: <7} {}".format(
+ utils._pretty_size(size, dec_places=2),
+ to_remove))
+
# Subtract the removed artifact's size from the tracked cache size
self.set_cache_size(self._cache_size - size)
- # This should be O(1) if implemented correctly
+ # User callback
+ #
+ # Currently this process is fairly slow, but we should
+ # think about throttling this progress() callback if this
+ # becomes too intense.
+ if progress:
+ progress()
+
+ # Informational message about the side effects of the cleanup
+ self._message(MessageType.INFO, "Cleanup completed",
+ detail=("Removed {} refs and saving {} disk space.\n" +
+ "Cache usage is now: {}")
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2)))
+
return self.get_cache_size()
# compute_cache_size()
@@ -279,7 +365,14 @@ class ArtifactCache():
# (int): The size of the artifact cache.
#
def compute_cache_size(self):
- self._cache_size = self.cas.calculate_cache_size()
+ old_cache_size = self._cache_size
+ new_cache_size = self.cas.calculate_cache_size()
+
+ if old_cache_size != new_cache_size:
+ self._cache_size = new_cache_size
+
+ usage = ArtifactCacheUsage(self)
+ self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage))
return self._cache_size
@@ -307,7 +400,7 @@ class ArtifactCache():
# it is greater than the actual cache size.
#
# Returns:
- # (int) An approximation of the artifact cache size.
+ # (int): An approximation of the artifact cache size, in bytes.
#
def get_cache_size(self):
@@ -459,8 +552,7 @@ class ArtifactCache():
# `ArtifactCache.get_artifact_fullname`)
#
# Returns:
- # (int|None) The amount of space pruned from the repository in
- # Bytes, or None if defer_prune is True
+ # (int): The amount of space recovered in the cache, in bytes
#
def remove(self, ref):
@@ -848,19 +940,16 @@ class ArtifactCache():
else:
headroom = 2e9
- artifactdir_volume = self.context.artifactdir
- while not os.path.exists(artifactdir_volume):
- artifactdir_volume = os.path.dirname(artifactdir_volume)
-
try:
- cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume)
+ cache_quota = utils._parse_size(self.context.config_cache_quota,
+ self.context.artifactdir)
except utils.UtilError as e:
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}\nPlease specify the value in bytes or as a % of full disk space.\n"
"\nValid values are, for example: 800M 10G 1T 50%\n"
.format(str(e))) from e
- available_space, total_size = self._get_volume_space_info_for(artifactdir_volume)
+ total_size, available_space = self._get_cache_volume_size()
cache_size = self.get_cache_size()
# Ensure system has enough storage for the cache_quota
@@ -900,22 +989,25 @@ class ArtifactCache():
# if we end up writing more than 2G, but hey, this stuff is
# already really fuzzy.
#
+ self._cache_quota_original = cache_quota
self._cache_quota = cache_quota - headroom
self._cache_lower_threshold = self._cache_quota / 2
- # _get_volume_space_info_for
- #
- # Get the available space and total space for the given volume
+ # _get_cache_volume_size()
#
- # Args:
- # volume: volume for which to get the size
+ # Get the available space and total space for the volume on
+ # which the artifact cache is located.
#
# Returns:
- # A tuple containing first the availabe number of bytes on the requested
- # volume, then the total number of bytes of the volume.
- def _get_volume_space_info_for(self, volume):
- stat = os.statvfs(volume)
- return stat.f_bsize * stat.f_bavail, stat.f_bsize * stat.f_blocks
+ # (int): The total number of bytes on the volume
+ # (int): The number of available bytes on the volume
+ #
+ # NOTE: We use this stub to allow the test cases
+ # to override what an artifact cache thinks
+ # about its disk size and available bytes.
+ #
+ def _get_cache_volume_size(self):
+ return utils._get_volume_size(self.context.artifactdir)
# _configured_remote_artifact_cache_specs():
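For orientation, a minimal sketch (not part of the patch) of how the new ArtifactCacheUsage object is meant to be consumed; 'artifacts' is assumed to be an already-initialized ArtifactCache and the example values are illustrative:

# Sketch: reading the usage snapshot added above.
# 'artifacts' is an initialized ArtifactCache; its context carries the
# user-configured quota string (e.g. "10G") as config_cache_quota.
usage = ArtifactCacheUsage(artifacts)

# used_percent is an integer percentage of the resolved quota,
# or 0 when no quota is configured.
print("Quota used: {}%".format(usage.used_percent))

# __str__ renders something like "1.3GB / 10G (13%)", which is what
# the log header and status bar display.
print("Cache usage: {}".format(usage))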
diff --git a/buildstream/_context.py b/buildstream/_context.py
index f14f6b746..29a016065 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -30,7 +30,7 @@ from . import _yaml
from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
-from ._artifactcache import ArtifactCache
+from ._artifactcache import ArtifactCache, ArtifactCacheUsage
from ._cas import CASCache
from ._workspaces import Workspaces, WorkspaceProjectCache
from .plugin import _plugin_lookup
@@ -289,6 +289,16 @@ class Context():
return self._artifactcache
+ # get_artifact_cache_usage()
+ #
+ # Fetches the current usage of the artifact cache
+ #
+ # Returns:
+ # (ArtifactCacheUsage): The current artifact cache usage
+ #
+ def get_artifact_cache_usage(self):
+ return ArtifactCacheUsage(self.artifactcache)
+
# add_project():
#
# Add a project to the context.
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index af38ae901..b6da079bd 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -194,11 +194,6 @@ class App():
except BstError as e:
self._error_exit(e, "Error instantiating platform")
- try:
- self.context.artifactcache.preflight()
- except BstError as e:
- self._error_exit(e, "Error instantiating artifact cache")
-
# Create the logger right before setting the message handler
self.logger = LogLine(self.context,
self._content_profile,
@@ -211,6 +206,13 @@ class App():
# Propagate pipeline feedback to the user
self.context.set_message_handler(self._message_handler)
+ # Preflight the artifact cache after initializing logging,
+ # since preflighting can cause messages to be emitted.
+ try:
+ self.context.artifactcache.preflight()
+ except BstError as e:
+ self._error_exit(e, "Error instantiating artifact cache")
+
#
# Load the Project
#
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 2e7213e78..70f233357 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -353,13 +353,17 @@ class _StatusHeader():
def render(self, line_length, elapsed):
project = self._context.get_toplevel_project()
line_length = max(line_length, 80)
- size = 0
- text = ''
+ #
+ # Line 1: Session time, project name, session / total elements
+ #
+ # ========= 00:00:00 project-name (143/387) =========
+ #
session = str(len(self._stream.session_elements))
total = str(len(self._stream.total_elements))
- # Format and calculate size for target and overall time code
+ size = 0
+ text = ''
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
size += 8 # Size of time code
size += len(project.name) + 1
@@ -372,6 +376,12 @@ class _StatusHeader():
self._format_profile.fmt(')')
line1 = self._centered(text, size, line_length, '=')
+
+ #
+ # Line 2: Dynamic list of queue status reports
+ #
+ # (Fetched:0 117 0)→ (Built:4 0 0)
+ #
size = 0
text = ''
@@ -389,10 +399,28 @@ class _StatusHeader():
line2 = self._centered(text, size, line_length, ' ')
- size = 24
- text = self._format_profile.fmt("~~~~~ ") + \
- self._content_profile.fmt('Active Tasks') + \
- self._format_profile.fmt(" ~~~~~")
+ #
+ # Line 3: Cache usage percentage report
+ #
+ # ~~~~~~ cache: 69% ~~~~~~
+ #
+ usage = self._context.get_artifact_cache_usage()
+ usage_percent = '{}%'.format(usage.used_percent)
+
+ size = 21
+ size += len(usage_percent)
+ if usage.used_percent >= 95:
+ formatted_usage_percent = self._error_profile.fmt(usage_percent)
+ elif usage.used_percent >= 80:
+ formatted_usage_percent = self._content_profile.fmt(usage_percent)
+ else:
+ formatted_usage_percent = self._success_profile.fmt(usage_percent)
+
+ text = self._format_profile.fmt("~~~~~~ ") + \
+ self._content_profile.fmt('cache') + \
+ self._format_profile.fmt(': ') + \
+ formatted_usage_percent + \
+ self._format_profile.fmt(' ~~~~~~')
line3 = self._centered(text, size, line_length, ' ')
return line1 + '\n' + line2 + '\n' + line3
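The coloring rule for the new cache line can be summarized with a small sketch; the pick_profile helper and its profile arguments are hypothetical stand-ins for the frontend's error/content/success profiles, not part of the patch:

# Sketch of the threshold logic used for line 3 above; the helper name
# and its arguments are illustrative only.
def pick_profile(used_percent, error_profile, content_profile, success_profile):
    if used_percent >= 95:
        # Cache is nearly full: render the percentage in the error style
        return error_profile
    if used_percent >= 80:
        # Getting close to the quota: highlight it
        return content_profile
    # Plenty of room left: render it in the success style
    return success_profile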
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index c43856145..c5353bc43 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -452,6 +452,7 @@ class LogLine(Widget):
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
values["Targets"] = ", ".join([t.name for t in stream.targets])
+ values["Cache Usage"] = "{}".format(context.get_artifact_cache_usage())
text += self._format_values(values)
# User configurations
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index b378b3dab..a1d49f339 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -28,7 +28,20 @@ class CleanupJob(Job):
self._artifacts = context.artifactcache
def child_process(self):
- return self._artifacts.clean()
+ def progress():
+ self.send_message('update-cache-size',
+ self._artifacts.get_cache_size())
+ return self._artifacts.clean(progress)
+
+ def handle_message(self, message_type, message):
+
+ # Update the cache size in the main process as we go;
+ # this provides better feedback in the UI.
+ if message_type == 'update-cache-size':
+ self._artifacts.set_cache_size(message)
+ return True
+
+ return False
def parent_complete(self, status, result):
if status == JobStatus.OK:
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 7f14cf05f..e73150e7b 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -58,10 +58,10 @@ class JobStatus():
# Used to distinguish between status messages and return values
-class Envelope():
+class _Envelope():
def __init__(self, message_type, message):
- self._message_type = message_type
- self._message = message
+ self.message_type = message_type
+ self.message = message
# Process class that doesn't call waitpid on its own.
@@ -275,10 +275,37 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # send_message()
+ #
+ # To be called from inside Job.child_process() implementations
+ # to send messages to the main process during processing.
+ #
+ # These messages will be processed by the class's Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
#######################################################
# Abstract Methods #
#######################################################
+ # handle_message()
+ #
+ # Handle a custom message. This will be called in the main process in
+ # response to any messages sent to the main process using the
+ # Job.send_message() API from inside a Job.child_process() implementation.
+ #
+ # Args:
+ # message_type (str): A string to identify the message type
+ # message (any): A simple serializable object
+ #
+ # Returns:
+ # (bool): Should return a truthy value if message_type is handled.
+ #
+ def handle_message(self, message_type, message):
+ return False
+
# parent_complete()
#
# This will be executed after the job finishes, and is expected to
@@ -416,7 +443,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -442,7 +469,7 @@ class Job():
else:
# No exception occurred in the action
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -469,7 +496,7 @@ class Job():
domain = e.domain
reason = e.reason
- envelope = Envelope('error', {
+ envelope = _Envelope('error', {
'domain': domain,
'reason': reason
})
@@ -487,7 +514,7 @@ class Job():
#
def _child_send_result(self, result):
if result is not None:
- envelope = Envelope('result', result)
+ envelope = _Envelope('result', result)
self._queue.put(envelope)
# _child_shutdown()
@@ -524,7 +551,7 @@ class Job():
if message.message_type == MessageType.LOG:
return
- self._queue.put(Envelope('message', message))
+ self._queue.put(_Envelope('message', message))
# _parent_shutdown()
#
@@ -588,24 +615,28 @@ class Job():
if not self._listening:
return
- if envelope._message_type == 'message':
+ if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope._message)
- elif envelope._message_type == 'error':
+ self._scheduler.context.message(envelope.message)
+ elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope._message['domain'],
- envelope._message['reason'])
- elif envelope._message_type == 'result':
+ set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
+ elif envelope.message_type == 'result':
assert self._result is None
- self._result = envelope._message
- elif envelope._message_type == 'child_data':
+ self._result = envelope.message
+ elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
- self.child_data = envelope._message
- else:
- raise Exception()
+ self.child_data = envelope.message
+
+ # Try Job subclass specific messages now
+ elif not self.handle_message(envelope.message_type,
+ envelope.message):
+ assert 0, "Unhandled message type '{}': {}" \
+ .format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
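To illustrate the new messaging API in isolation, here is a hedged sketch of a hypothetical Job subclass; the ExampleJob name and the 'report-progress' message type are invented for illustration and do not exist in BuildStream:

# Sketch only: a hypothetical Job subclass exercising send_message() /
# handle_message().
class ExampleJob(Job):

    def child_process(self):
        # Runs in the child process: push intermediate state to the parent
        self.send_message('report-progress', {'refs_removed': 1})
        return 0

    def handle_message(self, message_type, message):
        # Runs in the main process: return a truthy value once handled,
        # otherwise the parent asserts with "Unhandled message type".
        if message_type == 'report-progress':
            self._refs_removed = message['refs_removed']
            return True
        return False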
diff --git a/buildstream/utils.py b/buildstream/utils.py
index e3ff88034..c8d79c95a 100644
--- a/buildstream/utils.py
+++ b/buildstream/utils.py
@@ -633,6 +633,27 @@ def _get_dir_size(path):
return get_size(path)
+# _get_volume_size():
+#
+# Gets the total size and available space, in bytes, of the volume containing the given path.
+#
+# Args:
+# path (str): The path to check
+#
+# Returns:
+# (int): The total number of bytes on the volume
+# (int): The number of available bytes on the volume
+#
+def _get_volume_size(path):
+ try:
+ stat_ = os.statvfs(path)
+ except OSError as e:
+ raise UtilError("Failed to retrieve stats on volume for path '{}': {}"
+ .format(path, e)) from e
+
+ return stat_.f_bsize * stat_.f_blocks, stat_.f_bsize * stat_.f_bavail
+
+
# _parse_size():
#
# Convert a string representing data size to a number of
@@ -667,8 +688,7 @@ def _parse_size(size, volume):
if num > 100:
raise UtilError("{}% is not a valid percentage value.".format(num))
- stat_ = os.statvfs(volume)
- disk_size = stat_.f_blocks * stat_.f_bsize
+ disk_size, _ = _get_volume_size(volume)
return disk_size * (num / 100)
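As a standalone illustration of the statvfs arithmetic above (runnable outside BuildStream; the path is an arbitrary example):

# Sketch: statvfs reports counts in filesystem blocks, so multiplying by
# the block size yields byte counts, mirroring _get_volume_size() above.
import os

def volume_size(path):
    stat_ = os.statvfs(path)
    total_bytes = stat_.f_bsize * stat_.f_blocks      # size of the whole volume
    available_bytes = stat_.f_bsize * stat_.f_bavail  # bytes available to unprivileged users
    return total_bytes, available_bytes

total, avail = volume_size("/")
print("{} bytes total, {} bytes available".format(total, avail))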
diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py
index 11c8f6194..63ab9ad07 100644
--- a/tests/artifactcache/cache_size.py
+++ b/tests/artifactcache/cache_size.py
@@ -80,7 +80,7 @@ def test_quota_over_1024T(cli, tmpdir):
_yaml.dump({'name': 'main'}, str(project.join("project.conf")))
volume_space_patch = mock.patch(
- "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
+ "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
autospec=True,
return_value=(1025 * TiB, 1025 * TiB)
)
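Note for test authors: _get_cache_volume_size() returns (total, available), the reverse of the old helper's (available, total), so mocked return values must follow the new order, as the expiry.py change below also shows. A hedged sketch, with made-up byte counts:

# Sketch: patching the new helper in a test; 10000/6000 are illustrative.
volume_size_patch = mock.patch(
    "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
    autospec=True,
    return_value=(10000, 6000),  # (total bytes, available bytes)
)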
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
index d7bafe7e8..2230b70bd 100644
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -358,9 +358,9 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, err_domain, err_reas
total_space = 10000
volume_space_patch = mock.patch(
- "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for",
+ "buildstream._artifactcache.ArtifactCache._get_cache_volume_size",
autospec=True,
- return_value=(free_space, total_space),
+ return_value=(total_space, free_space),
)
cache_size_patch = mock.patch(