| author | Jürg Billeter <j@bitron.ch> | 2019-06-06 15:40:37 +0200 |
| --- | --- | --- |
| committer | Jürg Billeter <j@bitron.ch> | 2019-08-20 07:41:23 +0200 |
| commit | 32191bab48d7b8c8965ddbcc2cae93802028f29b (patch) | |
| tree | 7cc4a04b05d3192f3ef54890d7840694c168bd0d | |
| parent | e1204be7ece0b55536b9860643d901158769019c (diff) | |
| download | buildstream-32191bab48d7b8c8965ddbcc2cae93802028f29b.tar.gz | |
casserver.py: Remove CacheCleaner
Cache expiry will be performed by buildbox-casd.
| -rw-r--r-- | src/buildstream/_cas/casserver.py | 121 |
| -rw-r--r-- | tests/frontend/push.py | 3 |

2 files changed, 13 insertions, 111 deletions
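For orientation before reading the diff: the `_CacheCleaner` deleted here guarded every incoming blob with a free-space check and, when space ran low, evicted objects in least-recently-pushed (LRP) order until the blob fit. Below is a condensed, self-contained sketch of that removed logic, not the verbatim class: the `cas` argument stands in for a `CASCache` instance and only its `casdir`, `list_objects()` and `clean_up_refs_until()` members (the ones used by the deleted code) are assumed; the original also took a `max_head_size` target controlling how much headroom to restore, which the sketch simplifies to "evict until the blob fits".

```python
import os
import threading


class ArtifactTooLargeException(Exception):
    """Raised when an incoming blob can never fit on the cache filesystem."""


class LRPCleaner:
    """Condensed sketch of the removed _CacheCleaner (not a drop-in replacement)."""

    _lock = threading.Lock()

    def __init__(self, cas, min_head_size=int(2e9)):
        self._cas = cas                      # CASCache-like object
        self._min_head_size = min_head_size  # headroom that must always stay free

    def _has_space(self, object_size):
        stats = os.statvfs(self._cas.casdir)
        free = stats.f_bavail * stats.f_bsize - self._min_head_size
        total = stats.f_blocks * stats.f_bsize - self._min_head_size
        if object_size > total:
            # The blob would not fit even on an empty cache.
            raise ArtifactTooLargeException(
                "{} bytes is too large for the cache filesystem".format(object_size))
        return object_size <= free

    def clean_up(self, object_size):
        """Evict least-recently-pushed objects until object_size bytes fit.

        Returns the number of bytes removed."""
        if self._has_space(object_size):
            return 0

        with LRPCleaner._lock:
            if self._has_space(object_size):
                return 0                     # another thread already made room

            removed = 0
            last_mtime = 0
            # (mtime, path) pairs, oldest push first.
            candidates = self._cas.list_objects()

            while not self._has_space(object_size):
                try:
                    last_mtime, path = candidates.pop(0)
                except IndexError:
                    raise ArtifactTooLargeException(
                        "cache exhausted while making room for {} bytes".format(object_size))
                try:
                    size = os.stat(path).st_size
                    os.unlink(path)
                    removed += size
                except FileNotFoundError:
                    pass                     # already gone; keep evicting

            # Drop refs older than the newest object we evicted.
            self._cas.clean_up_refs_until(last_mtime)
            return removed
```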
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 2ed51bded..0975023ef 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,14 +18,12 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 from concurrent import futures
-import logging
 import os
 import signal
 import sys
 import tempfile
 import uuid
 import errno
-import threading
 
 import grpc
 from google.protobuf.message import DecodeError
@@ -48,11 +46,6 @@ from .cascache import CASCache
 
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
-# Trying to push an artifact that is too large
-class ArtifactTooLargeException(Exception):
-    pass
-
-
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -72,13 +65,11 @@ def create_server(repo, *, enable_push,
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
-    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
-
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -165,11 +156,10 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
 
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
-    def __init__(self, cas, cache_cleaner, *, enable_push):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
-        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -230,12 +220,6 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
                     while True:
                         if client_digest.size_bytes == 0:
                             break
-                        try:
-                            self.cache_cleaner.clean_up(client_digest.size_bytes)
-                        except ArtifactTooLargeException as e:
-                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                            context.set_details(str(e))
-                            return response
 
                         try:
                             os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
@@ -275,11 +259,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
-    def __init__(self, cas, cache_cleaner, *, enable_push):
+    def __init__(self, cas, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
-        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -347,18 +330,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
                 blob_response.status.code = code_pb2.FAILED_PRECONDITION
                 continue
 
-            try:
-                self.cache_cleaner.clean_up(digest.size_bytes)
-
-                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
-                    out.write(blob_request.data)
-                    out.flush()
-                    server_digest = self.cas.add_object(path=out.name)
-                    if server_digest.hash != digest.hash:
-                        blob_response.status.code = code_pb2.FAILED_PRECONDITION
-
-            except ArtifactTooLargeException:
-                blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
+            with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                out.write(blob_request.data)
+                out.flush()
+                server_digest = self.cas.add_object(path=out.name)
+                if server_digest.hash != digest.hash:
+                    blob_response.status.code = code_pb2.FAILED_PRECONDITION
 
         return response
 
@@ -602,81 +579,3 @@ def _digest_from_upload_resource_name(resource_name):
         return digest
     except ValueError:
         return None
-
-
-class _CacheCleaner:
-
-    __cleanup_cache_lock = threading.Lock()
-
-    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
-        self.__cas = cas
-        self.__max_head_size = max_head_size
-        self.__min_head_size = min_head_size
-
-    def __has_space(self, object_size):
-        stats = os.statvfs(self.__cas.casdir)
-        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
-        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
-
-        if object_size > total_disk_space:
-            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
-
-        return object_size <= free_disk_space
-
-    # _clean_up_cache()
-    #
-    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-    # is enough space for the incoming artifact
-    #
-    # Args:
-    #   object_size: The size of the object being received in bytes
-    #
-    # Returns:
-    #   int: The total bytes removed on the filesystem
-    #
-    def clean_up(self, object_size):
-        if self.__has_space(object_size):
-            return 0
-
-        with _CacheCleaner.__cleanup_cache_lock:
-            if self.__has_space(object_size):
-                # Another thread has done the cleanup for us
-                return 0
-
-            stats = os.statvfs(self.__cas.casdir)
-            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
-
-            # obtain a list of LRP artifacts
-            LRP_objects = self.__cas.list_objects()
-
-            removed_size = 0  # in bytes
-            last_mtime = 0
-
-            while object_size - removed_size > target_disk_space:
-                try:
-                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
-                except IndexError:
-                    # This exception is caught if there are no more artifacts in the list
-                    # LRP_artifacts. This means the the artifact is too large for the filesystem
-                    # so we abort the process
-                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                                    "the filesystem which mounts the remote "
-                                                    "cache".format(object_size))
-
-                try:
-                    size = os.stat(to_remove).st_size
-                    os.unlink(to_remove)
-                    removed_size += size
-                except FileNotFoundError:
-                    pass
-
-            self.__cas.clean_up_refs_until(last_mtime)
-
-            if removed_size > 0:
-                logging.info("Successfully removed %d bytes from the cache", removed_size)
-            else:
-                logging.info("No artifacts were removed from the cache.")
-
-            return removed_size
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index e92646154..e0a6c1e99 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -287,6 +287,7 @@ def test_push_after_pull(cli, tmpdir, datafiles):
 # the least recently pushed artifact is deleted in order to make room for
 # the incoming artifact.
 @pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
 def test_artifact_expires(cli, datafiles, tmpdir):
     project = str(datafiles)
     element_path = 'elements'
@@ -342,6 +343,7 @@
 # Test that a large artifact, whose size exceeds the quota, is not pushed
 # to the remote share
 @pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
 def test_artifact_too_large(cli, datafiles, tmpdir):
     project = str(datafiles)
     element_path = 'elements'
@@ -378,6 +380,7 @@
 
 # Test that when an element is pulled recently, it is not considered the LRU element.
 @pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.xfail()
 def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     project = str(datafiles)
     element_path = 'elements'
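With the cleaner gone, the only server-side handling left for an uploaded blob in `BatchUpdateBlobs` is: stage the bytes, import them into CAS, verify the digest. A minimal sketch of that remaining path follows; it uses the same `cas.tmpdir` attribute and `cas.add_object()` call as the servicer code in the diff, but the `receive_blob` helper itself is illustrative and not part of the module.

```python
import tempfile


def receive_blob(cas, data, expected_hash):
    """Sketch of the post-change upload path in BatchUpdateBlobs.

    Stages the payload in a temporary file inside the CAS tmpdir, imports it
    as an object, and reports whether the stored digest matches the one the
    client claimed. Quota handling is intentionally absent: cache expiry is
    delegated to buildbox-casd."""
    with tempfile.NamedTemporaryFile(dir=cas.tmpdir) as out:
        out.write(data)
        out.flush()
        server_digest = cas.add_object(path=out.name)
        # In the servicer a mismatch maps to code_pb2.FAILED_PRECONDITION;
        # RESOURCE_EXHAUSTED can no longer be returned from this path.
        return server_digest.hash == expected_hash
```

Note that the three quota tests in tests/frontend/push.py are marked xfail rather than removed, which suggests the expiry behaviour is expected to be exercised again once buildbox-casd takes over cache expiry.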