Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r--  src/buildstream/_cas/cascache.py  1462
1 file changed, 1462 insertions, 0 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
new file mode 100644
index 000000000..ad8013d18
--- /dev/null
+++ b/src/buildstream/_cas/cascache.py
@@ -0,0 +1,1462 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+import hashlib
+import itertools
+import os
+import stat
+import errno
+import uuid
+import contextlib
+
+import grpc
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .._protos.buildstream.v2 import buildstream_pb2
+
+from .. import utils
+from .._exceptions import CASCacheError, LoadError, LoadErrorReason
+from .._message import Message, MessageType
+
+from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
+
+_BUFFER_SIZE = 65536
+
+
+CACHE_SIZE_FILE = "cache_size"
+
+
+# CASCacheUsage
+#
+# A simple object to report the current CAS cache usage details.
+#
+# Note that this uses the user configured cache quota
+# rather than the internal quota with protective headroom
+# removed, to provide a more sensible value to display to
+# the user.
+#
+# Args:
+#    casquota (CASQuota): The CASQuota object to get usage details from
+#
+class CASCacheUsage():
+
+ def __init__(self, casquota):
+ self.quota_config = casquota._config_cache_quota # Configured quota
+ self.quota_size = casquota._cache_quota_original # Resolved cache quota in bytes
+ self.used_size = casquota.get_cache_size() # Size used by artifacts in bytes
+ self.used_percent = 0 # Percentage of the quota used
+ if self.quota_size is not None:
+ self.used_percent = int(self.used_size * 100 / self.quota_size)
+
+ # Formattable into a human readable string
+ #
+ def __str__(self):
+ return "{} / {} ({}%)" \
+ .format(utils._pretty_size(self.used_size, dec_places=1),
+ self.quota_config,
+ self.used_percent)
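+
+    # Example (an illustrative sketch; assumes `casquota` is an
+    # initialized CASQuota instance):
+    #
+    #   usage = CASCacheUsage(casquota)
+    #   print(usage)   # e.g. "1.2G / 10G (12%)"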
+
+
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
+#
+# Args:
+# path (str): The root directory for the CAS repository
+#
+class CASCache():
+
+ def __init__(self, path):
+ self.casdir = os.path.join(path, 'cas')
+ self.tmpdir = os.path.join(path, 'tmp')
+ os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
+ os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
+ os.makedirs(self.tmpdir, exist_ok=True)
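+
+        # The resulting on-disk layout, relative to `path`:
+        #
+        #   cas/objects/      # content addressed objects, sharded by hash prefix
+        #   cas/refs/heads/   # named refs storing serialized directory digests
+        #   tmp/              # staging area used when creating objects and refs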
+
+ self.__reachable_directory_callbacks = []
+ self.__reachable_digest_callbacks = []
+
+ # preflight():
+ #
+ # Preflight check.
+ #
+ def preflight(self):
+ headdir = os.path.join(self.casdir, 'refs', 'heads')
+ objdir = os.path.join(self.casdir, 'objects')
+ if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
+ raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
+
+ # contains():
+ #
+ # Check whether the specified ref is already available in the local CAS cache.
+ #
+ # Args:
+ # ref (str): The ref to check
+ #
+ # Returns: True if the ref is in the cache, False otherwise
+ #
+ def contains(self, ref):
+ refpath = self._refpath(ref)
+
+ # This assumes that the repository doesn't have any dangling pointers
+ return os.path.exists(refpath)
+
+ # contains_directory():
+ #
+    # Check whether the specified directory and its subdirectories are in the
+    # cache, i.e. non-dangling.
+ #
+ # Args:
+ # digest (Digest): The directory digest to check
+ # with_files (bool): Whether to check files as well
+ #
+ # Returns: True if the directory is available in the local cache
+ #
+ def contains_directory(self, digest, *, with_files):
+ try:
+ directory = remote_execution_pb2.Directory()
+ with open(self.objpath(digest), 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ # Optionally check presence of files
+ if with_files:
+ for filenode in directory.files:
+ if not os.path.exists(self.objpath(filenode.digest)):
+ return False
+
+ # Check subdirectories
+ for dirnode in directory.directories:
+ if not self.contains_directory(dirnode.digest, with_files=with_files):
+ return False
+
+ return True
+ except FileNotFoundError:
+ return False
+
+ # checkout():
+ #
+ # Checkout the specified directory digest.
+ #
+ # Args:
+ # dest (str): The destination path
+ # tree (Digest): The directory digest to extract
+ # can_link (bool): Whether we can create hard links in the destination
+ #
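+    # Example (an illustrative sketch; the ref name is hypothetical):
+    #
+    #   digest = cas.resolve_ref('project/element/cachekey')
+    #   cas.checkout('/path/to/staging', digest, can_link=True)
+    #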
+ def checkout(self, dest, tree, *, can_link=False):
+ os.makedirs(dest, exist_ok=True)
+
+ directory = remote_execution_pb2.Directory()
+
+ with open(self.objpath(tree), 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ for filenode in directory.files:
+ # regular file, create hardlink
+ fullpath = os.path.join(dest, filenode.name)
+ if can_link:
+ utils.safe_link(self.objpath(filenode.digest), fullpath)
+ else:
+ utils.safe_copy(self.objpath(filenode.digest), fullpath)
+
+ if filenode.is_executable:
+ os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
+ stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+
+ for dirnode in directory.directories:
+ fullpath = os.path.join(dest, dirnode.name)
+ self.checkout(fullpath, dirnode.digest, can_link=can_link)
+
+ for symlinknode in directory.symlinks:
+ # symlink
+ fullpath = os.path.join(dest, symlinknode.name)
+ os.symlink(symlinknode.target, fullpath)
+
+ # commit():
+ #
+ # Commit directory to cache.
+ #
+ # Args:
+ # refs (list): The refs to set
+ # path (str): The directory to import
+ #
+ def commit(self, refs, path):
+ tree = self._commit_directory(path)
+
+ for ref in refs:
+ self.set_ref(ref, tree)
+
+ # diff():
+ #
+ # Return a list of files that have been added or modified between
+ # the refs described by ref_a and ref_b.
+ #
+ # Args:
+ # ref_a (str): The first ref
+ # ref_b (str): The second ref
+    #
+    # Returns:
+    #    (tuple): Lists of modified, removed and added files
+    #
+ def diff(self, ref_a, ref_b):
+ tree_a = self.resolve_ref(ref_a)
+ tree_b = self.resolve_ref(ref_b)
+
+ added = []
+ removed = []
+ modified = []
+
+ self.diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
+
+ return modified, removed, added
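+
+    # Example (an illustrative sketch; ref names are hypothetical):
+    #
+    #   cas.commit(['snapshot/before'], '/path/to/dir')
+    #   # ... dir is modified ...
+    #   cas.commit(['snapshot/after'], '/path/to/dir')
+    #   modified, removed, added = cas.diff('snapshot/before', 'snapshot/after')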
+
+ # pull():
+ #
+ # Pull a ref from a remote repository.
+ #
+ # Args:
+ # ref (str): The ref to pull
+ # remote (CASRemote): The remote repository to pull from
+ #
+ # Returns:
+ # (bool): True if pull was successful, False if ref was not available
+ #
+ def pull(self, ref, remote):
+ try:
+ remote.init()
+
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
+ request.key = ref
+ response = remote.ref_storage.GetReference(request)
+
+ tree = response.digest
+
+ # Fetch Directory objects
+ self._fetch_directory(remote, tree)
+
+ # Fetch files, excluded_subdirs determined in pullqueue
+ required_blobs = self.required_blobs_for_directory(tree)
+ missing_blobs = self.local_missing_blobs(required_blobs)
+ if missing_blobs:
+ self.fetch_blobs(remote, missing_blobs)
+
+ self.set_ref(ref, tree)
+
+ return True
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise CASCacheError("Failed to pull ref {}: {}".format(ref, e)) from e
+ else:
+ return False
+        except BlobNotFound:
+            return False
+
+ # pull_tree():
+ #
+ # Pull a single Tree rather than a ref.
+ # Does not update local refs.
+ #
+ # Args:
+ # remote (CASRemote): The remote to pull from
+ # digest (Digest): The digest of the tree
+ #
+ def pull_tree(self, remote, digest):
+ try:
+ remote.init()
+
+ digest = self._fetch_tree(remote, digest)
+
+ return digest
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise
+
+ return None
+
+ # link_ref():
+ #
+ # Add an alias for an existing ref.
+ #
+ # Args:
+ # oldref (str): An existing ref
+ # newref (str): A new ref for the same directory
+ #
+ def link_ref(self, oldref, newref):
+ tree = self.resolve_ref(oldref)
+
+ self.set_ref(newref, tree)
+
+ # push():
+ #
+ # Push committed refs to remote repository.
+ #
+ # Args:
+ # refs (list): The refs to push
+ # remote (CASRemote): The remote to push to
+ #
+ # Returns:
+    #   (bool): True if the remote was updated, False if no pushes were required
+ #
+ # Raises:
+ # (CASCacheError): if there was an error
+ #
+ def push(self, refs, remote):
+ skipped_remote = True
+ try:
+ for ref in refs:
+ tree = self.resolve_ref(ref)
+
+ # Check whether ref is already on the server in which case
+ # there is no need to push the ref
+ try:
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
+ request.key = ref
+ response = remote.ref_storage.GetReference(request)
+
+ if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+ # ref is already on the server with the same tree
+ continue
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ # Intentionally re-raise RpcError for outer except block.
+ raise
+
+ self._send_directory(remote, tree)
+
+ request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
+ request.keys.append(ref)
+ request.digest.CopyFrom(tree)
+ remote.ref_storage.UpdateReference(request)
+
+ skipped_remote = False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
+
+ return not skipped_remote
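+
+    # Example (an illustrative sketch; assumes `remote` is an initialized
+    # CASRemote and the ref exists locally):
+    #
+    #   if cas.push(['project/element/cachekey'], remote):
+    #       print("Remote was updated")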
+
+ # objpath():
+ #
+ # Return the path of an object based on its digest.
+ #
+ # Args:
+ # digest (Digest): The digest of the object
+ #
+ # Returns:
+ # (str): The path of the object
+ #
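+    # For example, a digest with hash 'abcd1234...' maps to
+    # '<casdir>/objects/ab/cd1234...', sharding objects by the first
+    # two hex characters of the hash.
+    #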
+ def objpath(self, digest):
+ return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
+
+ # add_object():
+ #
+ # Hash and write object to CAS.
+ #
+ # Args:
+ # digest (Digest): An optional Digest object to populate
+ # path (str): Path to file to add
+ # buffer (bytes): Byte buffer to add
+ # link_directly (bool): Whether file given by path can be linked
+ #
+ # Returns:
+ # (Digest): The digest of the added object
+ #
+ # Either `path` or `buffer` must be passed, but not both.
+ #
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
+ # Exactly one of the two parameters has to be specified
+ assert (path is None) != (buffer is None)
+
+ if digest is None:
+ digest = remote_execution_pb2.Digest()
+
+ try:
+ h = hashlib.sha256()
+            # Unless the file can be linked directly, write out a new file
+            # to avoid corruption if the input file is modified concurrently
+ with contextlib.ExitStack() as stack:
+ if path is not None and link_directly:
+ tmp = stack.enter_context(open(path, 'rb'))
+ for chunk in iter(lambda: tmp.read(_BUFFER_SIZE), b""):
+ h.update(chunk)
+ else:
+ tmp = stack.enter_context(self._temporary_object())
+
+ if path:
+ with open(path, 'rb') as f:
+ for chunk in iter(lambda: f.read(_BUFFER_SIZE), b""):
+ h.update(chunk)
+ tmp.write(chunk)
+ else:
+ h.update(buffer)
+ tmp.write(buffer)
+
+ tmp.flush()
+
+ digest.hash = h.hexdigest()
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
+
+ # Place file at final location
+ objpath = self.objpath(digest)
+ os.makedirs(os.path.dirname(objpath), exist_ok=True)
+ os.link(tmp.name, objpath)
+
+        except FileExistsError:
+ # We can ignore the failed link() if the object is already in the repo.
+ pass
+
+ except OSError as e:
+ raise CASCacheError("Failed to hash object: {}".format(e)) from e
+
+ return digest
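+
+    # Example (an illustrative sketch; the path is hypothetical):
+    #
+    #   digest = cas.add_object(path='/path/to/file')
+    #   assert os.path.exists(cas.objpath(digest))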
+
+ # set_ref():
+ #
+ # Create or replace a ref.
+ #
+ # Args:
+    #    ref (str): The name of the ref
+    #    tree (Digest): The digest to store in the ref
+    #
+ def set_ref(self, ref, tree):
+ refpath = self._refpath(ref)
+ os.makedirs(os.path.dirname(refpath), exist_ok=True)
+ with utils.save_file_atomic(refpath, 'wb', tempdir=self.tmpdir) as f:
+ f.write(tree.SerializeToString())
+
+ # resolve_ref():
+ #
+ # Resolve a ref to a digest.
+ #
+ # Args:
+ # ref (str): The name of the ref
+ # update_mtime (bool): Whether to update the mtime of the ref
+ #
+ # Returns:
+ # (Digest): The digest stored in the ref
+ #
+ def resolve_ref(self, ref, *, update_mtime=False):
+ refpath = self._refpath(ref)
+
+ try:
+ with open(refpath, 'rb') as f:
+ if update_mtime:
+ os.utime(refpath)
+
+ digest = remote_execution_pb2.Digest()
+ digest.ParseFromString(f.read())
+ return digest
+
+ except FileNotFoundError as e:
+ raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
+
+ # update_mtime()
+ #
+ # Update the mtime of a ref.
+ #
+ # Args:
+ # ref (str): The ref to update
+ #
+ def update_mtime(self, ref):
+ try:
+ os.utime(self._refpath(ref))
+ except FileNotFoundError as e:
+ raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
+
+ # list_objects():
+ #
+ # List cached objects in Least Recently Modified (LRM) order.
+ #
+ # Returns:
+    #     (list): A list of (mtime, path) tuples in LRM order, oldest first
+ #
+ def list_objects(self):
+ objs = []
+ mtimes = []
+
+ for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+ for filename in files:
+ obj_path = os.path.join(root, filename)
+ try:
+ mtimes.append(os.path.getmtime(obj_path))
+ except FileNotFoundError:
+ pass
+ else:
+ objs.append(obj_path)
+
+ # NOTE: Sorted will sort from earliest to latest, thus the
+ # first element of this list will be the file modified earliest.
+ return sorted(zip(mtimes, objs))
+
+    # clean_up_refs_until():
+    #
+    # Remove all refs last modified before the given time.
+    #
+    # Args:
+    #    time (float): The mtime threshold; refs last modified before
+    #                  this time are removed
+    #
+    def clean_up_refs_until(self, time):
+ ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+ for root, _, files in os.walk(ref_heads):
+ for filename in files:
+ ref_path = os.path.join(root, filename)
+ # Obtain the mtime (the time a file was last modified)
+ if os.path.getmtime(ref_path) < time:
+ os.unlink(ref_path)
+
+ # remove():
+ #
+ # Removes the given symbolic ref from the repo.
+ #
+ # Args:
+ # ref (str): A symbolic ref
+ # basedir (str): Path of base directory the ref is in, defaults to
+ # CAS refs heads
+ # defer_prune (bool): Whether to defer pruning to the caller. NOTE:
+ # The space won't be freed until you manually
+ # call prune.
+ #
+ # Returns:
+    #    (int|None): The amount of space pruned from the repository in
+    #                bytes, or None if defer_prune is True
+ #
+ def remove(self, ref, *, basedir=None, defer_prune=False):
+
+ if basedir is None:
+ basedir = os.path.join(self.casdir, 'refs', 'heads')
+ # Remove cache ref
+ self._remove_ref(ref, basedir)
+
+ if not defer_prune:
+ pruned = self.prune()
+ return pruned
+
+ return None
+
+    # Adds a callback which returns an iterator over reachable directory digests
+ def add_reachable_directories_callback(self, callback):
+ self.__reachable_directory_callbacks.append(callback)
+
+    # Adds a callback which returns an iterator over reachable file digests
+ def add_reachable_digests_callback(self, callback):
+ self.__reachable_digest_callbacks.append(callback)
+
+ # prune():
+ #
+ # Prune unreachable objects from the repo.
+ #
+ def prune(self):
+ ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+ pruned = 0
+ reachable = set()
+
+ # Check which objects are reachable
+ for root, _, files in os.walk(ref_heads):
+ for filename in files:
+ ref_path = os.path.join(root, filename)
+ ref = os.path.relpath(ref_path, ref_heads)
+
+ tree = self.resolve_ref(ref)
+ self._reachable_refs_dir(reachable, tree)
+
+ # check callback directory digests that are reachable
+ for digest_callback in self.__reachable_directory_callbacks:
+ for digest in digest_callback():
+ self._reachable_refs_dir(reachable, digest)
+
+ # check callback file digests that are reachable
+ for digest_callback in self.__reachable_digest_callbacks:
+ for digest in digest_callback():
+ reachable.add(digest.hash)
+
+ # Prune unreachable objects
+ for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+ for filename in files:
+ objhash = os.path.basename(root) + filename
+ if objhash not in reachable:
+ obj_path = os.path.join(root, filename)
+ pruned += os.stat(obj_path).st_size
+ os.unlink(obj_path)
+
+ return pruned
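+
+    # Example (an illustrative sketch; the ref name is hypothetical):
+    #
+    #   cas.remove('project/element/oldkey', defer_prune=True)
+    #   # (remove any other unwanted refs here)
+    #   freed_bytes = cas.prune()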
+
+    # update_tree_mtime():
+    #
+    # Update the mtime of all objects reachable from the given directory
+    # digest.
+    #
+    # Args:
+    #    tree (Digest): The digest of the toplevel directory
+    #
+    def update_tree_mtime(self, tree):
+ reachable = set()
+ self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
+ # remote_missing_blobs_for_directory():
+ #
+ # Determine which blobs of a directory tree are missing on the remote.
+ #
+    # Args:
+    #    remote (CASRemote): The remote to check against
+    #    digest (Digest): The directory digest
+ #
+ # Returns: List of missing Digest objects
+ #
+ def remote_missing_blobs_for_directory(self, remote, digest):
+ required_blobs = self.required_blobs_for_directory(digest)
+
+ return self.remote_missing_blobs(remote, required_blobs)
+
+ # remote_missing_blobs():
+ #
+ # Determine which blobs are missing on the remote.
+ #
+    # Args:
+    #    remote (CASRemote): The remote to check against
+    #    blobs (iterable): The Digests of the blobs to check
+ #
+ # Returns: List of missing Digest objects
+ #
+ def remote_missing_blobs(self, remote, blobs):
+ missing_blobs = dict()
+ # Limit size of FindMissingBlobs request
+ for required_blobs_group in _grouper(blobs, 512):
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
+
+ for required_digest in required_blobs_group:
+ d = request.blob_digests.add()
+ d.CopyFrom(required_digest)
+
+ response = remote.cas.FindMissingBlobs(request)
+ for missing_digest in response.missing_blob_digests:
+ d = remote_execution_pb2.Digest()
+ d.CopyFrom(missing_digest)
+ missing_blobs[d.hash] = d
+
+ return missing_blobs.values()
+
+ # local_missing_blobs():
+ #
+ # Check local cache for missing blobs.
+ #
+ # Args:
+ # digests (list): The Digests of blobs to check
+ #
+ # Returns: Missing Digest objects
+ #
+ def local_missing_blobs(self, digests):
+ missing_blobs = []
+ for digest in digests:
+ objpath = self.objpath(digest)
+ if not os.path.exists(objpath):
+ missing_blobs.append(digest)
+ return missing_blobs
+
+ # required_blobs_for_directory():
+ #
+ # Generator that returns the Digests of all blobs in the tree specified by
+ # the Digest of the toplevel Directory object.
+ #
+ def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=None):
+ if not excluded_subdirs:
+ excluded_subdirs = []
+
+ # parse directory, and recursively add blobs
+
+ yield directory_digest
+
+ directory = remote_execution_pb2.Directory()
+
+ with open(self.objpath(directory_digest), 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ for filenode in directory.files:
+ yield filenode.digest
+
+ for dirnode in directory.directories:
+ if dirnode.name not in excluded_subdirs:
+ yield from self.required_blobs_for_directory(dirnode.digest)
+
+ def diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
+ dir_a = remote_execution_pb2.Directory()
+ dir_b = remote_execution_pb2.Directory()
+
+ if tree_a:
+ with open(self.objpath(tree_a), 'rb') as f:
+ dir_a.ParseFromString(f.read())
+ if tree_b:
+ with open(self.objpath(tree_b), 'rb') as f:
+ dir_b.ParseFromString(f.read())
+
+ a = 0
+ b = 0
+ while a < len(dir_a.files) or b < len(dir_b.files):
+ if b < len(dir_b.files) and (a >= len(dir_a.files) or
+ dir_a.files[a].name > dir_b.files[b].name):
+ added.append(os.path.join(path, dir_b.files[b].name))
+ b += 1
+ elif a < len(dir_a.files) and (b >= len(dir_b.files) or
+ dir_b.files[b].name > dir_a.files[a].name):
+ removed.append(os.path.join(path, dir_a.files[a].name))
+ a += 1
+ else:
+ # File exists in both directories
+ if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
+ modified.append(os.path.join(path, dir_a.files[a].name))
+ a += 1
+ b += 1
+
+ a = 0
+ b = 0
+ while a < len(dir_a.directories) or b < len(dir_b.directories):
+ if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
+ dir_a.directories[a].name > dir_b.directories[b].name):
+ self.diff_trees(None, dir_b.directories[b].digest,
+ added=added, removed=removed, modified=modified,
+ path=os.path.join(path, dir_b.directories[b].name))
+ b += 1
+ elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
+ dir_b.directories[b].name > dir_a.directories[a].name):
+ self.diff_trees(dir_a.directories[a].digest, None,
+ added=added, removed=removed, modified=modified,
+ path=os.path.join(path, dir_a.directories[a].name))
+ a += 1
+ else:
+ # Subdirectory exists in both directories
+ if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
+ self.diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
+ added=added, removed=removed, modified=modified,
+ path=os.path.join(path, dir_a.directories[a].name))
+ a += 1
+ b += 1
+
+ ################################################
+ # Local Private Methods #
+ ################################################
+
+ def _refpath(self, ref):
+ return os.path.join(self.casdir, 'refs', 'heads', ref)
+
+ # _remove_ref()
+ #
+ # Removes a ref.
+ #
+ # This also takes care of pruning away directories which can
+ # be removed after having removed the given ref.
+ #
+ # Args:
+ # ref (str): The ref to remove
+ # basedir (str): Path of base directory the ref is in
+ #
+ # Raises:
+    #    (CASCacheError): If the ref didn't exist, or a system error
+ # occurred while removing it
+ #
+ def _remove_ref(self, ref, basedir):
+
+ # Remove the ref itself
+ refpath = os.path.join(basedir, ref)
+
+ try:
+ os.unlink(refpath)
+ except FileNotFoundError as e:
+ raise CASCacheError("Could not find ref '{}'".format(ref)) from e
+
+ # Now remove any leading directories
+
+ components = list(os.path.split(ref))
+ while components:
+ components.pop()
+ refdir = os.path.join(basedir, *components)
+
+ # Break out once we reach the base
+ if refdir == basedir:
+ break
+
+ try:
+ os.rmdir(refdir)
+ except FileNotFoundError:
+                # The parent directory did not exist, but its
+                # parent directory might still be ready to prune
+ pass
+ except OSError as e:
+ if e.errno == errno.ENOTEMPTY:
+ # The parent directory was not empty, so we
+ # cannot prune directories beyond this point
+ break
+
+ # Something went wrong here
+ raise CASCacheError("System error while removing ref '{}': {}".format(ref, e)) from e
+
+ # _commit_directory():
+ #
+ # Adds local directory to content addressable store.
+ #
+ # Adds files, symbolic links and recursively other directories in
+ # a local directory to the content addressable store.
+ #
+ # Args:
+ # path (str): Path to the directory to add.
+ # dir_digest (Digest): An optional Digest object to use.
+ #
+ # Returns:
+ # (Digest): Digest object for the directory added.
+ #
+ def _commit_directory(self, path, *, dir_digest=None):
+ directory = remote_execution_pb2.Directory()
+
+ for name in sorted(os.listdir(path)):
+ full_path = os.path.join(path, name)
+ mode = os.lstat(full_path).st_mode
+ if stat.S_ISDIR(mode):
+ dirnode = directory.directories.add()
+ dirnode.name = name
+ self._commit_directory(full_path, dir_digest=dirnode.digest)
+ elif stat.S_ISREG(mode):
+ filenode = directory.files.add()
+ filenode.name = name
+ self.add_object(path=full_path, digest=filenode.digest)
+ filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
+ elif stat.S_ISLNK(mode):
+ symlinknode = directory.symlinks.add()
+ symlinknode.name = name
+ symlinknode.target = os.readlink(full_path)
+ elif stat.S_ISSOCK(mode):
+ # The process serving the socket can't be cached anyway
+ pass
+ else:
+ raise CASCacheError("Unsupported file type for {}".format(full_path))
+
+ return self.add_object(digest=dir_digest,
+ buffer=directory.SerializeToString())
+
+ def _get_subdir(self, tree, subdir):
+ head, name = os.path.split(subdir)
+ if head:
+ tree = self._get_subdir(tree, head)
+
+ directory = remote_execution_pb2.Directory()
+
+ with open(self.objpath(tree), 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ for dirnode in directory.directories:
+ if dirnode.name == name:
+ return dirnode.digest
+
+ raise CASCacheError("Subdirectory {} not found".format(name))
+
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
+ if tree.hash in reachable:
+ return
+ try:
+ if update_mtime:
+ os.utime(self.objpath(tree))
+
+ reachable.add(tree.hash)
+
+ directory = remote_execution_pb2.Directory()
+
+ with open(self.objpath(tree), 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ except FileNotFoundError:
+ # Just exit early if the file doesn't exist
+ return
+
+ for filenode in directory.files:
+ if update_mtime:
+ os.utime(self.objpath(filenode.digest))
+ if check_exists:
+ if not os.path.exists(self.objpath(filenode.digest)):
+ raise FileNotFoundError
+ reachable.add(filenode.digest.hash)
+
+ for dirnode in directory.directories:
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime, check_exists=check_exists)
+
+    # _temporary_object():
+    #
+    # Create a named temporary file with 0o644 access rights.
+    #
+    # Returns:
+    #     (file): A file object to a named temporary file.
+    #
+ @contextlib.contextmanager
+ def _temporary_object(self):
+ with utils._tempnamedfile(dir=self.tmpdir) as f:
+ os.chmod(f.name,
+ stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
+ yield f
+
+ # _ensure_blob():
+ #
+ # Fetch and add blob if it's not already local.
+ #
+ # Args:
+ # remote (Remote): The remote to use.
+ # digest (Digest): Digest object for the blob to fetch.
+ #
+ # Returns:
+ # (str): The path of the object
+ #
+ def _ensure_blob(self, remote, digest):
+ objpath = self.objpath(digest)
+ if os.path.exists(objpath):
+ # already in local repository
+ return objpath
+
+ with self._temporary_object() as f:
+ remote._fetch_blob(digest, f)
+
+ added_digest = self.add_object(path=f.name, link_directly=True)
+ assert added_digest.hash == digest.hash
+
+ return objpath
+
+    # Helper function completing a pending batch download and adding the
+    # fetched blobs to the local repository.
+    def _batch_download_complete(self, batch, *, missing_blobs=None):
+ for digest, data in batch.send(missing_blobs=missing_blobs):
+ with self._temporary_object() as f:
+ f.write(data)
+ f.flush()
+
+ added_digest = self.add_object(path=f.name, link_directly=True)
+ assert added_digest.hash == digest.hash
+
+ # Helper function for _fetch_directory().
+ def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
+ self._batch_download_complete(batch)
+
+ # All previously scheduled directories are now locally available,
+ # move them to the processing queue.
+ fetch_queue.extend(fetch_next_queue)
+ fetch_next_queue.clear()
+ return _CASBatchRead(remote)
+
+ # Helper function for _fetch_directory().
+ def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
+ in_local_cache = os.path.exists(self.objpath(digest))
+
+ if in_local_cache:
+ # Skip download, already in local cache.
+ pass
+ elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
+ not remote.batch_read_supported):
+ # Too large for batch request, download in independent request.
+ self._ensure_blob(remote, digest)
+ in_local_cache = True
+ else:
+ if not batch.add(digest):
+ # Not enough space left in batch request.
+ # Complete pending batch first.
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+ batch.add(digest)
+
+ if recursive:
+ if in_local_cache:
+ # Add directory to processing queue.
+ fetch_queue.append(digest)
+ else:
+ # Directory will be available after completing pending batch.
+ # Add directory to deferred processing queue.
+ fetch_next_queue.append(digest)
+
+ return batch
+
+ # _fetch_directory():
+ #
+ # Fetches remote directory and adds it to content addressable store.
+ #
+ # This recursively fetches directory objects but doesn't fetch any
+ # files.
+ #
+ # Args:
+ # remote (Remote): The remote to use.
+ # dir_digest (Digest): Digest object for the directory to fetch.
+ #
+ def _fetch_directory(self, remote, dir_digest):
+ # TODO Use GetTree() if the server supports it
+
+ fetch_queue = [dir_digest]
+ fetch_next_queue = []
+ batch = _CASBatchRead(remote)
+
+ while len(fetch_queue) + len(fetch_next_queue) > 0:
+ if not fetch_queue:
+ batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+
+ dir_digest = fetch_queue.pop(0)
+
+ objpath = self._ensure_blob(remote, dir_digest)
+
+ directory = remote_execution_pb2.Directory()
+ with open(objpath, 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ for dirnode in directory.directories:
+ batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+ fetch_queue, fetch_next_queue, recursive=True)
+
+ # Fetch final batch
+ self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+
+ def _fetch_tree(self, remote, digest):
+ # download but do not store the Tree object
+ with utils._tempnamedfile(dir=self.tmpdir) as out:
+ remote._fetch_blob(digest, out)
+
+ tree = remote_execution_pb2.Tree()
+
+ with open(out.name, 'rb') as f:
+ tree.ParseFromString(f.read())
+
+        # Append the root directory so it is processed last; the digest
+        # returned below is then the root directory's digest
+        tree.children.extend([tree.root])
+ for directory in tree.children:
+ dirbuffer = directory.SerializeToString()
+ dirdigest = self.add_object(buffer=dirbuffer)
+ assert dirdigest.size_bytes == len(dirbuffer)
+
+ return dirdigest
+
+ # fetch_blobs():
+ #
+ # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
+ #
+ # Args:
+ # remote (CASRemote): The remote repository to fetch from
+ # digests (list): The Digests of blobs to fetch
+ #
+ # Returns: The Digests of the blobs that were not available on the remote CAS
+ #
+ def fetch_blobs(self, remote, digests):
+ missing_blobs = []
+
+ batch = _CASBatchRead(remote)
+
+ for digest in digests:
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+ not remote.batch_read_supported):
+ # Too large for batch request, download in independent request.
+ try:
+ self._ensure_blob(remote, digest)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ missing_blobs.append(digest)
+ else:
+ raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
+ else:
+ if not batch.add(digest):
+ # Not enough space left in batch request.
+ # Complete pending batch first.
+ self._batch_download_complete(batch, missing_blobs=missing_blobs)
+
+ batch = _CASBatchRead(remote)
+ batch.add(digest)
+
+ # Complete last pending batch
+ self._batch_download_complete(batch, missing_blobs=missing_blobs)
+
+ return missing_blobs
+
+ # send_blobs():
+ #
+ # Upload blobs to remote CAS.
+ #
+ # Args:
+ # remote (CASRemote): The remote repository to upload to
+ # digests (list): The Digests of Blobs to upload
+ #
+ def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+ batch = _CASBatchUpdate(remote)
+
+ for digest in digests:
+ with open(self.objpath(digest), 'rb') as f:
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+ not remote.batch_update_supported):
+ # Too large for batch request, upload in independent request.
+ remote._send_blob(digest, f, u_uid=u_uid)
+ else:
+ if not batch.add(digest, f):
+ # Not enough space left in batch request.
+ # Complete pending batch first.
+ batch.send()
+ batch = _CASBatchUpdate(remote)
+ batch.add(digest, f)
+
+ # Send final batch
+ batch.send()
+
+ def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+ missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
+
+ # Upload any blobs missing on the server
+ self.send_blobs(remote, missing_blobs, u_uid)
+
+
+class CASQuota:
+ def __init__(self, context):
+ self.context = context
+ self.cas = context.get_cascache()
+ self.casdir = self.cas.casdir
+ self._config_cache_quota = context.config_cache_quota
+ self._config_cache_quota_string = context.config_cache_quota_string
+ self._cache_size = None # The current cache size, sometimes it's an estimate
+ self._cache_quota = None # The cache quota
+ self._cache_quota_original = None # The cache quota as specified by the user, in bytes
+ self._cache_quota_headroom = None # The headroom in bytes before reaching the quota or full disk
+ self._cache_lower_threshold = None # The target cache size for a cleanup
+ self.available_space = None
+
+ self._message = context.message
+
+ self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method
+ self._list_refs_callbacks = [] # Callbacks to all refs
+
+ self._calculate_cache_quota()
+
+ # compute_cache_size()
+ #
+ # Computes the real artifact cache size.
+ #
+ # Returns:
+ # (int): The size of the artifact cache.
+ #
+ def compute_cache_size(self):
+ self._cache_size = utils._get_dir_size(self.casdir)
+ return self._cache_size
+
+ # get_cache_size()
+ #
+ # Fetches the cached size of the cache, this is sometimes
+ # an estimate and periodically adjusted to the real size
+ # when a cache size calculation job runs.
+ #
+ # When it is an estimate, the value is either correct, or
+ # it is greater than the actual cache size.
+ #
+ # Returns:
+ # (int) An approximation of the artifact cache size, in bytes.
+ #
+ def get_cache_size(self):
+
+ # If we don't currently have an estimate, figure out the real cache size.
+ if self._cache_size is None:
+ stored_size = self._read_cache_size()
+ if stored_size is not None:
+ self._cache_size = stored_size
+ else:
+ self.compute_cache_size()
+
+ return self._cache_size
+
+ # set_cache_size()
+ #
+ # Forcefully set the overall cache size.
+ #
+ # This is used to update the size in the main process after
+ # having calculated in a cleanup or a cache size calculation job.
+ #
+ # Args:
+ # cache_size (int): The size to set.
+ # write_to_disk (bool): Whether to write the value to disk.
+ #
+ def set_cache_size(self, cache_size, *, write_to_disk=True):
+
+ assert cache_size is not None
+
+ self._cache_size = cache_size
+ if write_to_disk:
+ self._write_cache_size(self._cache_size)
+
+ # full()
+ #
+ # Checks if the artifact cache is full, either
+ # because the user configured quota has been exceeded
+ # or because the underlying disk is almost full.
+ #
+ # Returns:
+ # (bool): True if the artifact cache is full
+ #
+ def full(self):
+
+ if self.get_cache_size() > self._cache_quota:
+ return True
+
+ _, volume_avail = self._get_cache_volume_size()
+ if volume_avail < self._cache_quota_headroom:
+ return True
+
+ return False
+
+ # add_remove_callbacks()
+ #
+    # Registers a callback pair: an iterator over unrequired refs
+    # (currently artifact and source refs) and the method which removes them.
+    #
+    # Args:
+    #    list_unrequired (callable): Returns an iterator over
+    #                                (mtime, ref) tuples of unrequired refs
+    #    remove_method (callable): The method which removes a given ref
+ #
+ def add_remove_callbacks(self, list_unrequired, remove_method):
+ self._remove_callbacks.append((list_unrequired, remove_method))
+
+    # add_list_refs_callback()
+    #
+    # Registers a callback which returns an iterator over all refs.
+    #
+    def add_list_refs_callback(self, list_callback):
+ self._list_refs_callbacks.append(list_callback)
+
+ ################################################
+ # Local Private Methods #
+ ################################################
+
+ # _read_cache_size()
+ #
+ # Reads and returns the size of the artifact cache that's stored in the
+ # cache's size file
+ #
+ # Returns:
+ # (int): The size of the artifact cache, as recorded in the file
+ #
+ def _read_cache_size(self):
+ size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
+
+ if not os.path.exists(size_file_path):
+ return None
+
+ with open(size_file_path, "r") as f:
+ size = f.read()
+
+ try:
+ num_size = int(size)
+ except ValueError as e:
+ raise CASCacheError("Size '{}' parsed from '{}' was not an integer".format(
+ size, size_file_path)) from e
+
+ return num_size
+
+ # _write_cache_size()
+ #
+ # Writes the given size of the artifact to the cache's size file
+ #
+ # Args:
+ # size (int): The size of the artifact cache to record
+ #
+ def _write_cache_size(self, size):
+ assert isinstance(size, int)
+ size_file_path = os.path.join(self.casdir, CACHE_SIZE_FILE)
+ with utils.save_file_atomic(size_file_path, "w", tempdir=self.cas.tmpdir) as f:
+ f.write(str(size))
+
+ # _get_cache_volume_size()
+ #
+ # Get the available space and total space for the volume on
+ # which the artifact cache is located.
+ #
+ # Returns:
+ # (int): The total number of bytes on the volume
+ # (int): The number of available bytes on the volume
+ #
+ # NOTE: We use this stub to allow the test cases
+ # to override what an artifact cache thinks
+    # about its disk size and available bytes.
+ #
+ def _get_cache_volume_size(self):
+ return utils._get_volume_size(self.casdir)
+
+ # _calculate_cache_quota()
+ #
+ # Calculates and sets the cache quota and lower threshold based on the
+ # quota set in Context.
+ # It checks that the quota is both a valid expression, and that there is
+ # enough disk space to satisfy that quota
+ #
+ def _calculate_cache_quota(self):
+ # Headroom intended to give BuildStream a bit of leeway.
+ # This acts as the minimum size of cache_quota and also
+ # is taken from the user requested cache_quota.
+ #
+ if 'BST_TEST_SUITE' in os.environ:
+ self._cache_quota_headroom = 0
+ else:
+ self._cache_quota_headroom = 2e9
+
+ total_size, available_space = self._get_cache_volume_size()
+ cache_size = self.get_cache_size()
+ self.available_space = available_space
+
+ # Ensure system has enough storage for the cache_quota
+ #
+ # If cache_quota is none, set it to the maximum it could possibly be.
+ #
+ # Also check that cache_quota is at least as large as our headroom.
+ #
+ cache_quota = self._config_cache_quota
+ if cache_quota is None:
+ # The user has set no limit, so we may take all the space.
+ cache_quota = min(cache_size + available_space, total_size)
+ if cache_quota < self._cache_quota_headroom: # Check minimum
+ raise LoadError(
+ LoadErrorReason.INVALID_DATA,
+ "Invalid cache quota ({}): BuildStream requires a minimum cache quota of {}.".format(
+ utils._pretty_size(cache_quota),
+ utils._pretty_size(self._cache_quota_headroom)))
+ elif cache_quota > total_size:
+            # A quota greater than the total disk size is certainly an error
+ raise CASCacheError("Your system does not have enough available " +
+ "space to support the cache quota specified.",
+ detail=("You have specified a quota of {quota} total disk space.\n" +
+ "The filesystem containing {local_cache_path} only " +
+ "has {total_size} total disk space.")
+ .format(
+ quota=self._config_cache_quota,
+ local_cache_path=self.casdir,
+ total_size=utils._pretty_size(total_size)),
+ reason='insufficient-storage-for-quota')
+
+ elif cache_quota > cache_size + available_space:
+ # The quota does not fit in the available space, this is a warning
+ if '%' in self._config_cache_quota_string:
+ available = (available_space / total_size) * 100
+ available = '{}% of total disk space'.format(round(available, 1))
+ else:
+ available = utils._pretty_size(available_space)
+
+ self._message(Message(
+ None,
+ MessageType.WARN,
+ "Your system does not have enough available " +
+ "space to support the cache quota specified.",
+ detail=("You have specified a quota of {quota} total disk space.\n" +
+ "The filesystem containing {local_cache_path} only " +
+ "has {available_size} available.")
+ .format(quota=self._config_cache_quota,
+ local_cache_path=self.casdir,
+ available_size=available)))
+
+ # Place a slight headroom (2e9 (2GB) on the cache_quota) into
+ # cache_quota to try and avoid exceptions.
+ #
+ # Of course, we might still end up running out during a build
+ # if we end up writing more than 2G, but hey, this stuff is
+ # already really fuzzy.
+ #
+ self._cache_quota_original = cache_quota
+ self._cache_quota = cache_quota - self._cache_quota_headroom
+ self._cache_lower_threshold = self._cache_quota / 2
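+
+        # For example, with a configured quota of 10GB and the default
+        # 2GB headroom, the internal quota becomes 8GB and a cleanup
+        # targets a cache size of 4GB (half the internal quota).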
+
+ # clean():
+ #
+ # Clean the artifact cache as much as possible.
+ #
+ # Args:
+ # progress (callable): A callback to call when a ref is removed
+ #
+ # Returns:
+ # (int): The size of the cache after having cleaned up
+ #
+ def clean(self, progress=None):
+ context = self.context
+
+ # Some accumulative statistics
+ removed_ref_count = 0
+ space_saved = 0
+
+ total_refs = 0
+ for refs in self._list_refs_callbacks:
+ total_refs += len(list(refs()))
+
+ # Start off with an announcement with as much info as possible
+ volume_size, volume_avail = self._get_cache_volume_size()
+ self._message(Message(
+ None, MessageType.STATUS, "Starting cache cleanup",
+ detail=("Elements required by the current build plan:\n" + "{}\n" +
+ "User specified quota: {} ({})\n" +
+ "Cache usage: {}\n" +
+ "Cache volume: {} total, {} available")
+ .format(
+ total_refs,
+ context.config_cache_quota,
+ utils._pretty_size(self._cache_quota, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ utils._pretty_size(volume_size, dec_places=2),
+ utils._pretty_size(volume_avail, dec_places=2))))
+
+ # Do a real computation of the cache size once, just in case
+ self.compute_cache_size()
+ usage = CASCacheUsage(self)
+ self._message(Message(None, MessageType.STATUS,
+ "Cache usage recomputed: {}".format(usage)))
+
+ # Collect digests and their remove method
+ all_unrequired_refs = []
+ for (unrequired_refs, remove) in self._remove_callbacks:
+ for (mtime, ref) in unrequired_refs():
+ all_unrequired_refs.append((mtime, ref, remove))
+
+ # Pair refs and their remove method sorted in time order
+ all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)]
+
+ # Go through unrequired refs and remove them, oldest first
+ made_space = False
+ for (ref, remove) in all_unrequired_refs:
+ size = remove(ref)
+ removed_ref_count += 1
+ space_saved += size
+
+ self._message(Message(
+ None, MessageType.STATUS,
+ "Freed {: <7} {}".format(
+ utils._pretty_size(size, dec_places=2),
+ ref)))
+
+ self.set_cache_size(self._cache_size - size)
+
+ # User callback
+ #
+ # Currently this process is fairly slow, but we should
+ # think about throttling this progress() callback if this
+ # becomes too intense.
+ if progress:
+ progress()
+
+ if self.get_cache_size() < self._cache_lower_threshold:
+ made_space = True
+ break
+
+ if not made_space and self.full():
+ # If too many artifacts are required, and we therefore
+ # can't remove them, we have to abort the build.
+ #
+ # FIXME: Asking the user what to do may be neater
+ #
+ default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
+ 'buildstream.conf')
+ detail = ("Aborted after removing {} refs and saving {} disk space.\n"
+ "The remaining {} in the cache is required by the {} references in your build plan\n\n"
+ "There is not enough space to complete the build.\n"
+ "Please increase the cache-quota in {} and/or make more disk space."
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ total_refs,
+ (context.config_origin or default_conf)))
+
+ raise CASCacheError("Cache too full. Aborting.",
+ detail=detail,
+ reason="cache-too-full")
+
+ # Informational message about the side effects of the cleanup
+ self._message(Message(
+ None, MessageType.INFO, "Cleanup completed",
+ detail=("Removed {} refs and saving {} disk space.\n" +
+ "Cache usage is now: {}")
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2))))
+
+ return self.get_cache_size()
+
+
+# _grouper():
+#
+# Split an iterable into groups of up to `n` elements.
+#
+def _grouper(iterable, n):
+    # Ensure we have an iterator; next() below requires one
+    iterable = iter(iterable)
+    while True:
+        try:
+            current = next(iterable)
+        except StopIteration:
+            return
+        yield itertools.chain([current], itertools.islice(iterable, n - 1))
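+
+# Example (an illustrative sketch): chunking an iterable into groups of 3.
+# Each group is a lazy iterator and must be fully consumed before
+# requesting the next one:
+#
+#   for group in _grouper(range(8), 3):
+#       print(list(group))   # [0, 1, 2] then [3, 4, 5] then [6, 7]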