author | Valentin David <valentin.david@codethink.co.uk> | 2018-11-16 15:25:12 +0100 |
committer | Valentin David <valentin.david@codethink.co.uk> | 2018-11-28 15:29:52 +0100 |
commit | 353b90dda760f320ec5b97c0bb56dce2ed7ea68f (patch) | |
tree | 48d34cce0a7da082cb1aa1ca9061413046a15503 | |
parent | a64f667db8cc8123b7a77c9871143fbe8d008aaf (diff) | |
download | buildstream-353b90dda760f320ec5b97c0bb56dce2ed7ea68f.tar.gz |
Clean up cache in CAS server more aggressively
When less than 2 GB of disk space is left, the server cleans up until 10 GB are available.
Both thresholds are configurable.
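
For reference, a minimal sketch of how the new thresholds could be wired up, based on the updated create_server() signature in this patch. The repository path and port below are placeholder values, and the sizes shown are just the patch defaults; on the command line the same knobs are exposed as --head-room-min and --head-room-max (in bytes).

    # Sketch only: uses the keyword arguments added by this commit.
    # '/srv/cas' and port 11001 are hypothetical placeholders.
    from buildstream._artifactcache.casserver import create_server

    server = create_server('/srv/cas',
                           enable_push=True,
                           min_head_size=int(2e9),    # start cleaning below 2 GB of head room
                           max_head_size=int(10e9))   # keep cleaning until 10 GB are free
    server.add_insecure_port('localhost:11001')
    server.start()
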
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 149
-rw-r--r-- | tests/frontend/push.py | 4
-rw-r--r-- | tests/testutils/artifactshare.py | 21
3 files changed, 107 insertions, 67 deletions
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 3a6481fb2..84d22cc51 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -57,18 +57,22 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     cas = CASCache(os.path.abspath(repo))

     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))

+    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)

     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -86,9 +90,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)

     use_tls = bool(server_key)

@@ -130,10 +144,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):


 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner

     def Read(self, request, context):
         resource_name = request.resource_name
@@ -195,7 +210,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                    if client_digest.size_bytes == 0:
                        break
                    try:
-                       _clean_up_cache(self.cas, client_digest.size_bytes)
+                       self.cache_cleaner.clean_up(client_digest.size_bytes)
                    except ArtifactTooLargeException as e:
                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
                        context.set_details(str(e))
@@ -239,10 +254,11 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):


 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner

     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -311,7 +327,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                continue

            try:
-               _clean_up_cache(self.cas, digest.size_bytes)
+               self.cache_cleaner.clean_up(digest.size_bytes)

                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                    out.write(blob_request.data)
@@ -432,63 +448,70 @@ def _digest_from_upload_resource_name(resource_name):
         return None


-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)  # Add a 2 GB buffer
-    free_disk_space = (stats.f_bavail * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_objects = cas.list_objects()
-
-    removed_size = 0  # in bytes
-    last_mtime = 0
-    while object_size - removed_size > free_disk_space:
-        try:
-            last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP objects
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
+class _CacheCleaner:
+
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
+
+    # _clean_up_cache()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
                                             "the filesystem which mounts the remote "
                                             "cache".format(object_size))

-        try:
-            size = os.stat(to_remove).st_size
-            os.unlink(to_remove)
-            removed_size += size
-        except FileNotFoundError:
-            pass
+        if object_size <= free_disk_space:
+            # No need to clean up
+            return 0

-    cas.clean_up_refs_until(last_mtime)
+        stats = os.statvfs(self.__cas.casdir)
+        target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size

-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        # obtain a list of LRP artifacts
+        LRP_objects = self.__cas.list_objects()
+
+        removed_size = 0  # in bytes
+        last_mtime = 0
+
+        while object_size - removed_size > target_disk_space:
+            try:
+                last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
+            except IndexError:
+                # This exception is caught if there are no more artifacts in the list
+                # LRP_artifacts. This means the the artifact is too large for the filesystem
+                # so we abort the process
+                raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                "the filesystem which mounts the remote "
+                                                "cache".format(object_size))
+
+            try:
+                size = os.stat(to_remove).st_size
+                os.unlink(to_remove)
+                removed_size += size
+            except FileNotFoundError:
+                pass
+
+        self.__cas.clean_up_refs_until(last_mtime)
+
+        if removed_size > 0:
+            logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+        else:
+            logging.info("No artifacts were removed from the cache.")

-    return removed_size
+        return removed_size
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index f2d6814d6..153d43340 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:

         # Configure bst to push to the cache
@@ -313,6+315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:

         # Configure bst to push to the cache
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index c1ddc2c46..38c54a947 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
 #
 class ArtifactShare():

-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):

         # The working directory for the artifact share (in case it
         # needs to do something outside of its backend's storage folder).
@@ -50,6 +54,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space

+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()

         self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
                self.free_space = self.total_space
            os.statvfs = self._mock_statvfs

-        server = create_server(self.repodir, enable_push=True)
+        server = create_server(self.repodir,
+                               max_head_size=self.max_head_size,
+                               min_head_size=self.min_head_size,
+                               enable_push=True)
         port = server.add_insecure_port('localhost:0')
         server.start()
@@ -176,8 +186,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally:
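
The behavioural point of the two thresholds is hysteresis: cleanup only starts once free head room drops below min_head_size, but then keeps evicting least-recently-pushed objects until max_head_size of head room is available, so the server does not have to clean on every push. A self-contained sketch of that decision logic follows; the helper names are illustrative and not part of the patch.

    import os

    def needs_cleanup(casdir, object_size, min_head_size=int(2e9)):
        # Cleanup is triggered when the incoming object does not fit while
        # keeping at least min_head_size of free head room on the filesystem.
        stats = os.statvfs(casdir)
        free = stats.f_bavail * stats.f_bsize - min_head_size
        return object_size > free

    def bytes_to_remove(casdir, object_size, max_head_size=int(10e9)):
        # Once triggered, keep evicting until the object fits with
        # max_head_size of head room to spare (mirrors the clean_up() loop condition).
        stats = os.statvfs(casdir)
        target = stats.f_bavail * stats.f_bsize - max_head_size
        return max(0, object_size - target)
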