diff options
Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r-- | src/buildstream/_cas/cascache.py | 391 |
1 files changed, 1 insertions, 390 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index ff16480b1..0e3b544fc 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -31,8 +31,7 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from .._protos.buildstream.v2 import buildstream_pb2 from .. import utils -from .._exceptions import CASCacheError, LoadError, LoadErrorReason -from .._message import Message, MessageType +from .._exceptions import CASCacheError from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate @@ -42,37 +41,6 @@ _BUFFER_SIZE = 65536 CACHE_SIZE_FILE = "cache_size" -# CASCacheUsage -# -# A simple object to report the current CAS cache usage details. -# -# Note that this uses the user configured cache quota -# rather than the internal quota with protective headroom -# removed, to provide a more sensible value to display to -# the user. -# -# Args: -# cas (CASQuota): The CAS cache to get the status of -# -class CASCacheUsage(): - - def __init__(self, casquota): - self.quota_config = casquota._config_cache_quota # Configured quota - self.quota_size = casquota._cache_quota_original # Resolved cache quota in bytes - self.used_size = casquota.get_cache_size() # Size used by artifacts in bytes - self.used_percent = 0 # Percentage of the quota used - if self.quota_size is not None: - self.used_percent = int(self.used_size * 100 / self.quota_size) - - # Formattable into a human readable string - # - def __str__(self): - return "{} / {} ({}%)" \ - .format(utils._pretty_size(self.used_size, dec_places=1), - self.quota_config, - self.used_percent) - - # A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: @@ -1051,363 +1019,6 @@ class CASCache(): self.send_blobs(remote, missing_blobs, u_uid) -class CASQuota: - def __init__(self, context): - self.context = context - self.cas = context.get_cascache() - self.casdir = self.cas.casdir - self._config_cache_quota = context.config_cache_quota - self._config_cache_quota_string = context.config_cache_quota_string - self._cache_size = None # The current cache size, sometimes it's an estimate - self._cache_quota = None # The cache quota - self._cache_quota_original = None # The cache quota as specified by the user, in bytes - self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk - self._cache_lower_threshold = None # The target cache size for a cleanup - - self._message = context.messenger.message - - self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method - self._list_refs_callbacks = [] # Callbacks to all refs - - self._calculate_cache_quota() - - # compute_cache_size() - # - # Computes the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. - # - def compute_cache_size(self): - self._cache_size = utils._get_dir_size(self.casdir) - return self._cache_size - - # get_cache_size() - # - # Fetches the cached size of the cache, this is sometimes - # an estimate and periodically adjusted to the real size - # when a cache size calculation job runs. - # - # When it is an estimate, the value is either correct, or - # it is greater than the actual cache size. - # - # Returns: - # (int) An approximation of the artifact cache size, in bytes. - # - def get_cache_size(self): - - # If we don't currently have an estimate, figure out the real cache size. - if self._cache_size is None: - stored_size = self._read_cache_size() - if stored_size is not None: - self._cache_size = stored_size - else: - self.compute_cache_size() - - return self._cache_size - - # set_cache_size() - # - # Forcefully set the overall cache size. - # - # This is used to update the size in the main process after - # having calculated in a cleanup or a cache size calculation job. - # - # Args: - # cache_size (int): The size to set. - # write_to_disk (bool): Whether to write the value to disk. - # - def set_cache_size(self, cache_size, *, write_to_disk=True): - - assert cache_size is not None - - self._cache_size = cache_size - if write_to_disk: - self._write_cache_size(self._cache_size) - - # full() - # - # Checks if the artifact cache is full, either - # because the user configured quota has been exceeded - # or because the underlying disk is almost full. - # - # Returns: - # (bool): True if the artifact cache is full - # - def full(self): - - if self.get_cache_size() > self._cache_quota: - return True - - _, volume_avail = self._get_cache_volume_size() - if volume_avail < self._cache_quota_headroom: - return True - - return False - - # add_remove_callbacks() - # - # This adds tuples of iterators over unrequired objects (currently - # artifacts and source refs), and a callback to remove them. - # - # Args: - # callback (iter(unrequired), remove): tuple of iterator and remove - # method associated. - # - def add_remove_callbacks(self, list_unrequired, remove_method): - self._remove_callbacks.append((list_unrequired, remove_method)) - - def add_list_refs_callback(self, list_callback): - self._list_refs_callbacks.append(list_callback) - - ################################################ - # Local Private Methods # - ################################################ - - # _read_cache_size() - # - # Reads and returns the size of the artifact cache that's stored in the - # cache's size file - # - # Returns: - # (int): The size of the artifact cache, as recorded in the file - # - def _read_cache_size(self): - size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE) - - if not os.path.exists(size_file_path): - return None - - with open(size_file_path, "r") as f: - size = f.read() - - try: - num_size = int(size) - except ValueError as e: - raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format( - size, size_file_path)) from e - - return num_size - - # _write_cache_size() - # - # Writes the given size of the artifact to the cache's size file - # - # Args: - # size (int): The size of the artifact cache to record - # - def _write_cache_size(self, size): - assert isinstance(size, int) - size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE) - with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f: - f.write(str(size)) - - # _get_cache_volume_size() - # - # Get the available space and total space for the volume on - # which the artifact cache is located. - # - # Returns: - # (int): The total number of bytes on the volume - # (int): The number of available bytes on the volume - # - # NOTE: We use this stub to allow the test cases - # to override what an artifact cache thinks - # about it's disk size and available bytes. - # - def _get_cache_volume_size(self): - return utils._get_volume_size(self.casdir) - - # _calculate_cache_quota() - # - # Calculates and sets the cache quota and lower threshold based on the - # quota set in Context. - # It checks that the quota is both a valid expression, and that there is - # enough disk space to satisfy that quota - # - def _calculate_cache_quota(self): - # Headroom intended to give BuildStream a bit of leeway. - # This acts as the minimum size of cache_quota and also - # is taken from the user requested cache_quota. - # - if self.context.is_running_in_test_suite: - self._cache_quota_headroom = 0 - else: - self._cache_quota_headroom = 2e9 - - total_size, available_space = self._get_cache_volume_size() - cache_size = self.get_cache_size() - - # Ensure system has enough storage for the cache_quota - # - # If cache_quota is none, set it to the maximum it could possibly be. - # - # Also check that cache_quota is at least as large as our headroom. - # - cache_quota = self._config_cache_quota - if cache_quota is None: - # The user has set no limit, so we may take all the space. - cache_quota = min(cache_size + available_space, total_size) - if cache_quota < self._cache_quota_headroom: # Check minimum - raise LoadError("Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}." - .format(utils._pretty_size(cache_quota), utils._pretty_size(self._cache_quota_headroom)), - LoadErrorReason.INVALID_DATA) - elif cache_quota > total_size: - # A quota greater than the total disk size is certianly an error - raise CASCacheError("Your system does not have enough available " + - "space to support the cache quota specified.", - detail=("You have specified a quota of {quota} total disk space.\n" + - "The filesystem containing {local_cache_path} only " + - "has {total_size} total disk space.") - .format( - quota=self._config_cache_quota, - local_cache_path=self.casdir, - total_size=utils._pretty_size(total_size)), - reason='insufficient-storage-for-quota') - - elif cache_quota > cache_size + available_space: - # The quota does not fit in the available space, this is a warning - if '%' in self._config_cache_quota_string: - available = (available_space / total_size) * 100 - available = '{}% of total disk space'.format(round(available, 1)) - else: - available = utils._pretty_size(available_space) - - self._message(Message( - MessageType.WARN, - "Your system does not have enough available " + - "space to support the cache quota specified.", - detail=("You have specified a quota of {quota} total disk space.\n" + - "The filesystem containing {local_cache_path} only " + - "has {available_size} available.") - .format(quota=self._config_cache_quota, - local_cache_path=self.casdir, - available_size=available))) - - # Place a slight headroom (2e9 (2GB) on the cache_quota) into - # cache_quota to try and avoid exceptions. - # - # Of course, we might still end up running out during a build - # if we end up writing more than 2G, but hey, this stuff is - # already really fuzzy. - # - self._cache_quota_original = cache_quota - self._cache_quota = cache_quota - self._cache_quota_headroom - self._cache_lower_threshold = self._cache_quota / 2 - - # clean(): - # - # Clean the artifact cache as much as possible. - # - # Args: - # progress (callable): A callback to call when a ref is removed - # - # Returns: - # (int): The size of the cache after having cleaned up - # - def clean(self, progress=None): - context = self.context - - # Some accumulative statistics - removed_ref_count = 0 - space_saved = 0 - - total_refs = 0 - for refs in self._list_refs_callbacks: - total_refs += len(list(refs())) - - # Start off with an announcement with as much info as possible - volume_size, volume_avail = self._get_cache_volume_size() - self._message(Message( - MessageType.STATUS, "Starting cache cleanup", - detail=("Elements required by the current build plan:\n" + "{}\n" + - "User specified quota: {} ({})\n" + - "Cache usage: {}\n" + - "Cache volume: {} total, {} available") - .format( - total_refs, - context.config_cache_quota, - utils._pretty_size(self._cache_quota, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2), - utils._pretty_size(volume_size, dec_places=2), - utils._pretty_size(volume_avail, dec_places=2)))) - - # Do a real computation of the cache size once, just in case - self.compute_cache_size() - usage = CASCacheUsage(self) - self._message(Message(MessageType.STATUS, - "Cache usage recomputed: {}".format(usage))) - - # Collect digests and their remove method - all_unrequired_refs = [] - for (unrequired_refs, remove) in self._remove_callbacks: - for (mtime, ref) in unrequired_refs(): - all_unrequired_refs.append((mtime, ref, remove)) - - # Pair refs and their remove method sorted in time order - all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)] - - # Go through unrequired refs and remove them, oldest first - made_space = False - for (ref, remove) in all_unrequired_refs: - size = remove(ref) - removed_ref_count += 1 - space_saved += size - - self._message(Message( - MessageType.STATUS, - "Freed {: <7} {}".format( - utils._pretty_size(size, dec_places=2), - ref))) - - self.set_cache_size(self._cache_size - size) - - # User callback - # - # Currently this process is fairly slow, but we should - # think about throttling this progress() callback if this - # becomes too intense. - if progress: - progress() - - if self.get_cache_size() < self._cache_lower_threshold: - made_space = True - break - - if not made_space and self.full(): - # If too many artifacts are required, and we therefore - # can't remove them, we have to abort the build. - # - # FIXME: Asking the user what to do may be neater - # - default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], - 'buildstream.conf') - detail = ("Aborted after removing {} refs and saving {} disk space.\n" - "The remaining {} in the cache is required by the {} references in your build plan\n\n" - "There is not enough space to complete the build.\n" - "Please increase the cache-quota in {} and/or make more disk space." - .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2), - total_refs, - (context.config_origin or default_conf))) - - raise CASCacheError("Cache too full. Aborting.", - detail=detail, - reason="cache-too-full") - - # Informational message about the side effects of the cleanup - self._message(Message( - MessageType.INFO, "Cleanup completed", - detail=("Removed {} refs and saving {} disk space.\n" + - "Cache usage is now: {}") - .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2)))) - - return self.get_cache_size() - - def _grouper(iterable, n): while True: try: |