diff options
author | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2019-01-24 19:44:52 +0000 |
---|---|---|
committer | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2019-01-24 19:44:52 +0000 |
commit | a2140d748ee4552034a0424b39c79ce6192695a8 (patch) | |
tree | f0f7aee69fccea3c9fef486a35da511283560c2a | |
parent | acd0bf224cb447b3c650da22d4e3b55964a87521 (diff) | |
parent | 9c33107f48948523737ab3419a08d0fbe5b9474e (diff) | |
download | buildstream-a2140d748ee4552034a0424b39c79ce6192695a8.tar.gz |
Merge branch 'tristan/cache-management-logging' into 'master'
Cache management logging enhancements
See merge request BuildStream/buildstream!1105
-rw-r--r-- | buildstream/_artifactcache.py | 142 | ||||
-rw-r--r-- | buildstream/_context.py | 12 | ||||
-rw-r--r-- | buildstream/_frontend/app.py | 12 | ||||
-rw-r--r-- | buildstream/_frontend/status.py | 42 | ||||
-rw-r--r-- | buildstream/_frontend/widget.py | 1 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 15 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 69 | ||||
-rw-r--r-- | buildstream/utils.py | 24 | ||||
-rw-r--r-- | tests/artifactcache/cache_size.py | 2 | ||||
-rw-r--r-- | tests/artifactcache/expiry.py | 4 |
10 files changed, 260 insertions, 63 deletions
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index b1afdf377..aa40f571a 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -46,6 +46,39 @@ class ArtifactCacheSpec(CASRemoteSpec): pass +# ArtifactCacheUsage +# +# A simple object to report the current artifact cache +# usage details. +# +# Note that this uses the user configured cache quota +# rather than the internal quota with protective headroom +# removed, to provide a more sensible value to display to +# the user. +# +# Args: +# artifacts (ArtifactCache): The artifact cache to get the status of +# +class ArtifactCacheUsage(): + + def __init__(self, artifacts): + context = artifacts.context + self.quota_config = context.config_cache_quota # Configured quota + self.quota_size = artifacts._cache_quota_original # Resolved cache quota in bytes + self.used_size = artifacts.get_cache_size() # Size used by artifacts in bytes + self.used_percent = 0 # Percentage of the quota used + if self.quota_size is not None: + self.used_percent = int(self.used_size * 100 / self.quota_size) + + # Formattable into a human readable string + # + def __str__(self): + return "{} / {} ({}%)" \ + .format(utils._pretty_size(self.used_size, dec_places=1), + self.quota_config, + self.used_percent) + + # An ArtifactCache manages artifacts. # # Args: @@ -64,6 +97,7 @@ class ArtifactCache(): self._required_elements = set() # The elements required for this session self._cache_size = None # The current cache size, sometimes it's an estimate self._cache_quota = None # The cache quota + self._cache_quota_original = None # The cache quota as specified by the user, in bytes self._cache_lower_threshold = None # The target cache size for a cleanup self._remotes_setup = False # Check to prevent double-setup of remotes @@ -216,11 +250,33 @@ class ArtifactCache(): # # Clean the artifact cache as much as possible. # + # Args: + # progress (callable): A callback to call when a ref is removed + # # Returns: # (int): The size of the cache after having cleaned up # - def clean(self): + def clean(self, progress=None): artifacts = self.list_artifacts() + context = self.context + + # Some accumulative statistics + removed_ref_count = 0 + space_saved = 0 + + # Start off with an announcement with as much info as possible + volume_size, volume_avail = self._get_cache_volume_size() + self._message(MessageType.STATUS, "Starting cache cleanup", + detail=("Elements required by the current build plan: {}\n" + + "User specified quota: {} ({})\n" + + "Cache usage: {}\n" + + "Cache volume: {} total, {} available") + .format(len(self._required_elements), + context.config_cache_quota, + utils._pretty_size(self._cache_quota_original, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + utils._pretty_size(volume_size, dec_places=2), + utils._pretty_size(volume_avail, dec_places=2))) # Build a set of the cache keys which are required # based on the required elements at cleanup time @@ -245,11 +301,18 @@ class ArtifactCache(): # can't remove them, we have to abort the build. # # FIXME: Asking the user what to do may be neater + # default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], 'buildstream.conf') - detail = ("There is not enough space to complete the build.\n" - "Please increase the cache-quota in {}." - .format(self.context.config_origin or default_conf)) + detail = ("Aborted after removing {} refs and saving {} disk space.\n" + "The remaining {} in the cache is required by the {} elements in your build plan\n\n" + "There is not enough space to complete the build.\n" + "Please increase the cache-quota in {} and/or make more disk space." + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + len(self._required_elements), + (context.config_origin or default_conf))) if self.has_quota_exceeded(): raise ArtifactError("Cache too full. Aborting.", @@ -264,10 +327,33 @@ class ArtifactCache(): # Remove the actual artifact, if it's not required. size = self.remove(to_remove) + removed_ref_count += 1 + space_saved += size + + self._message(MessageType.STATUS, + "Freed {: <7} {}".format( + utils._pretty_size(size, dec_places=2), + to_remove)) + # Remove the size from the removed size self.set_cache_size(self._cache_size - size) - # This should be O(1) if implemented correctly + # User callback + # + # Currently this process is fairly slow, but we should + # think about throttling this progress() callback if this + # becomes too intense. + if progress: + progress() + + # Informational message about the side effects of the cleanup + self._message(MessageType.INFO, "Cleanup completed", + detail=("Removed {} refs and saving {} disk space.\n" + + "Cache usage is now: {}") + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2))) + return self.get_cache_size() # compute_cache_size() @@ -279,7 +365,14 @@ class ArtifactCache(): # (int): The size of the artifact cache. # def compute_cache_size(self): - self._cache_size = self.cas.calculate_cache_size() + old_cache_size = self._cache_size + new_cache_size = self.cas.calculate_cache_size() + + if old_cache_size != new_cache_size: + self._cache_size = new_cache_size + + usage = ArtifactCacheUsage(self) + self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage)) return self._cache_size @@ -307,7 +400,7 @@ class ArtifactCache(): # it is greater than the actual cache size. # # Returns: - # (int) An approximation of the artifact cache size. + # (int) An approximation of the artifact cache size, in bytes. # def get_cache_size(self): @@ -459,8 +552,7 @@ class ArtifactCache(): # `ArtifactCache.get_artifact_fullname`) # # Returns: - # (int|None) The amount of space pruned from the repository in - # Bytes, or None if defer_prune is True + # (int): The amount of space recovered in the cache, in bytes # def remove(self, ref): @@ -848,19 +940,16 @@ class ArtifactCache(): else: headroom = 2e9 - artifactdir_volume = self.context.artifactdir - while not os.path.exists(artifactdir_volume): - artifactdir_volume = os.path.dirname(artifactdir_volume) - try: - cache_quota = utils._parse_size(self.context.config_cache_quota, artifactdir_volume) + cache_quota = utils._parse_size(self.context.config_cache_quota, + self.context.artifactdir) except utils.UtilError as e: raise LoadError(LoadErrorReason.INVALID_DATA, "{}\nPlease specify the value in bytes or as a % of full disk space.\n" "\nValid values are, for example: 800M 10G 1T 50%\n" .format(str(e))) from e - available_space, total_size = self._get_volume_space_info_for(artifactdir_volume) + total_size, available_space = self._get_cache_volume_size() cache_size = self.get_cache_size() # Ensure system has enough storage for the cache_quota @@ -900,22 +989,25 @@ class ArtifactCache(): # if we end up writing more than 2G, but hey, this stuff is # already really fuzzy. # + self._cache_quota_original = cache_quota self._cache_quota = cache_quota - headroom self._cache_lower_threshold = self._cache_quota / 2 - # _get_volume_space_info_for - # - # Get the available space and total space for the given volume + # _get_cache_volume_size() # - # Args: - # volume: volume for which to get the size + # Get the available space and total space for the volume on + # which the artifact cache is located. # # Returns: - # A tuple containing first the availabe number of bytes on the requested - # volume, then the total number of bytes of the volume. - def _get_volume_space_info_for(self, volume): - stat = os.statvfs(volume) - return stat.f_bsize * stat.f_bavail, stat.f_bsize * stat.f_blocks + # (int): The total number of bytes on the volume + # (int): The number of available bytes on the volume + # + # NOTE: We use this stub to allow the test cases + # to override what an artifact cache thinks + # about it's disk size and available bytes. + # + def _get_cache_volume_size(self): + return utils._get_volume_size(self.context.artifactdir) # _configured_remote_artifact_cache_specs(): diff --git a/buildstream/_context.py b/buildstream/_context.py index f14f6b746..29a016065 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -30,7 +30,7 @@ from . import _yaml from ._exceptions import LoadError, LoadErrorReason, BstError from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end -from ._artifactcache import ArtifactCache +from ._artifactcache import ArtifactCache, ArtifactCacheUsage from ._cas import CASCache from ._workspaces import Workspaces, WorkspaceProjectCache from .plugin import _plugin_lookup @@ -289,6 +289,16 @@ class Context(): return self._artifactcache + # get_artifact_cache_usage() + # + # Fetches the current usage of the artifact cache + # + # Returns: + # (ArtifactCacheUsage): The current status + # + def get_artifact_cache_usage(self): + return ArtifactCacheUsage(self.artifactcache) + # add_project(): # # Add a project to the context. diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index af38ae901..b6da079bd 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -194,11 +194,6 @@ class App(): except BstError as e: self._error_exit(e, "Error instantiating platform") - try: - self.context.artifactcache.preflight() - except BstError as e: - self._error_exit(e, "Error instantiating artifact cache") - # Create the logger right before setting the message handler self.logger = LogLine(self.context, self._content_profile, @@ -211,6 +206,13 @@ class App(): # Propagate pipeline feedback to the user self.context.set_message_handler(self._message_handler) + # Preflight the artifact cache after initializing logging, + # this can cause messages to be emitted. + try: + self.context.artifactcache.preflight() + except BstError as e: + self._error_exit(e, "Error instantiating artifact cache") + # # Load the Project # diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py index 2e7213e78..70f233357 100644 --- a/buildstream/_frontend/status.py +++ b/buildstream/_frontend/status.py @@ -353,13 +353,17 @@ class _StatusHeader(): def render(self, line_length, elapsed): project = self._context.get_toplevel_project() line_length = max(line_length, 80) - size = 0 - text = '' + # + # Line 1: Session time, project name, session / total elements + # + # ========= 00:00:00 project-name (143/387) ========= + # session = str(len(self._stream.session_elements)) total = str(len(self._stream.total_elements)) - # Format and calculate size for target and overall time code + size = 0 + text = '' size += len(total) + len(session) + 4 # Size for (N/N) with a leading space size += 8 # Size of time code size += len(project.name) + 1 @@ -372,6 +376,12 @@ class _StatusHeader(): self._format_profile.fmt(')') line1 = self._centered(text, size, line_length, '=') + + # + # Line 2: Dynamic list of queue status reports + # + # (Fetched:0 117 0)→ (Built:4 0 0) + # size = 0 text = '' @@ -389,10 +399,28 @@ class _StatusHeader(): line2 = self._centered(text, size, line_length, ' ') - size = 24 - text = self._format_profile.fmt("~~~~~ ") + \ - self._content_profile.fmt('Active Tasks') + \ - self._format_profile.fmt(" ~~~~~") + # + # Line 3: Cache usage percentage report + # + # ~~~~~~ cache: 69% ~~~~~~ + # + usage = self._context.get_artifact_cache_usage() + usage_percent = '{}%'.format(usage.used_percent) + + size = 21 + size += len(usage_percent) + if usage.used_percent >= 95: + formatted_usage_percent = self._error_profile.fmt(usage_percent) + elif usage.used_percent >= 80: + formatted_usage_percent = self._content_profile.fmt(usage_percent) + else: + formatted_usage_percent = self._success_profile.fmt(usage_percent) + + text = self._format_profile.fmt("~~~~~~ ") + \ + self._content_profile.fmt('cache') + \ + self._format_profile.fmt(': ') + \ + formatted_usage_percent + \ + self._format_profile.fmt(' ~~~~~~') line3 = self._centered(text, size, line_length, ' ') return line1 + '\n' + line2 + '\n' + line3 diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index c43856145..c5353bc43 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -452,6 +452,7 @@ class LogLine(Widget): values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S') values["Project"] = "{} ({})".format(project.name, project.directory) values["Targets"] = ", ".join([t.name for t in stream.targets]) + values["Cache Usage"] = "{}".format(context.get_artifact_cache_usage()) text += self._format_values(values) # User configurations diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index b378b3dab..a1d49f339 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -28,7 +28,20 @@ class CleanupJob(Job): self._artifacts = context.artifactcache def child_process(self): - return self._artifacts.clean() + def progress(): + self.send_message('update-cache-size', + self._artifacts.get_cache_size()) + return self._artifacts.clean(progress) + + def handle_message(self, message_type, message): + + # Update the cache size in the main process as we go, + # this provides better feedback in the UI. + if message_type == 'update-cache-size': + self._artifacts.set_cache_size(message) + return True + + return False def parent_complete(self, status, result): if status == JobStatus.OK: diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 7f14cf05f..e73150e7b 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -58,10 +58,10 @@ class JobStatus(): # Used to distinguish between status messages and return values -class Envelope(): +class _Envelope(): def __init__(self, message_type, message): - self._message_type = message_type - self._message = message + self.message_type = message_type + self.message = message # Process class that doesn't call waitpid on its own. @@ -275,10 +275,37 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # send_message() + # + # To be called from inside Job.child_process() implementations + # to send messages to the main process during processing. + # + # These messages will be processed by the class's Job.handle_message() + # implementation. + # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + ####################################################### # Abstract Methods # ####################################################### + # handle_message() + # + # Handle a custom message. This will be called in the main process in + # response to any messages sent to the main proces using the + # Job.send_message() API from inside a Job.child_process() implementation + # + # Args: + # message_type (str): A string to identify the message type + # message (any): A simple serializable object + # + # Returns: + # (bool): Should return a truthy value if message_type is handled. + # + def handle_message(self, message_type, message): + return False + # parent_complete() # # This will be executed after the job finishes, and is expected to @@ -416,7 +443,7 @@ class Job(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -442,7 +469,7 @@ class Job(): else: # No exception occurred in the action - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -469,7 +496,7 @@ class Job(): domain = e.domain reason = e.reason - envelope = Envelope('error', { + envelope = _Envelope('error', { 'domain': domain, 'reason': reason }) @@ -487,7 +514,7 @@ class Job(): # def _child_send_result(self, result): if result is not None: - envelope = Envelope('result', result) + envelope = _Envelope('result', result) self._queue.put(envelope) # _child_shutdown() @@ -524,7 +551,7 @@ class Job(): if message.message_type == MessageType.LOG: return - self._queue.put(Envelope('message', message)) + self._queue.put(_Envelope('message', message)) # _parent_shutdown() # @@ -588,24 +615,28 @@ class Job(): if not self._listening: return - if envelope._message_type == 'message': + if envelope.message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope._message) - elif envelope._message_type == 'error': + self._scheduler.context.message(envelope.message) + elif envelope.message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope._message['domain'], - envelope._message['reason']) - elif envelope._message_type == 'result': + set_last_task_error(envelope.message['domain'], + envelope.message['reason']) + elif envelope.message_type == 'result': assert self._result is None - self._result = envelope._message - elif envelope._message_type == 'child_data': + self._result = envelope.message + elif envelope.message_type == 'child_data': # If we retry a job, we assign a new value to this - self.child_data = envelope._message - else: - raise Exception() + self.child_data = envelope.message + + # Try Job subclass specific messages now + elif not self.handle_message(envelope.message_type, + envelope.message): + assert 0, "Unhandled message type '{}': {}" \ + .format(envelope.message_type, envelope.message) # _parent_process_queue() # diff --git a/buildstream/utils.py b/buildstream/utils.py index e3ff88034..c8d79c95a 100644 --- a/buildstream/utils.py +++ b/buildstream/utils.py @@ -633,6 +633,27 @@ def _get_dir_size(path): return get_size(path) +# _get_volume_size(): +# +# Gets the overall usage and total size of a mounted filesystem in bytes. +# +# Args: +# path (str): The path to check +# +# Returns: +# (int): The total number of bytes on the volume +# (int): The number of available bytes on the volume +# +def _get_volume_size(path): + try: + stat_ = os.statvfs(path) + except OSError as e: + raise UtilError("Failed to retrieve stats on volume for path '{}': {}" + .format(path, e)) from e + + return stat_.f_bsize * stat_.f_blocks, stat_.f_bsize * stat_.f_bavail + + # _parse_size(): # # Convert a string representing data size to a number of @@ -667,8 +688,7 @@ def _parse_size(size, volume): if num > 100: raise UtilError("{}% is not a valid percentage value.".format(num)) - stat_ = os.statvfs(volume) - disk_size = stat_.f_blocks * stat_.f_bsize + disk_size, _ = _get_volume_size(volume) return disk_size * (num / 100) diff --git a/tests/artifactcache/cache_size.py b/tests/artifactcache/cache_size.py index 11c8f6194..63ab9ad07 100644 --- a/tests/artifactcache/cache_size.py +++ b/tests/artifactcache/cache_size.py @@ -80,7 +80,7 @@ def test_quota_over_1024T(cli, tmpdir): _yaml.dump({'name': 'main'}, str(project.join("project.conf"))) volume_space_patch = mock.patch( - "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for", + "buildstream._artifactcache.ArtifactCache._get_cache_volume_size", autospec=True, return_value=(1025 * TiB, 1025 * TiB) ) diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py index d7bafe7e8..2230b70bd 100644 --- a/tests/artifactcache/expiry.py +++ b/tests/artifactcache/expiry.py @@ -358,9 +358,9 @@ def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, err_domain, err_reas total_space = 10000 volume_space_patch = mock.patch( - "buildstream._artifactcache.ArtifactCache._get_volume_space_info_for", + "buildstream._artifactcache.ArtifactCache._get_cache_volume_size", autospec=True, - return_value=(free_space, total_space), + return_value=(total_space, free_space), ) cache_size_patch = mock.patch( |