diff options
Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r-- | src/buildstream/_cas/cascache.py | 1462 |
1 files changed, 1462 insertions, 0 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py new file mode 100644 index 000000000..ad8013d18 --- /dev/null +++ b/src/buildstream/_cas/cascache.py @@ -0,0 +1,1462 @@ +# +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +import hashlib +import itertools +import os +import stat +import errno +import uuid +import contextlib + +import grpc + +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 +from .._protos.buildstream.v2 import buildstream_pb2 + +from .. import utils +from .._exceptions import CASCacheError, LoadError, LoadErrorReason +from .._message import Message, MessageType + +from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate + +_BUFFER_SIZE = 65536 + + +CACHE_SIZE_FILE = "cache_size" + + +# CASCacheUsage +# +# A simple object to report the current CAS cache usage details. +# +# Note that this uses the user configured cache quota +# rather than the internal quota with protective headroom +# removed, to provide a more sensible value to display to +# the user. +# +# Args: +# cas (CASQuota): The CAS cache to get the status of +# +class CASCacheUsage(): + + def __init__(self, casquota): + self.quota_config = casquota._config_cache_quota # Configured quota + self.quota_size = casquota._cache_quota_original # Resolved cache quota in bytes + self.used_size = casquota.get_cache_size() # Size used by artifacts in bytes + self.used_percent = 0 # Percentage of the quota used + if self.quota_size is not None: + self.used_percent = int(self.used_size * 100 / self.quota_size) + + # Formattable into a human readable string + # + def __str__(self): + return "{} / {} ({}%)" \ + .format(utils._pretty_size(self.used_size, dec_places=1), + self.quota_config, + self.used_percent) + + +# A CASCache manages a CAS repository as specified in the Remote Execution API. +# +# Args: +# path (str): The root directory for the CAS repository +# cache_quota (int): User configured cache quota +# +class CASCache(): + + def __init__(self, path): + self.casdir = os.path.join(path, 'cas') + self.tmpdir = os.path.join(path, 'tmp') + os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) + os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) + os.makedirs(self.tmpdir, exist_ok=True) + + self.__reachable_directory_callbacks = [] + self.__reachable_digest_callbacks = [] + + # preflight(): + # + # Preflight check. + # + def preflight(self): + headdir = os.path.join(self.casdir, 'refs', 'heads') + objdir = os.path.join(self.casdir, 'objects') + if not (os.path.isdir(headdir) and os.path.isdir(objdir)): + raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir)) + + # contains(): + # + # Check whether the specified ref is already available in the local CAS cache. + # + # Args: + # ref (str): The ref to check + # + # Returns: True if the ref is in the cache, False otherwise + # + def contains(self, ref): + refpath = self._refpath(ref) + + # This assumes that the repository doesn't have any dangling pointers + return os.path.exists(refpath) + + # contains_directory(): + # + # Check whether the specified directory and subdirecotires are in the cache, + # i.e non dangling. + # + # Args: + # digest (Digest): The directory digest to check + # with_files (bool): Whether to check files as well + # + # Returns: True if the directory is available in the local cache + # + def contains_directory(self, digest, *, with_files): + try: + directory = remote_execution_pb2.Directory() + with open(self.objpath(digest), 'rb') as f: + directory.ParseFromString(f.read()) + + # Optionally check presence of files + if with_files: + for filenode in directory.files: + if not os.path.exists(self.objpath(filenode.digest)): + return False + + # Check subdirectories + for dirnode in directory.directories: + if not self.contains_directory(dirnode.digest, with_files=with_files): + return False + + return True + except FileNotFoundError: + return False + + # checkout(): + # + # Checkout the specified directory digest. + # + # Args: + # dest (str): The destination path + # tree (Digest): The directory digest to extract + # can_link (bool): Whether we can create hard links in the destination + # + def checkout(self, dest, tree, *, can_link=False): + os.makedirs(dest, exist_ok=True) + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(tree), 'rb') as f: + directory.ParseFromString(f.read()) + + for filenode in directory.files: + # regular file, create hardlink + fullpath = os.path.join(dest, filenode.name) + if can_link: + utils.safe_link(self.objpath(filenode.digest), fullpath) + else: + utils.safe_copy(self.objpath(filenode.digest), fullpath) + + if filenode.is_executable: + os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | + stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) + + for dirnode in directory.directories: + fullpath = os.path.join(dest, dirnode.name) + self.checkout(fullpath, dirnode.digest, can_link=can_link) + + for symlinknode in directory.symlinks: + # symlink + fullpath = os.path.join(dest, symlinknode.name) + os.symlink(symlinknode.target, fullpath) + + # commit(): + # + # Commit directory to cache. + # + # Args: + # refs (list): The refs to set + # path (str): The directory to import + # + def commit(self, refs, path): + tree = self._commit_directory(path) + + for ref in refs: + self.set_ref(ref, tree) + + # diff(): + # + # Return a list of files that have been added or modified between + # the refs described by ref_a and ref_b. + # + # Args: + # ref_a (str): The first ref + # ref_b (str): The second ref + # subdir (str): A subdirectory to limit the comparison to + # + def diff(self, ref_a, ref_b): + tree_a = self.resolve_ref(ref_a) + tree_b = self.resolve_ref(ref_b) + + added = [] + removed = [] + modified = [] + + self.diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified) + + return modified, removed, added + + # pull(): + # + # Pull a ref from a remote repository. + # + # Args: + # ref (str): The ref to pull + # remote (CASRemote): The remote repository to pull from + # + # Returns: + # (bool): True if pull was successful, False if ref was not available + # + def pull(self, ref, remote): + try: + remote.init() + + request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name) + request.key = ref + response = remote.ref_storage.GetReference(request) + + tree = response.digest + + # Fetch Directory objects + self._fetch_directory(remote, tree) + + # Fetch files, excluded_subdirs determined in pullqueue + required_blobs = self.required_blobs_for_directory(tree) + missing_blobs = self.local_missing_blobs(required_blobs) + if missing_blobs: + self.fetch_blobs(remote, missing_blobs) + + self.set_ref(ref, tree) + + return True + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e + else: + return False + except BlobNotFound as e: + return False + + # pull_tree(): + # + # Pull a single Tree rather than a ref. + # Does not update local refs. + # + # Args: + # remote (CASRemote): The remote to pull from + # digest (Digest): The digest of the tree + # + def pull_tree(self, remote, digest): + try: + remote.init() + + digest = self._fetch_tree(remote, digest) + + return digest + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise + + return None + + # link_ref(): + # + # Add an alias for an existing ref. + # + # Args: + # oldref (str): An existing ref + # newref (str): A new ref for the same directory + # + def link_ref(self, oldref, newref): + tree = self.resolve_ref(oldref) + + self.set_ref(newref, tree) + + # push(): + # + # Push committed refs to remote repository. + # + # Args: + # refs (list): The refs to push + # remote (CASRemote): The remote to push to + # + # Returns: + # (bool): True if any remote was updated, False if no pushes were required + # + # Raises: + # (CASCacheError): if there was an error + # + def push(self, refs, remote): + skipped_remote = True + try: + for ref in refs: + tree = self.resolve_ref(ref) + + # Check whether ref is already on the server in which case + # there is no need to push the ref + try: + request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name) + request.key = ref + response = remote.ref_storage.GetReference(request) + + if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: + # ref is already on the server with the same tree + continue + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + # Intentionally re-raise RpcError for outer except block. + raise + + self._send_directory(remote, tree) + + request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name) + request.keys.append(ref) + request.digest.CopyFrom(tree) + remote.ref_storage.UpdateReference(request) + + skipped_remote = False + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: + raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e + + return not skipped_remote + + # objpath(): + # + # Return the path of an object based on its digest. + # + # Args: + # digest (Digest): The digest of the object + # + # Returns: + # (str): The path of the object + # + def objpath(self, digest): + return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:]) + + # add_object(): + # + # Hash and write object to CAS. + # + # Args: + # digest (Digest): An optional Digest object to populate + # path (str): Path to file to add + # buffer (bytes): Byte buffer to add + # link_directly (bool): Whether file given by path can be linked + # + # Returns: + # (Digest): The digest of the added object + # + # Either `path` or `buffer` must be passed, but not both. + # + def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False): + # Exactly one of the two parameters has to be specified + assert (path is None) != (buffer is None) + + if digest is None: + digest = remote_execution_pb2.Digest() + + try: + h = hashlib.sha256() + # Always write out new file to avoid corruption if input file is modified + with contextlib.ExitStack() as stack: + if path is not None and link_directly: + tmp = stack.enter_context(open(path, 'rb')) + for chunk in iter(lambda: tmp.read(_BUFFER_SIZE), b""): + h.update(chunk) + else: + tmp = stack.enter_context(self._temporary_object()) + + if path: + with open(path, 'rb') as f: + for chunk in iter(lambda: f.read(_BUFFER_SIZE), b""): + h.update(chunk) + tmp.write(chunk) + else: + h.update(buffer) + tmp.write(buffer) + + tmp.flush() + + digest.hash = h.hexdigest() + digest.size_bytes = os.fstat(tmp.fileno()).st_size + + # Place file at final location + objpath = self.objpath(digest) + os.makedirs(os.path.dirname(objpath), exist_ok=True) + os.link(tmp.name, objpath) + + except FileExistsError as e: + # We can ignore the failed link() if the object is already in the repo. + pass + + except OSError as e: + raise CASCacheError("Failed to hash object: {}".format(e)) from e + + return digest + + # set_ref(): + # + # Create or replace a ref. + # + # Args: + # ref (str): The name of the ref + # + def set_ref(self, ref, tree): + refpath = self._refpath(ref) + os.makedirs(os.path.dirname(refpath), exist_ok=True) + with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f: + f.write(tree.SerializeToString()) + + # resolve_ref(): + # + # Resolve a ref to a digest. + # + # Args: + # ref (str): The name of the ref + # update_mtime (bool): Whether to update the mtime of the ref + # + # Returns: + # (Digest): The digest stored in the ref + # + def resolve_ref(self, ref, *, update_mtime=False): + refpath = self._refpath(ref) + + try: + with open(refpath, 'rb') as f: + if update_mtime: + os.utime(refpath) + + digest = remote_execution_pb2.Digest() + digest.ParseFromString(f.read()) + return digest + + except FileNotFoundError as e: + raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e + + # update_mtime() + # + # Update the mtime of a ref. + # + # Args: + # ref (str): The ref to update + # + def update_mtime(self, ref): + try: + os.utime(self._refpath(ref)) + except FileNotFoundError as e: + raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e + + # list_objects(): + # + # List cached objects in Least Recently Modified (LRM) order. + # + # Returns: + # (list) - A list of objects and timestamps in LRM order + # + def list_objects(self): + objs = [] + mtimes = [] + + for root, _, files in os.walk(os.path.join(self.casdir, 'objects')): + for filename in files: + obj_path = os.path.join(root, filename) + try: + mtimes.append(os.path.getmtime(obj_path)) + except FileNotFoundError: + pass + else: + objs.append(obj_path) + + # NOTE: Sorted will sort from earliest to latest, thus the + # first element of this list will be the file modified earliest. + return sorted(zip(mtimes, objs)) + + def clean_up_refs_until(self, time): + ref_heads = os.path.join(self.casdir, 'refs', 'heads') + + for root, _, files in os.walk(ref_heads): + for filename in files: + ref_path = os.path.join(root, filename) + # Obtain the mtime (the time a file was last modified) + if os.path.getmtime(ref_path) < time: + os.unlink(ref_path) + + # remove(): + # + # Removes the given symbolic ref from the repo. + # + # Args: + # ref (str): A symbolic ref + # basedir (str): Path of base directory the ref is in, defaults to + # CAS refs heads + # defer_prune (bool): Whether to defer pruning to the caller. NOTE: + # The space won't be freed until you manually + # call prune. + # + # Returns: + # (int|None) The amount of space pruned from the repository in + # Bytes, or None if defer_prune is True + # + def remove(self, ref, *, basedir=None, defer_prune=False): + + if basedir is None: + basedir = os.path.join(self.casdir, 'refs', 'heads') + # Remove cache ref + self._remove_ref(ref, basedir) + + if not defer_prune: + pruned = self.prune() + return pruned + + return None + + # adds callback of iterator over reachable directory digests + def add_reachable_directories_callback(self, callback): + self.__reachable_directory_callbacks.append(callback) + + # adds callbacks of iterator over reachable file digests + def add_reachable_digests_callback(self, callback): + self.__reachable_digest_callbacks.append(callback) + + # prune(): + # + # Prune unreachable objects from the repo. + # + def prune(self): + ref_heads = os.path.join(self.casdir, 'refs', 'heads') + + pruned = 0 + reachable = set() + + # Check which objects are reachable + for root, _, files in os.walk(ref_heads): + for filename in files: + ref_path = os.path.join(root, filename) + ref = os.path.relpath(ref_path, ref_heads) + + tree = self.resolve_ref(ref) + self._reachable_refs_dir(reachable, tree) + + # check callback directory digests that are reachable + for digest_callback in self.__reachable_directory_callbacks: + for digest in digest_callback(): + self._reachable_refs_dir(reachable, digest) + + # check callback file digests that are reachable + for digest_callback in self.__reachable_digest_callbacks: + for digest in digest_callback(): + reachable.add(digest.hash) + + # Prune unreachable objects + for root, _, files in os.walk(os.path.join(self.casdir, 'objects')): + for filename in files: + objhash = os.path.basename(root) + filename + if objhash not in reachable: + obj_path = os.path.join(root, filename) + pruned += os.stat(obj_path).st_size + os.unlink(obj_path) + + return pruned + + def update_tree_mtime(self, tree): + reachable = set() + self._reachable_refs_dir(reachable, tree, update_mtime=True) + + # remote_missing_blobs_for_directory(): + # + # Determine which blobs of a directory tree are missing on the remote. + # + # Args: + # digest (Digest): The directory digest + # + # Returns: List of missing Digest objects + # + def remote_missing_blobs_for_directory(self, remote, digest): + required_blobs = self.required_blobs_for_directory(digest) + + return self.remote_missing_blobs(remote, required_blobs) + + # remote_missing_blobs(): + # + # Determine which blobs are missing on the remote. + # + # Args: + # blobs (Digest): The directory digest + # + # Returns: List of missing Digest objects + # + def remote_missing_blobs(self, remote, blobs): + missing_blobs = dict() + # Limit size of FindMissingBlobs request + for required_blobs_group in _grouper(blobs, 512): + request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name) + + for required_digest in required_blobs_group: + d = request.blob_digests.add() + d.CopyFrom(required_digest) + + response = remote.cas.FindMissingBlobs(request) + for missing_digest in response.missing_blob_digests: + d = remote_execution_pb2.Digest() + d.CopyFrom(missing_digest) + missing_blobs[d.hash] = d + + return missing_blobs.values() + + # local_missing_blobs(): + # + # Check local cache for missing blobs. + # + # Args: + # digests (list): The Digests of blobs to check + # + # Returns: Missing Digest objects + # + def local_missing_blobs(self, digests): + missing_blobs = [] + for digest in digests: + objpath = self.objpath(digest) + if not os.path.exists(objpath): + missing_blobs.append(digest) + return missing_blobs + + # required_blobs_for_directory(): + # + # Generator that returns the Digests of all blobs in the tree specified by + # the Digest of the toplevel Directory object. + # + def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None): + if not excluded_subdirs: + excluded_subdirs = [] + + # parse directory, and recursively add blobs + + yield directory_digest + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(directory_digest), 'rb') as f: + directory.ParseFromString(f.read()) + + for filenode in directory.files: + yield filenode.digest + + for dirnode in directory.directories: + if dirnode.name not in excluded_subdirs: + yield from self.required_blobs_for_directory(dirnode.digest) + + def diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): + dir_a = remote_execution_pb2.Directory() + dir_b = remote_execution_pb2.Directory() + + if tree_a: + with open(self.objpath(tree_a), 'rb') as f: + dir_a.ParseFromString(f.read()) + if tree_b: + with open(self.objpath(tree_b), 'rb') as f: + dir_b.ParseFromString(f.read()) + + a = 0 + b = 0 + while a < len(dir_a.files) or b < len(dir_b.files): + if b < len(dir_b.files) and (a >= len(dir_a.files) or + dir_a.files[a].name > dir_b.files[b].name): + added.append(os.path.join(path, dir_b.files[b].name)) + b += 1 + elif a < len(dir_a.files) and (b >= len(dir_b.files) or + dir_b.files[b].name > dir_a.files[a].name): + removed.append(os.path.join(path, dir_a.files[a].name)) + a += 1 + else: + # File exists in both directories + if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash: + modified.append(os.path.join(path, dir_a.files[a].name)) + a += 1 + b += 1 + + a = 0 + b = 0 + while a < len(dir_a.directories) or b < len(dir_b.directories): + if b < len(dir_b.directories) and (a >= len(dir_a.directories) or + dir_a.directories[a].name > dir_b.directories[b].name): + self.diff_trees(None, dir_b.directories[b].digest, + added=added, removed=removed, modified=modified, + path=os.path.join(path, dir_b.directories[b].name)) + b += 1 + elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or + dir_b.directories[b].name > dir_a.directories[a].name): + self.diff_trees(dir_a.directories[a].digest, None, + added=added, removed=removed, modified=modified, + path=os.path.join(path, dir_a.directories[a].name)) + a += 1 + else: + # Subdirectory exists in both directories + if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash: + self.diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest, + added=added, removed=removed, modified=modified, + path=os.path.join(path, dir_a.directories[a].name)) + a += 1 + b += 1 + + ################################################ + # Local Private Methods # + ################################################ + + def _refpath(self, ref): + return os.path.join(self.casdir, 'refs', 'heads', ref) + + # _remove_ref() + # + # Removes a ref. + # + # This also takes care of pruning away directories which can + # be removed after having removed the given ref. + # + # Args: + # ref (str): The ref to remove + # basedir (str): Path of base directory the ref is in + # + # Raises: + # (CASCacheError): If the ref didnt exist, or a system error + # occurred while removing it + # + def _remove_ref(self, ref, basedir): + + # Remove the ref itself + refpath = os.path.join(basedir, ref) + + try: + os.unlink(refpath) + except FileNotFoundError as e: + raise CASCacheError("Could not find ref '{}'".format(ref)) from e + + # Now remove any leading directories + + components = list(os.path.split(ref)) + while components: + components.pop() + refdir = os.path.join(basedir, *components) + + # Break out once we reach the base + if refdir == basedir: + break + + try: + os.rmdir(refdir) + except FileNotFoundError: + # The parent directory did not exist, but it's + # parent directory might still be ready to prune + pass + except OSError as e: + if e.errno == errno.ENOTEMPTY: + # The parent directory was not empty, so we + # cannot prune directories beyond this point + break + + # Something went wrong here + raise CASCacheError("System error while removing ref '{}': {}".format(ref, e)) from e + + # _commit_directory(): + # + # Adds local directory to content addressable store. + # + # Adds files, symbolic links and recursively other directories in + # a local directory to the content addressable store. + # + # Args: + # path (str): Path to the directory to add. + # dir_digest (Digest): An optional Digest object to use. + # + # Returns: + # (Digest): Digest object for the directory added. + # + def _commit_directory(self, path, *, dir_digest=None): + directory = remote_execution_pb2.Directory() + + for name in sorted(os.listdir(path)): + full_path = os.path.join(path, name) + mode = os.lstat(full_path).st_mode + if stat.S_ISDIR(mode): + dirnode = directory.directories.add() + dirnode.name = name + self._commit_directory(full_path, dir_digest=dirnode.digest) + elif stat.S_ISREG(mode): + filenode = directory.files.add() + filenode.name = name + self.add_object(path=full_path, digest=filenode.digest) + filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR + elif stat.S_ISLNK(mode): + symlinknode = directory.symlinks.add() + symlinknode.name = name + symlinknode.target = os.readlink(full_path) + elif stat.S_ISSOCK(mode): + # The process serving the socket can't be cached anyway + pass + else: + raise CASCacheError("Unsupported file type for {}".format(full_path)) + + return self.add_object(digest=dir_digest, + buffer=directory.SerializeToString()) + + def _get_subdir(self, tree, subdir): + head, name = os.path.split(subdir) + if head: + tree = self._get_subdir(tree, head) + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(tree), 'rb') as f: + directory.ParseFromString(f.read()) + + for dirnode in directory.directories: + if dirnode.name == name: + return dirnode.digest + + raise CASCacheError("Subdirectory {} not found".format(name)) + + def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False): + if tree.hash in reachable: + return + try: + if update_mtime: + os.utime(self.objpath(tree)) + + reachable.add(tree.hash) + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(tree), 'rb') as f: + directory.ParseFromString(f.read()) + + except FileNotFoundError: + # Just exit early if the file doesn't exist + return + + for filenode in directory.files: + if update_mtime: + os.utime(self.objpath(filenode.digest)) + if check_exists: + if not os.path.exists(self.objpath(filenode.digest)): + raise FileNotFoundError + reachable.add(filenode.digest.hash) + + for dirnode in directory.directories: + self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists) + + # _temporary_object(): + # + # Returns: + # (file): A file object to a named temporary file. + # + # Create a named temporary file with 0o0644 access rights. + @contextlib.contextmanager + def _temporary_object(self): + with utils._tempnamedfile(dir=self.tmpdir) as f: + os.chmod(f.name, + stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) + yield f + + # _ensure_blob(): + # + # Fetch and add blob if it's not already local. + # + # Args: + # remote (Remote): The remote to use. + # digest (Digest): Digest object for the blob to fetch. + # + # Returns: + # (str): The path of the object + # + def _ensure_blob(self, remote, digest): + objpath = self.objpath(digest) + if os.path.exists(objpath): + # already in local repository + return objpath + + with self._temporary_object() as f: + remote._fetch_blob(digest, f) + + added_digest = self.add_object(path=f.name, link_directly=True) + assert added_digest.hash == digest.hash + + return objpath + + def _batch_download_complete(self, batch, *, missing_blobs=None): + for digest, data in batch.send(missing_blobs=missing_blobs): + with self._temporary_object() as f: + f.write(data) + f.flush() + + added_digest = self.add_object(path=f.name, link_directly=True) + assert added_digest.hash == digest.hash + + # Helper function for _fetch_directory(). + def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue): + self._batch_download_complete(batch) + + # All previously scheduled directories are now locally available, + # move them to the processing queue. + fetch_queue.extend(fetch_next_queue) + fetch_next_queue.clear() + return _CASBatchRead(remote) + + # Helper function for _fetch_directory(). + def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False): + in_local_cache = os.path.exists(self.objpath(digest)) + + if in_local_cache: + # Skip download, already in local cache. + pass + elif (digest.size_bytes >= remote.max_batch_total_size_bytes or + not remote.batch_read_supported): + # Too large for batch request, download in independent request. + self._ensure_blob(remote, digest) + in_local_cache = True + else: + if not batch.add(digest): + # Not enough space left in batch request. + # Complete pending batch first. + batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) + batch.add(digest) + + if recursive: + if in_local_cache: + # Add directory to processing queue. + fetch_queue.append(digest) + else: + # Directory will be available after completing pending batch. + # Add directory to deferred processing queue. + fetch_next_queue.append(digest) + + return batch + + # _fetch_directory(): + # + # Fetches remote directory and adds it to content addressable store. + # + # This recursively fetches directory objects but doesn't fetch any + # files. + # + # Args: + # remote (Remote): The remote to use. + # dir_digest (Digest): Digest object for the directory to fetch. + # + def _fetch_directory(self, remote, dir_digest): + # TODO Use GetTree() if the server supports it + + fetch_queue = [dir_digest] + fetch_next_queue = [] + batch = _CASBatchRead(remote) + + while len(fetch_queue) + len(fetch_next_queue) > 0: + if not fetch_queue: + batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) + + dir_digest = fetch_queue.pop(0) + + objpath = self._ensure_blob(remote, dir_digest) + + directory = remote_execution_pb2.Directory() + with open(objpath, 'rb') as f: + directory.ParseFromString(f.read()) + + for dirnode in directory.directories: + batch = self._fetch_directory_node(remote, dirnode.digest, batch, + fetch_queue, fetch_next_queue, recursive=True) + + # Fetch final batch + self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) + + def _fetch_tree(self, remote, digest): + # download but do not store the Tree object + with utils._tempnamedfile(dir=self.tmpdir) as out: + remote._fetch_blob(digest, out) + + tree = remote_execution_pb2.Tree() + + with open(out.name, 'rb') as f: + tree.ParseFromString(f.read()) + + tree.children.extend([tree.root]) + for directory in tree.children: + dirbuffer = directory.SerializeToString() + dirdigest = self.add_object(buffer=dirbuffer) + assert dirdigest.size_bytes == len(dirbuffer) + + return dirdigest + + # fetch_blobs(): + # + # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched. + # + # Args: + # remote (CASRemote): The remote repository to fetch from + # digests (list): The Digests of blobs to fetch + # + # Returns: The Digests of the blobs that were not available on the remote CAS + # + def fetch_blobs(self, remote, digests): + missing_blobs = [] + + batch = _CASBatchRead(remote) + + for digest in digests: + if (digest.size_bytes >= remote.max_batch_total_size_bytes or + not remote.batch_read_supported): + # Too large for batch request, download in independent request. + try: + self._ensure_blob(remote, digest) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.NOT_FOUND: + missing_blobs.append(digest) + else: + raise CASCacheError("Failed to fetch blob: {}".format(e)) from e + else: + if not batch.add(digest): + # Not enough space left in batch request. + # Complete pending batch first. + self._batch_download_complete(batch, missing_blobs=missing_blobs) + + batch = _CASBatchRead(remote) + batch.add(digest) + + # Complete last pending batch + self._batch_download_complete(batch, missing_blobs=missing_blobs) + + return missing_blobs + + # send_blobs(): + # + # Upload blobs to remote CAS. + # + # Args: + # remote (CASRemote): The remote repository to upload to + # digests (list): The Digests of Blobs to upload + # + def send_blobs(self, remote, digests, u_uid=uuid.uuid4()): + batch = _CASBatchUpdate(remote) + + for digest in digests: + with open(self.objpath(digest), 'rb') as f: + assert os.fstat(f.fileno()).st_size == digest.size_bytes + + if (digest.size_bytes >= remote.max_batch_total_size_bytes or + not remote.batch_update_supported): + # Too large for batch request, upload in independent request. + remote._send_blob(digest, f, u_uid=u_uid) + else: + if not batch.add(digest, f): + # Not enough space left in batch request. + # Complete pending batch first. + batch.send() + batch = _CASBatchUpdate(remote) + batch.add(digest, f) + + # Send final batch + batch.send() + + def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): + missing_blobs = self.remote_missing_blobs_for_directory(remote, digest) + + # Upload any blobs missing on the server + self.send_blobs(remote, missing_blobs, u_uid) + + +class CASQuota: + def __init__(self, context): + self.context = context + self.cas = context.get_cascache() + self.casdir = self.cas.casdir + self._config_cache_quota = context.config_cache_quota + self._config_cache_quota_string = context.config_cache_quota_string + self._cache_size = None # The current cache size, sometimes it's an estimate + self._cache_quota = None # The cache quota + self._cache_quota_original = None # The cache quota as specified by the user, in bytes + self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk + self._cache_lower_threshold = None # The target cache size for a cleanup + self.available_space = None + + self._message = context.message + + self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method + self._list_refs_callbacks = [] # Callbacks to all refs + + self._calculate_cache_quota() + + # compute_cache_size() + # + # Computes the real artifact cache size. + # + # Returns: + # (int): The size of the artifact cache. + # + def compute_cache_size(self): + self._cache_size = utils._get_dir_size(self.casdir) + return self._cache_size + + # get_cache_size() + # + # Fetches the cached size of the cache, this is sometimes + # an estimate and periodically adjusted to the real size + # when a cache size calculation job runs. + # + # When it is an estimate, the value is either correct, or + # it is greater than the actual cache size. + # + # Returns: + # (int) An approximation of the artifact cache size, in bytes. + # + def get_cache_size(self): + + # If we don't currently have an estimate, figure out the real cache size. + if self._cache_size is None: + stored_size = self._read_cache_size() + if stored_size is not None: + self._cache_size = stored_size + else: + self.compute_cache_size() + + return self._cache_size + + # set_cache_size() + # + # Forcefully set the overall cache size. + # + # This is used to update the size in the main process after + # having calculated in a cleanup or a cache size calculation job. + # + # Args: + # cache_size (int): The size to set. + # write_to_disk (bool): Whether to write the value to disk. + # + def set_cache_size(self, cache_size, *, write_to_disk=True): + + assert cache_size is not None + + self._cache_size = cache_size + if write_to_disk: + self._write_cache_size(self._cache_size) + + # full() + # + # Checks if the artifact cache is full, either + # because the user configured quota has been exceeded + # or because the underlying disk is almost full. + # + # Returns: + # (bool): True if the artifact cache is full + # + def full(self): + + if self.get_cache_size() > self._cache_quota: + return True + + _, volume_avail = self._get_cache_volume_size() + if volume_avail < self._cache_quota_headroom: + return True + + return False + + # add_remove_callbacks() + # + # This adds tuples of iterators over unrequired objects (currently + # artifacts and source refs), and a callback to remove them. + # + # Args: + # callback (iter(unrequired), remove): tuple of iterator and remove + # method associated. + # + def add_remove_callbacks(self, list_unrequired, remove_method): + self._remove_callbacks.append((list_unrequired, remove_method)) + + def add_list_refs_callback(self, list_callback): + self._list_refs_callbacks.append(list_callback) + + ################################################ + # Local Private Methods # + ################################################ + + # _read_cache_size() + # + # Reads and returns the size of the artifact cache that's stored in the + # cache's size file + # + # Returns: + # (int): The size of the artifact cache, as recorded in the file + # + def _read_cache_size(self): + size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE) + + if not os.path.exists(size_file_path): + return None + + with open(size_file_path, "r") as f: + size = f.read() + + try: + num_size = int(size) + except ValueError as e: + raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format( + size, size_file_path)) from e + + return num_size + + # _write_cache_size() + # + # Writes the given size of the artifact to the cache's size file + # + # Args: + # size (int): The size of the artifact cache to record + # + def _write_cache_size(self, size): + assert isinstance(size, int) + size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE) + with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f: + f.write(str(size)) + + # _get_cache_volume_size() + # + # Get the available space and total space for the volume on + # which the artifact cache is located. + # + # Returns: + # (int): The total number of bytes on the volume + # (int): The number of available bytes on the volume + # + # NOTE: We use this stub to allow the test cases + # to override what an artifact cache thinks + # about it's disk size and available bytes. + # + def _get_cache_volume_size(self): + return utils._get_volume_size(self.casdir) + + # _calculate_cache_quota() + # + # Calculates and sets the cache quota and lower threshold based on the + # quota set in Context. + # It checks that the quota is both a valid expression, and that there is + # enough disk space to satisfy that quota + # + def _calculate_cache_quota(self): + # Headroom intended to give BuildStream a bit of leeway. + # This acts as the minimum size of cache_quota and also + # is taken from the user requested cache_quota. + # + if 'BST_TEST_SUITE' in os.environ: + self._cache_quota_headroom = 0 + else: + self._cache_quota_headroom = 2e9 + + total_size, available_space = self._get_cache_volume_size() + cache_size = self.get_cache_size() + self.available_space = available_space + + # Ensure system has enough storage for the cache_quota + # + # If cache_quota is none, set it to the maximum it could possibly be. + # + # Also check that cache_quota is at least as large as our headroom. + # + cache_quota = self._config_cache_quota + if cache_quota is None: + # The user has set no limit, so we may take all the space. + cache_quota = min(cache_size + available_space, total_size) + if cache_quota < self._cache_quota_headroom: # Check minimum + raise LoadError( + LoadErrorReason.INVALID_DATA, + "Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}.".format( + utils._pretty_size(cache_quota), + utils._pretty_size(self._cache_quota_headroom))) + elif cache_quota > total_size: + # A quota greater than the total disk size is certianly an error + raise CASCacheError("Your system does not have enough available " + + "space to support the cache quota specified.", + detail=("You have specified a quota of {quota} total disk space.\n" + + "The filesystem containing {local_cache_path} only " + + "has {total_size} total disk space.") + .format( + quota=self._config_cache_quota, + local_cache_path=self.casdir, + total_size=utils._pretty_size(total_size)), + reason='insufficient-storage-for-quota') + + elif cache_quota > cache_size + available_space: + # The quota does not fit in the available space, this is a warning + if '%' in self._config_cache_quota_string: + available = (available_space / total_size) * 100 + available = '{}% of total disk space'.format(round(available, 1)) + else: + available = utils._pretty_size(available_space) + + self._message(Message( + None, + MessageType.WARN, + "Your system does not have enough available " + + "space to support the cache quota specified.", + detail=("You have specified a quota of {quota} total disk space.\n" + + "The filesystem containing {local_cache_path} only " + + "has {available_size} available.") + .format(quota=self._config_cache_quota, + local_cache_path=self.casdir, + available_size=available))) + + # Place a slight headroom (2e9 (2GB) on the cache_quota) into + # cache_quota to try and avoid exceptions. + # + # Of course, we might still end up running out during a build + # if we end up writing more than 2G, but hey, this stuff is + # already really fuzzy. + # + self._cache_quota_original = cache_quota + self._cache_quota = cache_quota - self._cache_quota_headroom + self._cache_lower_threshold = self._cache_quota / 2 + + # clean(): + # + # Clean the artifact cache as much as possible. + # + # Args: + # progress (callable): A callback to call when a ref is removed + # + # Returns: + # (int): The size of the cache after having cleaned up + # + def clean(self, progress=None): + context = self.context + + # Some accumulative statistics + removed_ref_count = 0 + space_saved = 0 + + total_refs = 0 + for refs in self._list_refs_callbacks: + total_refs += len(list(refs())) + + # Start off with an announcement with as much info as possible + volume_size, volume_avail = self._get_cache_volume_size() + self._message(Message( + None, MessageType.STATUS, "Starting cache cleanup", + detail=("Elements required by the current build plan:\n" + "{}\n" + + "User specified quota: {} ({})\n" + + "Cache usage: {}\n" + + "Cache volume: {} total, {} available") + .format( + total_refs, + context.config_cache_quota, + utils._pretty_size(self._cache_quota, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + utils._pretty_size(volume_size, dec_places=2), + utils._pretty_size(volume_avail, dec_places=2)))) + + # Do a real computation of the cache size once, just in case + self.compute_cache_size() + usage = CASCacheUsage(self) + self._message(Message(None, MessageType.STATUS, + "Cache usage recomputed: {}".format(usage))) + + # Collect digests and their remove method + all_unrequired_refs = [] + for (unrequired_refs, remove) in self._remove_callbacks: + for (mtime, ref) in unrequired_refs(): + all_unrequired_refs.append((mtime, ref, remove)) + + # Pair refs and their remove method sorted in time order + all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)] + + # Go through unrequired refs and remove them, oldest first + made_space = False + for (ref, remove) in all_unrequired_refs: + size = remove(ref) + removed_ref_count += 1 + space_saved += size + + self._message(Message( + None, MessageType.STATUS, + "Freed {: <7} {}".format( + utils._pretty_size(size, dec_places=2), + ref))) + + self.set_cache_size(self._cache_size - size) + + # User callback + # + # Currently this process is fairly slow, but we should + # think about throttling this progress() callback if this + # becomes too intense. + if progress: + progress() + + if self.get_cache_size() < self._cache_lower_threshold: + made_space = True + break + + if not made_space and self.full(): + # If too many artifacts are required, and we therefore + # can't remove them, we have to abort the build. + # + # FIXME: Asking the user what to do may be neater + # + default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], + 'buildstream.conf') + detail = ("Aborted after removing {} refs and saving {} disk space.\n" + "The remaining {} in the cache is required by the {} references in your build plan\n\n" + "There is not enough space to complete the build.\n" + "Please increase the cache-quota in {} and/or make more disk space." + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + total_refs, + (context.config_origin or default_conf))) + + raise CASCacheError("Cache too full. Aborting.", + detail=detail, + reason="cache-too-full") + + # Informational message about the side effects of the cleanup + self._message(Message( + None, MessageType.INFO, "Cleanup completed", + detail=("Removed {} refs and saving {} disk space.\n" + + "Cache usage is now: {}") + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2)))) + + return self.get_cache_size() + + +def _grouper(iterable, n): + while True: + try: + current = next(iterable) + except StopIteration: + return + yield itertools.chain([current], itertools.islice(iterable, n - 1)) |