From dbca7c70a8065d2c26a2d447aa0a889cf5624d5a Mon Sep 17 00:00:00 2001
From: Valentin David
Date: Fri, 16 Nov 2018 15:25:12 +0100
Subject: Cleanup cache in cas server more aggressively

When there is less than 2GB left, it cleans up to have 10GB available.
These values are configurable.
---
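The limits are exposed both as keyword arguments to create_server() and as
the new --head-room-min/--head-room-max command-line options. A minimal
sketch of wiring up a push-enabled server from Python with the new limits;
the repository path and port are invented for the example, and the last two
calls are the plain grpc.Server API that the command-line entry point uses:

    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/artifacts/cas',
                           enable_push=True,
                           min_head_size=int(2e9),   # cleanup triggers once free
                                                     # space minus this margin can
                                                     # no longer hold an upload
                           max_head_size=int(10e9))  # eviction continues until this
                                                     # much head room is restored
    server.add_insecure_port('localhost:50052')
    server.start()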
 buildstream/_artifactcache/casserver.py | 149 ++++++++++++++++++--------------
 1 file changed, 86 insertions(+), 63 deletions(-)

diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 503705d9e..d552aab94 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -57,7 +57,9 @@ class ArtifactTooLargeException(Exception):
 #    repo (str): Path to CAS repository
 #    enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     context = Context()
     context.artifactdir = os.path.abspath(repo)
 
@@ -67,11 +69,13 @@ def create_server(repo, *, enable_push):
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -89,9 +93,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
 
@@ -133,10 +147,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -198,7 +213,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     if client_digest.size_bytes == 0:
                         break
                     try:
-                        _clean_up_cache(self.cas, client_digest.size_bytes)
+                        self.cache_cleaner.clean_up(client_digest.size_bytes)
                     except ArtifactTooLargeException as e:
                         context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                         context.set_details(str(e))
@@ -242,10 +257,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
 
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -314,7 +330,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                     continue
 
                 try:
-                    _clean_up_cache(self.cas, digest.size_bytes)
+                    self.cache_cleaner.clean_up(digest.size_bytes)
 
                     with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                         out.write(blob_request.data)
@@ -435,63 +451,70 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#    cas: CASCache object
-#    object_size: The size of the object being received in bytes
-#
-# Returns:
-#    int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)          # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_objects = cas.list_objects()
-
-    removed_size = 0  # in bytes
-    last_mtime = 0
-    while object_size - removed_size > free_disk_space:
-        try:
-            last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP objects
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
+class _CacheCleaner:
+
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
+
+    # clean_up()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #    object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #    int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
                                             "the filesystem which mounts the remote "
                                             "cache".format(object_size))
 
-        try:
-            size = os.stat(to_remove).st_size
-            os.unlink(to_remove)
-            removed_size += size
-        except FileNotFoundError:
-            pass
+        if object_size <= free_disk_space:
+            # No need to clean up
+            return 0
 
-    cas.clean_up_refs_until(last_mtime)
+        stats = os.statvfs(self.__cas.casdir)
+        target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        # obtain a list of LRP artifacts
+        LRP_objects = self.__cas.list_objects()
+
+        removed_size = 0  # in bytes
+        last_mtime = 0
+
+        while object_size - removed_size > target_disk_space:
+            try:
+                last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
+            except IndexError:
+                # This exception is caught if there are no more artifacts in the list
+                # LRP_objects. This means the artifact is too large for the filesystem
+                # so we abort the process
+                raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                "the filesystem which mounts the remote "
+                                                "cache".format(object_size))
+
+            try:
+                size = os.stat(to_remove).st_size
+                os.unlink(to_remove)
+                removed_size += size
+            except FileNotFoundError:
+                pass
+
+        self.__cas.clean_up_refs_until(last_mtime)
+
+        if removed_size > 0:
+            logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+        else:
+            logging.info("No artifacts were removed from the cache.")
 
-    return removed_size
+        return removed_size
--
cgit v1.2.1
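The behavioural change is the hysteresis between the two limits: cleanup
still triggers when free space, less the minimum head room, can no longer
hold the incoming object, but it now keeps evicting least-recently-pushed
objects until the larger head room is restored, so the server does not
re-enter cleanup on every subsequent upload. A minimal sketch of the same
arithmetic in isolation; the helper name and path are invented for
illustration, and the figures depend on the filesystem:

    import os

    def headroom(casdir, min_head_size=int(2e9), max_head_size=int(10e9)):
        stats = os.statvfs(casdir)
        free_bytes = stats.f_bavail * stats.f_bsize
        # An upload of object_size triggers cleanup when it exceeds this:
        free_disk_space = free_bytes - min_head_size
        # ...and eviction continues until the object fits within this:
        target_disk_space = free_bytes - max_head_size
        return free_disk_space, target_disk_space

    # e.g. with 5GB free, an upload larger than ~3GB triggers cleanup,
    # which then evicts until roughly 10GB plus the object size are free.
    print(headroom('/srv/artifacts/cas'))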