author     Valentin David <valentin.david@gmail.com>    2018-11-29 08:47:40 +0000
committer  Valentin David <valentin.david@gmail.com>    2018-11-29 08:47:40 +0000
commit     9a458402c57de0dd037caf67d732a28ec25e38a6 (patch)
tree       e59227db3b6e80242d6c806fe82d0c7f63e5c9bf
parent     3513580cacb9784900f119600e1a8ab9aa4fed32 (diff)
parent     ba9afa9888d036b91954b179a57925884dd29483 (diff)
download   buildstream-9a458402c57de0dd037caf67d732a28ec25e38a6.tar.gz
Merge branch 'valentindavid/cache_server_fill_up' into 'master'
Fix cleanup of cache in server when disk is full

Closes #678

See merge request BuildStream/buildstream!830
-rw-r--r--  buildstream/_artifactcache/cascache.py    102
-rw-r--r--  buildstream/_artifactcache/casserver.py   214
-rw-r--r--  tests/frontend/push.py                       4
-rw-r--r--  tests/testutils/artifactshare.py            30
4 files changed, 254 insertions(+), 96 deletions(-)
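
The fix replaces the server's hard-coded 2 GB disk buffer with configurable head-room bounds and evicts individual CAS objects in least-recently-modified order instead of whole refs. A minimal sketch of the space check, written as a standalone helper for illustration (the real logic lives in _CacheCleaner.__has_space() in the casserver.py hunks below):

import os

def has_space(casdir, object_size, min_head_size=int(2e9)):
    # An incoming object only fits if storing it still leaves at least
    # min_head_size bytes free on the filesystem hosting the CAS.
    stats = os.statvfs(casdir)
    free_disk_space = (stats.f_bavail * stats.f_bsize) - min_head_size
    total_disk_space = (stats.f_blocks * stats.f_bsize) - min_head_size
    if object_size > total_disk_space:
        raise RuntimeError("object is larger than the filesystem itself")
    return object_size <= free_disk_space
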
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 315aa6afa..2ae36d22a 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -25,6 +25,7 @@ import os
import stat
import tempfile
import uuid
+import contextlib
from urllib.parse import urlparse
import grpc
@@ -88,6 +89,13 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
CASRemoteSpec.__new__.__defaults__ = (None, None, None)
+class BlobNotFound(CASError):
+
+ def __init__(self, blob, msg):
+ self.blob = blob
+ super().__init__(msg)
+
+
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
# Args:
@@ -299,6 +307,8 @@ class CASCache():
raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
else:
return False
+ except BlobNotFound as e:
+ return False
# pull_tree():
#
@@ -471,13 +481,14 @@ class CASCache():
# digest (Digest): An optional Digest object to populate
# path (str): Path to file to add
# buffer (bytes): Byte buffer to add
+ # link_directly (bool): Whether file given by path can be linked
#
# Returns:
# (Digest): The digest of the added object
#
# Either `path` or `buffer` must be passed, but not both.
#
- def add_object(self, *, digest=None, path=None, buffer=None):
+ def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
# Exactly one of the two parameters has to be specified
assert (path is None) != (buffer is None)
@@ -487,28 +498,34 @@ class CASCache():
try:
h = hashlib.sha256()
# Always write out new file to avoid corruption if input file is modified
- with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
- # Set mode bits to 0644
- os.chmod(out.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
-
- if path:
- with open(path, 'rb') as f:
- for chunk in iter(lambda: f.read(4096), b""):
- h.update(chunk)
- out.write(chunk)
+ with contextlib.ExitStack() as stack:
+ if path is not None and link_directly:
+ tmp = stack.enter_context(open(path, 'rb'))
+ for chunk in iter(lambda: tmp.read(4096), b""):
+ h.update(chunk)
else:
- h.update(buffer)
- out.write(buffer)
+ tmp = stack.enter_context(tempfile.NamedTemporaryFile(dir=self.tmpdir))
+ # Set mode bits to 0644
+ os.chmod(tmp.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
- out.flush()
+ if path:
+ with open(path, 'rb') as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ h.update(chunk)
+ tmp.write(chunk)
+ else:
+ h.update(buffer)
+ tmp.write(buffer)
+
+ tmp.flush()
digest.hash = h.hexdigest()
- digest.size_bytes = os.fstat(out.fileno()).st_size
+ digest.size_bytes = os.fstat(tmp.fileno()).st_size
# Place file at final location
objpath = self.objpath(digest)
os.makedirs(os.path.dirname(objpath), exist_ok=True)
- os.link(out.name, objpath)
+ os.link(tmp.name, objpath)
except FileExistsError as e:
# We can ignore the failed link() if the object is already in the repo.
@@ -606,6 +623,41 @@ class CASCache():
# first ref of this list will be the file modified earliest.
return [ref for _, ref in sorted(zip(mtimes, refs))]
+ # list_objects():
+ #
+ # List cached objects in Least Recently Modified (LRM) order.
+ #
+ # Returns:
+ # (list) - A list of objects and timestamps in LRM order
+ #
+ def list_objects(self):
+ objs = []
+ mtimes = []
+
+ for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
+ for filename in files:
+ obj_path = os.path.join(root, filename)
+ try:
+ mtimes.append(os.path.getmtime(obj_path))
+ except FileNotFoundError:
+ pass
+ else:
+ objs.append(obj_path)
+
+ # NOTE: Sorted will sort from earliest to latest, thus the
+ # first element of this list will be the file modified earliest.
+ return sorted(zip(mtimes, objs))
+
+ def clean_up_refs_until(self, time):
+ ref_heads = os.path.join(self.casdir, 'refs', 'heads')
+
+ for root, _, files in os.walk(ref_heads):
+ for filename in files:
+ ref_path = os.path.join(root, filename)
+ # Obtain the mtime (the time a file was last modified)
+ if os.path.getmtime(ref_path) < time:
+ os.unlink(ref_path)
+
# remove():
#
# Removes the given symbolic ref from the repo.
@@ -665,6 +717,10 @@ class CASCache():
return pruned
+ def update_tree_mtime(self, tree):
+ reachable = set()
+ self._reachable_refs_dir(reachable, tree, update_mtime=True)
+
################################################
# Local Private Methods #
################################################
@@ -811,10 +867,13 @@ class CASCache():
a += 1
b += 1
- def _reachable_refs_dir(self, reachable, tree):
+ def _reachable_refs_dir(self, reachable, tree, update_mtime=False):
if tree.hash in reachable:
return
+ if update_mtime:
+ os.utime(self.objpath(tree))
+
reachable.add(tree.hash)
directory = remote_execution_pb2.Directory()
@@ -823,10 +882,12 @@ class CASCache():
directory.ParseFromString(f.read())
for filenode in directory.files:
+ if update_mtime:
+ os.utime(self.objpath(filenode.digest))
reachable.add(filenode.digest.hash)
for dirnode in directory.directories:
- self._reachable_refs_dir(reachable, dirnode.digest)
+ self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
def _required_blobs(self, directory_digest):
# parse directory, and recursively add blobs
@@ -880,7 +941,7 @@ class CASCache():
with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
self._fetch_blob(remote, digest, f)
- added_digest = self.add_object(path=f.name)
+ added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
return objpath
@@ -891,7 +952,7 @@ class CASCache():
f.write(data)
f.flush()
- added_digest = self.add_object(path=f.name)
+ added_digest = self.add_object(path=f.name, link_directly=True)
assert added_digest.hash == digest.hash
# Helper function for _fetch_directory().
@@ -1203,6 +1264,9 @@ class _CASBatchRead():
batch_response = self._remote.cas.BatchReadBlobs(self._request)
for response in batch_response.responses:
+ if response.status.code == code_pb2.NOT_FOUND:
+ raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+ response.digest.hash, response.status.code))
if response.status.code != code_pb2.OK:
raise CASError("Failed to download blob {}: {}".format(
response.digest.hash, response.status.code))
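
The cascache.py changes above add per-object bookkeeping: objects can be listed in least-recently-modified (LRM) order, refs older than a given time can be dropped, and object mtimes are refreshed whenever a tree is resolved, so recently used artifacts are not the first candidates for eviction. A rough illustration of the LRM ordering, using a hypothetical standalone function that mirrors list_objects():

import os

def objects_by_mtime(objects_dir):
    # Collect (mtime, path) pairs for every blob in the CAS object store and
    # sort them from least to most recently modified -- the eviction order.
    entries = []
    for root, _, files in os.walk(objects_dir):
        for name in files:
            path = os.path.join(root, name)
            try:
                entries.append((os.path.getmtime(path), path))
            except FileNotFoundError:
                # Another process may have pruned the object already.
                pass
    return sorted(entries)
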
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index ee84c4943..ed0266585 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -24,6 +24,8 @@ import signal
import sys
import tempfile
import uuid
+import errno
+import threading
import click
import grpc
@@ -31,6 +33,7 @@ import grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+from .._protos.google.rpc import code_pb2
from .._exceptions import CASError
@@ -55,18 +58,22 @@ class ArtifactTooLargeException(Exception):
# repo (str): Path to CAS repository
# enable_push (bool): Whether to allow blob uploads and artifact updates
#
-def create_server(repo, *, enable_push):
+def create_server(repo, *, enable_push,
+ max_head_size=int(10e9),
+ min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+ cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
- _ByteStreamServicer(cas, enable_push=enable_push), server)
+ _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+ _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
@@ -84,9 +91,19 @@ def create_server(repo, *, enable_push):
@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
@click.option('--enable-push', default=False, is_flag=True,
help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+ help="Disk head room minimum in bytes",
+ default=2e9)
+@click.option('--head-room-max', type=click.INT,
+ help="Disk head room maximum in bytes",
+ default=10e9)
@click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
- server = create_server(repo, enable_push=enable_push)
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+ head_room_min, head_room_max):
+ server = create_server(repo,
+ max_head_size=head_room_max,
+ min_head_size=head_room_min,
+ enable_push=enable_push)
use_tls = bool(server_key)
@@ -128,10 +145,11 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push):
class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
- def __init__(self, cas, *, enable_push):
+ def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
+ self.cache_cleaner = cache_cleaner
def Read(self, request, context):
resource_name = request.resource_name
@@ -189,17 +207,34 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
- try:
- _clean_up_cache(self.cas, client_digest.size_bytes)
- except ArtifactTooLargeException as e:
- context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
- context.set_details(str(e))
- return response
+ while True:
+ if client_digest.size_bytes == 0:
+ break
+ try:
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
+ except ArtifactTooLargeException as e:
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+ context.set_details(str(e))
+ return response
+
+ try:
+ os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+ break
+ except OSError as e:
+ # Multiple uploads can happen at the same time
+ if e.errno != errno.ENOSPC:
+ raise
+
elif request.resource_name:
# If it is set on subsequent calls, it **must** match the value of the first request.
if request.resource_name != resource_name:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
+
+ if (offset + len(request.data)) > client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+
out.write(request.data)
offset += len(request.data)
if request.finish_write:
@@ -207,7 +242,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
out.flush()
- digest = self.cas.add_object(path=out.name)
+ digest = self.cas.add_object(path=out.name, link_directly=True)
if digest.hash != client_digest.hash:
context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
return response
@@ -220,18 +255,26 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
- def __init__(self, cas, *, enable_push):
+ def __init__(self, cas, cache_cleaner, *, enable_push):
super().__init__()
self.cas = cas
self.enable_push = enable_push
+ self.cache_cleaner = cache_cleaner
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
for digest in request.blob_digests:
- if not _has_object(self.cas, digest):
- d = response.missing_blob_digests.add()
- d.hash = digest.hash
- d.size_bytes = digest.size_bytes
+ objpath = self.cas.objpath(digest)
+ try:
+ os.utime(objpath)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ d = response.missing_blob_digests.add()
+ d.hash = digest.hash
+ d.size_bytes = digest.size_bytes
+
return response
def BatchReadBlobs(self, request, context):
@@ -250,12 +293,12 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
try:
with open(self.cas.objpath(digest), 'rb') as f:
if os.fstat(f.fileno()).st_size != digest.size_bytes:
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
+ blob_response.status.code = code_pb2.NOT_FOUND
continue
blob_response.data = f.read(digest.size_bytes)
except FileNotFoundError:
- blob_response.status.code = grpc.StatusCode.NOT_FOUND
+ blob_response.status.code = code_pb2.NOT_FOUND
return response
@@ -285,7 +328,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
continue
try:
- _clean_up_cache(self.cas, digest.size_bytes)
+ self.cache_cleaner.clean_up(digest.size_bytes)
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
out.write(blob_request.data)
@@ -328,6 +371,12 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
try:
tree = self.cas.resolve_ref(request.key, update_mtime=True)
+ try:
+ self.cas.update_tree_mtime(tree)
+ except FileNotFoundError:
+ self.cas.remove(request.key, defer_prune=True)
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return response
response.digest.hash = tree.hash
response.digest.size_bytes = tree.size_bytes
@@ -400,60 +449,79 @@ def _digest_from_upload_resource_name(resource_name):
return None
-def _has_object(cas, digest):
- objpath = cas.objpath(digest)
- return os.path.exists(objpath)
+class _CacheCleaner:
+ __cleanup_cache_lock = threading.Lock()
-# _clean_up_cache()
-#
-# Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
-# is enough space for the incoming artifact
-#
-# Args:
-# cas: CASCache object
-# object_size: The size of the object being received in bytes
-#
-# Returns:
-# int: The total bytes removed on the filesystem
-#
-def _clean_up_cache(cas, object_size):
- # Determine the available disk space, in bytes, of the file system
- # which mounts the repo
- stats = os.statvfs(cas.casdir)
- buffer_ = int(2e9) # Add a 2 GB buffer
- free_disk_space = (stats.f_bfree * stats.f_bsize) - buffer_
- total_disk_space = (stats.f_blocks * stats.f_bsize) - buffer_
-
- if object_size > total_disk_space:
- raise ArtifactTooLargeException("Artifact of size: {} is too large for "
- "the filesystem which mounts the remote "
- "cache".format(object_size))
-
- if object_size <= free_disk_space:
- # No need to clean up
- return 0
-
- # obtain a list of LRP artifacts
- LRP_artifacts = cas.list_refs()
-
- removed_size = 0 # in bytes
- while object_size - removed_size > free_disk_space:
- try:
- to_remove = LRP_artifacts.pop(0) # The first element in the list is the LRP artifact
- except IndexError:
- # This exception is caught if there are no more artifacts in the list
- # LRP_artifacts. This means the the artifact is too large for the filesystem
- # so we abort the process
- raise ArtifactTooLargeException("Artifact of size {} is too large for "
- "the filesystem which mounts the remote "
- "cache".format(object_size))
+ def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+ self.__cas = cas
+ self.__max_head_size = max_head_size
+ self.__min_head_size = min_head_size
- removed_size += cas.remove(to_remove, defer_prune=False)
+ def __has_space(self, object_size):
+ stats = os.statvfs(self.__cas.casdir)
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
- if removed_size > 0:
- logging.info("Successfully removed {} bytes from the cache".format(removed_size))
- else:
- logging.info("No artifacts were removed from the cache.")
+ if object_size > total_disk_space:
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+ "the filesystem which mounts the remote "
+ "cache".format(object_size))
- return removed_size
+ return object_size <= free_disk_space
+
+ # _clean_up_cache()
+ #
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+ # is enough space for the incoming artifact
+ #
+ # Args:
+ # object_size: The size of the object being received in bytes
+ #
+ # Returns:
+ # int: The total bytes removed on the filesystem
+ #
+ def clean_up(self, object_size):
+ if self.__has_space(object_size):
+ return 0
+
+ with _CacheCleaner.__cleanup_cache_lock:
+ if self.__has_space(object_size):
+ # Another thread has done the cleanup for us
+ return 0
+
+ stats = os.statvfs(self.__cas.casdir)
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+ # obtain a list of LRP artifacts
+ LRP_objects = self.__cas.list_objects()
+
+ removed_size = 0 # in bytes
+ last_mtime = 0
+
+ while object_size - removed_size > target_disk_space:
+ try:
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
+ except IndexError:
+ # This exception is caught if there are no more objects in the list
+ # LRP_objects. This means the artifact is too large for the filesystem
+ # so we abort the process
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
+ "the filesystem which mounts the remote "
+ "cache".format(object_size))
+
+ try:
+ size = os.stat(to_remove).st_size
+ os.unlink(to_remove)
+ removed_size += size
+ except FileNotFoundError:
+ pass
+
+ self.__cas.clean_up_refs_until(last_mtime)
+
+ if removed_size > 0:
+ logging.info("Successfully removed {} bytes from the cache".format(removed_size))
+ else:
+ logging.info("No artifacts were removed from the cache.")
+
+ return removed_size
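
On the server side, cleanup now runs per upload and is serialised with a class-level lock so concurrent uploads do not race each other into ENOSPC; the incoming blob is also pre-allocated with posix_fallocate(), and the cleanup is retried if that allocation still fails. A condensed sketch of the reservation loop from Write() above, assuming clean_up is a callable such as _CacheCleaner.clean_up():

import errno
import os

def reserve_space(out, size_bytes, clean_up):
    # Free enough space for the incoming blob, then pre-allocate it; if a
    # concurrent upload fills the disk in the meantime, clean up and retry.
    while True:
        clean_up(size_bytes)   # may raise ArtifactTooLargeException
        try:
            os.posix_fallocate(out.fileno(), 0, size_bytes)
            return
        except OSError as e:
            if e.errno != errno.ENOSPC:
                raise
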
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index f2d6814d6..153d43340 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -230,6 +230,8 @@ def test_artifact_expires(cli, datafiles, tmpdir):
# Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
# Mock a file system with 12 MB free disk space
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+ min_head_size=int(2e9),
+ max_head_size=int(2e9),
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
# Configure bst to push to the cache
@@ -313,6 +315,8 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
# Create an artifact share (remote cache) in tmpdir/artifactshare
# Mock a file system with 12 MB free disk space
with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'),
+ min_head_size=int(2e9),
+ max_head_size=int(2e9),
total_space=int(10e9), free_space=(int(12e6) + int(2e9))) as share:
# Configure bst to push to the cache
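
With both head-room bounds pinned at 2 GB and the mocked free space set to 12 MB plus 2 GB, the cleaner sees exactly the 12 MB these expiry tests were originally written against:

# Values taken from the create_artifact_share() calls above
free_space    = int(12e6) + int(2e9)         # mocked available bytes
min_head_size = int(2e9)                     # head room reserved by the cleaner
usable        = free_space - min_head_size   # 12,000,000 bytes (12 MB)
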
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 31b96be73..38c54a947 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -29,7 +29,11 @@ from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution
#
class ArtifactShare():
- def __init__(self, directory, *, total_space=None, free_space=None):
+ def __init__(self, directory, *,
+ total_space=None,
+ free_space=None,
+ min_head_size=int(2e9),
+ max_head_size=int(10e9)):
# The working directory for the artifact share (in case it
# needs to do something outside of its backend's storage folder).
@@ -50,6 +54,9 @@ class ArtifactShare():
self.total_space = total_space
self.free_space = free_space
+ self.max_head_size = max_head_size
+ self.min_head_size = min_head_size
+
q = Queue()
self.process = Process(target=self.run, args=(q,))
@@ -74,7 +81,10 @@ class ArtifactShare():
self.free_space = self.total_space
os.statvfs = self._mock_statvfs
- server = create_server(self.repodir, enable_push=True)
+ server = create_server(self.repodir,
+ max_head_size=self.max_head_size,
+ min_head_size=self.min_head_size,
+ enable_push=True)
port = server.add_insecure_port('localhost:0')
server.start()
@@ -136,6 +146,15 @@ class ArtifactShare():
try:
tree = self.cas.resolve_ref(artifact_key)
+ reachable = set()
+ try:
+ self.cas._reachable_refs_dir(reachable, tree, update_mtime=False)
+ except FileNotFoundError:
+ return None
+ for digest in reachable:
+ object_name = os.path.join(self.cas.casdir, 'objects', digest[:2], digest[2:])
+ if not os.path.exists(object_name):
+ return None
return tree
except CASError:
return None
@@ -167,8 +186,11 @@ class ArtifactShare():
# Create an ArtifactShare for use in a test case
#
@contextmanager
-def create_artifact_share(directory, *, total_space=None, free_space=None):
- share = ArtifactShare(directory, total_space=total_space, free_space=free_space)
+def create_artifact_share(directory, *, total_space=None, free_space=None,
+ min_head_size=int(2e9),
+ max_head_size=int(10e9)):
+ share = ArtifactShare(directory, total_space=total_space, free_space=free_space,
+ min_head_size=min_head_size, max_head_size=max_head_size)
try:
yield share
finally:
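
Finally, a usage sketch of the server factory with the head-room bounds introduced in this merge; the repository path and port are illustrative:

from buildstream._artifactcache.casserver import create_server

server = create_server('/srv/artifacts/repo',
                       enable_push=True,
                       min_head_size=int(2e9),    # clean up when less than ~2 GB would remain free
                       max_head_size=int(10e9))   # cleanup aims to restore ~10 GB of head room
port = server.add_insecure_port('localhost:11001')
server.start()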