author      Valentin David <valentin.david@gmail.com>  2018-11-29 08:47:40 +0000
committer   Valentin David <valentin.david@gmail.com>  2018-11-29 08:47:40 +0000
commit      9a458402c57de0dd037caf67d732a28ec25e38a6 (patch)
tree        e59227db3b6e80242d6c806fe82d0c7f63e5c9bf
parent      3513580cacb9784900f119600e1a8ab9aa4fed32 (diff)
parent      ba9afa9888d036b91954b179a57925884dd29483 (diff)
download    buildstream-9a458402c57de0dd037caf67d732a28ec25e38a6.tar.gz
Merge branch 'valentindavid/cache_server_fill_up' into 'master'
Fix cleanup of cache in server when disk is full
Closes #678
See merge request BuildStream/buildstream!830
-rw-r--r--  buildstream/_artifactcache/cascache.py   | 102
-rw-r--r--  buildstream/_artifactcache/casserver.py  | 214
-rw-r--r--  tests/frontend/push.py                   |   4
-rw-r--r--  tests/testutils/artifactshare.py         |  30

4 files changed, 254 insertions, 96 deletions
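The change replaces the server's old fixed 2 GB safety buffer with two configurable watermarks: cleanup kicks in when free disk space drops below a minimum head room, and proceeds until a maximum head room is restored. Free space is now measured with statvfs `f_bavail` rather than `f_bfree`, so blocks reserved for root no longer count as available. A minimal sketch of that space check, assuming a hypothetical `cache_dir` and a default mirroring `--head-room-min`:

```python
import os

MIN_HEAD_ROOM = int(2e9)  # assumed default, mirroring --head-room-min

def has_space(cache_dir, object_size, head_room=MIN_HEAD_ROOM):
    stats = os.statvfs(cache_dir)
    # f_bavail counts blocks available to unprivileged processes;
    # f_bfree (used before this merge) also counts root-reserved blocks.
    free_disk_space = (stats.f_bavail * stats.f_bsize) - head_room
    total_disk_space = (stats.f_blocks * stats.f_bsize) - head_room

    if object_size > total_disk_space:
        raise RuntimeError("object of size {} can never fit".format(object_size))
    return object_size <= free_disk_space
```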
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 315aa6afa..2ae36d22a 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -25,6 +25,7 @@ import os
 import stat
 import tempfile
 import uuid
+import contextlib
 
 from urllib.parse import urlparse
 
 import grpc
@@ -88,6 +89,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
 CASRemoteSpec.__new__.__defaults__ = (None, None, None)
 
 
+class BlobNotFound(CASError):
+
+    def __init__(self, blob, msg):
+        self.blob = blob
+        super().__init__(msg)
+
+
 # A CASCache manages a CAS repository as specified in the Remote Execution API.
 #
 # Args:
@@ -299,6 +307,8 @@
                 raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
             else:
                 return False
+        except BlobNotFound as e:
+            return False
 
     # pull_tree():
     #
@@ -471,13 +481,14 @@
     #    digest (Digest): An optional Digest object to populate
     #    path (str): Path to file to add
     #    buffer (bytes): Byte buffer to add
+    #    link_directly (bool): Whether file given by path can be linked
     #
     # Returns:
     #    (Digest): The digest of the added object
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
 
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
@@ -487,28 +498,34 @@
         try:
             h = hashlib.sha256()
             # Always write out new file to avoid corruption if input file is modified
-            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-                # Set mode bits to 0644
-                os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
-                if path:
-                    with open(path, 'rb') as f:
-                        for chunk in iter(lambda: f.read(4096), b""):
-                            h.update(chunk)
-                            out.write(chunk)
+            with contextlib.ExitStack() as stack:
+                if path is not None and link_directly:
+                    tmp = stack.enter_context(open(path, 'rb'))
+                    for chunk in iter(lambda: tmp.read(4096), b""):
+                        h.update(chunk)
                 else:
-                    h.update(buffer)
-                    out.write(buffer)
+                    tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+                    # Set mode bits to 0644
+                    os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
 
-                out.flush()
+                    if path:
+                        with open(path, 'rb') as f:
+                            for chunk in iter(lambda: f.read(4096), b""):
+                                h.update(chunk)
+                                tmp.write(chunk)
+                    else:
+                        h.update(buffer)
+                        tmp.write(buffer)
+
+                    tmp.flush()
 
                 digest.hash = h.hexdigest()
-                digest.size_bytes = os.fstat(out.fileno()).st_size
+                digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
                 objpath = self.objpath(digest)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
-                os.link(out.name, objpath)
+                os.link(tmp.name, objpath)
 
         except FileExistsError as e:
             # We can ignore the failed link() if the object is already in the repo.
@@ -606,6 +623,41 @@
         # first ref of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
+    # list_objects():
+    #
+    # List cached objects in Least Recently Modified (LRM) order.
+    #
+    # Returns:
+    #     (list) - A list of objects and timestamps in LRM order
+    #
+    def list_objects(self):
+        objs = []
+        mtimes = []
+
+        for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+            for filename in files:
+                obj_path = os.path.join(root, filename)
+                try:
+                    mtimes.append(os.path.getmtime(obj_path))
+                except FileNotFoundError:
+                    pass
+                else:
+                    objs.append(obj_path)
+
+        # NOTE: Sorted will sort from earliest to latest, thus the
+        # first element of this list will be the file modified earliest.
+        return sorted(zip(mtimes, objs))
+
+    def clean_up_refs_until(self, time):
+        ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+        for root, _, files in os.walk(ref_heads):
+            for filename in files:
+                ref_path = os.path.join(root, filename)
+                # Obtain the mtime (the time a file was last modified)
+                if os.path.getmtime(ref_path) < time:
+                    os.unlink(ref_path)
+
     # remove():
     #
     # Removes the given symbolic ref from the repo.
@@ -665,6 +717,10 @@
 
         return pruned
 
+    def update_tree_mtime(self, tree):
+        reachable = set()
+        self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -811,10 +867,13 @@
                 a += 1
                 b += 1
 
-    def _reachable_refs_dir(self, reachable, tree):
+    def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
         if tree.hash in reachable:
             return
 
+        if update_mtime:
+            os.utime(self.objpath(tree))
+
         reachable.add(tree.hash)
 
         directory = remote_execution_pb2.Directory()
@@ -823,10 +882,12 @@
             directory.ParseFromString(f.read())
 
         for filenode in directory.files:
+            if update_mtime:
+                os.utime(self.objpath(filenode.digest))
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
-            self._reachable_refs_dir(reachable, dirnode.digest)
+            self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
     def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
@@ -880,7 +941,7 @@
             with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
                 self._fetch_blob(remote, digest, f)
 
-                added_digest = self.add_object(path=f.name)
+                added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash
 
         return objpath
@@ -891,7 +952,7 @@
                 f.write(data)
                 f.flush()
 
-                added_digest = self.add_object(path=f.name)
+                added_digest = self.add_object(path=f.name, link_directly=True)
                 assert added_digest.hash == digest.hash
 
     # Helper function for _fetch_directory().
@@ -1203,6 +1264,9 @@ class _CASBatchRead():
         batch_response = self._remote.cas.BatchReadBlobs(self._request)
 
         for response in batch_response.responses:
+            if response.status.code == code_pb2.NOT_FOUND:
+                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
             if response.status.code != code_pb2.OK:
                 raise CASError("Failed to download blob {}: {}".format(
                     response.digest.hash, response.status.code))
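The cascache.py side of the merge adds list_objects(), which orders blobs by mtime so the server can expire the least recently used ones first, and clean_up_refs_until(), which drops refs older than the last expired blob. A standalone sketch of the same mtime-ordered walk, with `objects_dir` standing in for the real `<casdir>/objects` layout:

```python
import os

def list_objects(objects_dir):
    entries = []
    for root, _, files in os.walk(objects_dir):
        for filename in files:
            path = os.path.join(root, filename)
            try:
                mtime = os.path.getmtime(path)
            except FileNotFoundError:
                continue  # blob pruned by another thread between walk() and stat()
            entries.append((mtime, path))
    # Tuples compare element-wise, so the oldest (least recently
    # modified) object ends up at the head of the list.
    return sorted(entries)
```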
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index ee84c4943..ed0266585 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,8 @@ import signal
 import sys
 import tempfile
 import uuid
+import errno
+import threading
 
 import click
 import grpc
@@ -31,6 +33,7 @@ import grpc
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+from .._protos.google.rpc import code_pb2
 
 from .._exceptions import CASError
 
@@ -55,18 +58,22 @@ class ArtifactTooLargeException(Exception):
 #     repo (str): Path to CAS repository
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
     cas = CASCache(os.path.abspath(repo))
 
     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
+    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(cas, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
@@ -84,9 +91,19 @@ def create_server(repo, *, enable_push):
 @click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
 @click.option('--enable-push', default=False, is_flag=True,
               help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
 @click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
-    server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
 
     use_tls = bool(server_key)
@@ -128,10 +145,11 @@
 
 class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def Read(self, request, context):
         resource_name = request.resource_name
@@ -189,17 +207,34 @@
                     context.set_code(grpc.StatusCode.NOT_FOUND)
                     return response
 
-                try:
-                    _clean_up_cache(self.cas, client_digest.size_bytes)
-                except ArtifactTooLargeException as e:
-                    context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
-                    context.set_details(str(e))
-                    return response
+                while True:
+                    if client_digest.size_bytes == 0:
+                        break
+                    try:
+                        self.cache_cleaner.clean_up(client_digest.size_bytes)
+                    except ArtifactTooLargeException as e:
+                        context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                        context.set_details(str(e))
+                        return response
+
+                    try:
+                        os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                        break
+                    except OSError as e:
+                        # Multiple uploads can happen at the same time
+                        if e.errno != errno.ENOSPC:
+                            raise
+
             elif request.resource_name:
                 # If it is set on subsequent calls, it **must** match the value of the first request.
                 if request.resource_name != resource_name:
                     context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                     return response
+
+            if (offset + len(request.data)) > client_digest.size_bytes:
+                context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                return response
+
             out.write(request.data)
             offset += len(request.data)
             if request.finish_write:
@@ -207,7 +242,7 @@
                     context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                     return response
                 out.flush()
-                digest = self.cas.add_object(path=out.name)
+                digest = self.cas.add_object(path=out.name, link_directly=True)
                 if digest.hash != client_digest.hash:
                     context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
                     return response
@@ -220,18 +255,26 @@
 
 class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
 
-    def __init__(self, cas, *, enable_push):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
 
     def FindMissingBlobs(self, request, context):
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
-            if not _has_object(self.cas, digest):
-                d = response.missing_blob_digests.add()
-                d.hash = digest.hash
-                d.size_bytes = digest.size_bytes
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
         return response
 
     def BatchReadBlobs(self, request, context):
@@ -250,12 +293,12 @@
             try:
                 with open(self.cas.objpath(digest), 'rb') as f:
                     if os.fstat(f.fileno()).st_size != digest.size_bytes:
-                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                        blob_response.status.code = code_pb2.NOT_FOUND
                         continue
 
                     blob_response.data = f.read(digest.size_bytes)
             except FileNotFoundError:
-                blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                blob_response.status.code = code_pb2.NOT_FOUND
 
         return response
@@ -285,7 +328,7 @@
                 continue
 
             try:
-                _clean_up_cache(self.cas, digest.size_bytes)
+                self.cache_cleaner.clean_up(digest.size_bytes)
 
                 with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
                     out.write(blob_request.data)
@@ -328,6 +371,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         try:
             tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response
 
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
@@ -400,60 +449,79 @@ def _digest_from_upload_resource_name(resource_name):
         return None
 
 
-def _has_object(cas, digest):
-    objpath = cas.objpath(digest)
-    return os.path.exists(objpath)
+class _CacheCleaner:
+
+    __cleanup_cache_lock = threading.Lock()
 
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-#   cas: CASCache object
-#   object_size: The size of the object being received in bytes
-#
-# Returns:
-#   int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
-    # Determine the available disk space, in bytes, of the file system
-    # which mounts the repo
-    stats = os.statvfs(cas.casdir)
-    buffer_ = int(2e9)                # Add a 2 GB buffer
-    free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
-    total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
-    if object_size > total_disk_space:
-        raise ArtifactTooLargeException("Artifact of size: {} is too large for "
-                                        "the filesystem which mounts the remote "
-                                        "cache".format(object_size))
-
-    if object_size <= free_disk_space:
-        # No need to clean up
-        return 0
-
-    # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_refs()
-
-    removed_size = 0  # in bytes
-    while object_size - removed_size > free_disk_space:
-        try:
-            to_remove = LRP_artifacts.pop(0)  # The first element in the list is the LRP artifact
-        except IndexError:
-            # This exception is caught if there are no more artifacts in the list
-            # LRP_artifacts. This means the artifact is too large for the filesystem
-            # so we abort the process
-            raise ArtifactTooLargeException("Artifact of size {} is too large for "
-                                            "the filesystem which mounts the remote "
-                                            "cache".format(object_size))
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
 
-        removed_size += cas.remove(to_remove, defer_prune=False)
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
 
-    if removed_size > 0:
-        logging.info("Successfully removed {} bytes from the cache".format(removed_size))
-    else:
-        logging.info("No artifacts were removed from the cache.")
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
 
-    return removed_size
+        return object_size <= free_disk_space
+
+    # clean_up()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP objects
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP object
+                except IndexError:
+                    # This exception is caught if there are no more objects in the list
+                    # LRP_objects. This means the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
                                                    "the filesystem which mounts the remote "
                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
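Since several uploads can trigger cleanup at once, _CacheCleaner.clean_up above serializes the expiry loop behind a class-level lock and re-checks free space once the lock is held, so threads that lost the race return without doing any work. A reduced sketch of that double-checked pattern, with `has_space` and `free_objects` as stand-ins for the real statvfs check and unlink loop:

```python
import threading

class CacheCleaner:
    _lock = threading.Lock()

    def __init__(self, has_space, free_objects):
        self._has_space = has_space        # callable: object_size -> bool
        self._free_objects = free_objects  # callable: object_size -> bytes freed

    def clean_up(self, object_size):
        if self._has_space(object_size):
            return 0  # fast path: enough room, no lock taken
        with CacheCleaner._lock:
            if self._has_space(object_size):
                return 0  # another thread cleaned up while we waited
            return self._free_objects(object_size)
```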
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index f2d6814d6..153d43340 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
@@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     # Mock a file system with 12 MB free disk space
     with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+                               min_head_size=int(2e9),
+                               max_head_size=int(2e9),
                                total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
 
         # Configure bst to push to the cache
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 31b96be73..38c54a947 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
 #
 class ArtifactShare():
 
-    def __init__(self, directory, *, total_space=None, free_space=None):
+    def __init__(self, directory, *,
+                 total_space=None,
+                 free_space=None,
+                 min_head_size=int(2e9),
+                 max_head_size=int(10e9)):
 
         # The working directory for the artifact share (in case it
         # needs to do something outside of its backend's storage folder).
@@ -50,6 +54,9 @@ class ArtifactShare():
         self.total_space = total_space
         self.free_space = free_space
 
+        self.max_head_size = max_head_size
+        self.min_head_size = min_head_size
+
         q = Queue()
 
         self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
             self.free_space = self.total_space
             os.statvfs = self._mock_statvfs
 
-        server = create_server(self.repodir, enable_push=True)
+        server = create_server(self.repodir,
+                               max_head_size=self.max_head_size,
+                               min_head_size=self.min_head_size,
+                               enable_push=True)
         port = server.add_insecure_port('localhost:0')
         server.start()
@@ -136,6 +146,15 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
+            reachable = set()
+            try:
+                self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+            except FileNotFoundError:
+                return None
+            for digest in reachable:
+                object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+                if not os.path.exists(object_name):
+                    return None
             return tree
         except CASError:
             return None
@@ -167,8 +186,11 @@ class ArtifactShare():
 # Create an ArtifactShare for use in a test case
 #
 @contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
-    share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+                          min_head_size=int(2e9),
+                          max_head_size=int(10e9)):
+    share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+                          min_head_size=min_head_size, max_head_size=max_head_size)
     try:
         yield share
     finally:
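For reference, this is roughly how an embedding process (or the test utilities above) would wire the new knobs into a server instance; the repository path is illustrative, and the server_main CLI exposes the same settings as --head-room-min and --head-room-max:

```python
from buildstream._artifactcache.casserver import create_server

server = create_server('/srv/artifact-cache',    # illustrative path
                       enable_push=True,
                       min_head_size=int(2e9),   # free-space floor that triggers cleanup
                       max_head_size=int(10e9))  # head room restored by a cleanup pass
port = server.add_insecure_port('localhost:0')
server.start()
```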