summaryrefslogtreecommitdiff
path: root/src/buildstream/_cas/casserver.py
diff options
context:
space:
mode:
authorChandan Singh <csingh43@bloomberg.net>2019-04-24 22:53:19 +0100
committerChandan Singh <csingh43@bloomberg.net>2019-05-21 12:41:18 +0100
commit070d053e5cc47e572e9f9e647315082bd7a15c63 (patch)
tree7fb0fdff52f9b5f8a18ec8fe9c75b661f9e0839e /src/buildstream/_cas/casserver.py
parent6c59e7901a52be961c2a1b671cf2b30f90bc4d0a (diff)
downloadbuildstream-070d053e5cc47e572e9f9e647315082bd7a15c63.tar.gz
Move source from 'buildstream' to 'src/buildstream'
This was discussed in #1008. Fixes #1009.
Diffstat (limited to 'src/buildstream/_cas/casserver.py')
-rw-r--r--src/buildstream/_cas/casserver.py619
1 files changed, 619 insertions, 0 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
new file mode 100644
index 000000000..c08a4d577
--- /dev/null
+++ b/src/buildstream/_cas/casserver.py
@@ -0,0 +1,619 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+from concurrent import futures
+import logging
+import os
+import signal
+import sys
+import tempfile
+import uuid
+import errno
+import threading
+
+import grpc
+from google.protobuf.message import DecodeError
+import click
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from .._protos.google.rpc import code_pb2
+from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+ artifact_pb2, artifact_pb2_grpc
+
+from .._exceptions import CASError
+
+from .cascache import CASCache
+
+
+# The default limit for gRPC messages is 4 MiB.
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
+_MAX_PAYLOAD_BYTES = 1024 * 1024
+
+
+# Trying to push an artifact that is too large
+class ArtifactTooLargeException(Exception):
+ pass
+
+
+# create_server():
+#
+# Create gRPC CAS artifact server as specified in the Remote Execution API.
+#
+# Args:
+# repo (str): Path to CAS repository
+# enable_push (bool): Whether to allow blob uploads and artifact updates
+#
+def create_server(repo, *, enable_push,
+ max_head_size=int(10e9),
+ min_head_size=int(2e9)):
+ cas = CASCache(os.path.abspath(repo))
+ artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
+
+ # Use max_workers default from Python 3.5+
+ max_workers = (os.cpu_count() or 1) * 5
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+
+ cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
+ bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+ _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
+
+ remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+ _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
+
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+ _CapabilitiesServicer(), server)
+
+ buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+
+ artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
+ _ArtifactServicer(cas, artifactdir), server)
+
+ return server
+
+
+@click.command(short_help="CAS Artifact Server")
+@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
+@click.option('--server-key', help="Private server key for TLS (PEM-encoded)")
+@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)")
+@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
+@click.option('--enable-push', default=False, is_flag=True,
+ help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+ help="Disk head room minimum in bytes",
+ default=2e9)
+@click.option('--head-room-max', type=click.INT,
+ help="Disk head room maximum in bytes",
+ default=10e9)
+@click.argument('repo')
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+ head_room_min, head_room_max):
+ server = create_server(repo,
+ max_head_size=head_room_max,
+ min_head_size=head_room_min,
+ enable_push=enable_push)
+
+ use_tls = bool(server_key)
+
+ if bool(server_cert) != use_tls:
+ click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
+ sys.exit(-1)
+
+ if client_certs and not use_tls:
+ click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
+ sys.exit(-1)
+
+ if use_tls:
+ # Read public/private key pair
+ with open(server_key, 'rb') as f:
+ server_key_bytes = f.read()
+ with open(server_cert, 'rb') as f:
+ server_cert_bytes = f.read()
+
+ if client_certs:
+ with open(client_certs, 'rb') as f:
+ client_certs_bytes = f.read()
+ else:
+ client_certs_bytes = None
+
+ credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
+ root_certificates=client_certs_bytes,
+ require_client_auth=bool(client_certs))
+ server.add_secure_port('[::]:{}'.format(port), credentials)
+ else:
+ server.add_insecure_port('[::]:{}'.format(port))
+
+ # Run artifact server
+ server.start()
+ try:
+ while True:
+ signal.pause()
+ except KeyboardInterrupt:
+ server.stop(0)
+
+
+class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
+ def __init__(self, cas, cache_cleaner, *, enable_push):
+ super().__init__()
+ self.cas = cas
+ self.enable_push = enable_push
+ self.cache_cleaner = cache_cleaner
+
+ def Read(self, request, context):
+ resource_name = request.resource_name
+ client_digest = _digest_from_download_resource_name(resource_name)
+ if client_digest is None:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return
+
+ if request.read_offset > client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+ return
+
+ try:
+ with open(self.cas.objpath(client_digest), 'rb') as f:
+ if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return
+
+ if request.read_offset > 0:
+ f.seek(request.read_offset)
+
+ remaining = client_digest.size_bytes - request.read_offset
+ while remaining > 0:
+ chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+ remaining -= chunk_size
+
+ response = bytestream_pb2.ReadResponse()
+ # max. 64 kB chunks
+ response.data = f.read(chunk_size)
+ yield response
+ except FileNotFoundError:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+
+ def Write(self, request_iterator, context):
+ response = bytestream_pb2.WriteResponse()
+
+ if not self.enable_push:
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+ return response
+
+ offset = 0
+ finished = False
+ resource_name = None
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+ for request in request_iterator:
+ if finished or request.write_offset != offset:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+
+ if resource_name is None:
+ # First request
+ resource_name = request.resource_name
+ client_digest = _digest_from_upload_resource_name(resource_name)
+ if client_digest is None:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return response
+
+ while True:
+ if client_digest.size_bytes == 0:
+ break
+ try:
+ self.cache_cleaner.clean_up(client_digest.size_bytes)
+ except ArtifactTooLargeException as e:
+ context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+ context.set_details(str(e))
+ return response
+
+ try:
+ os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+ break
+ except OSError as e:
+ # Multiple upload can happen in the same time
+ if e.errno != errno.ENOSPC:
+ raise
+
+ elif request.resource_name:
+ # If it is set on subsequent calls, it **must** match the value of the first request.
+ if request.resource_name != resource_name:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+
+ if (offset + len(request.data)) > client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+
+ out.write(request.data)
+ offset += len(request.data)
+ if request.finish_write:
+ if client_digest.size_bytes != offset:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+ out.flush()
+ digest = self.cas.add_object(path=out.name, link_directly=True)
+ if digest.hash != client_digest.hash:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+ finished = True
+
+ assert finished
+
+ response.committed_size = offset
+ return response
+
+
+class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
+ def __init__(self, cas, cache_cleaner, *, enable_push):
+ super().__init__()
+ self.cas = cas
+ self.enable_push = enable_push
+ self.cache_cleaner = cache_cleaner
+
+ def FindMissingBlobs(self, request, context):
+ response = remote_execution_pb2.FindMissingBlobsResponse()
+ for digest in request.blob_digests:
+ objpath = self.cas.objpath(digest)
+ try:
+ os.utime(objpath)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ d = response.missing_blob_digests.add()
+ d.hash = digest.hash
+ d.size_bytes = digest.size_bytes
+
+ return response
+
+ def BatchReadBlobs(self, request, context):
+ response = remote_execution_pb2.BatchReadBlobsResponse()
+ batch_size = 0
+
+ for digest in request.digests:
+ batch_size += digest.size_bytes
+ if batch_size > _MAX_PAYLOAD_BYTES:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ return response
+
+ blob_response = response.responses.add()
+ blob_response.digest.hash = digest.hash
+ blob_response.digest.size_bytes = digest.size_bytes
+ try:
+ with open(self.cas.objpath(digest), 'rb') as f:
+ if os.fstat(f.fileno()).st_size != digest.size_bytes:
+ blob_response.status.code = code_pb2.NOT_FOUND
+ continue
+
+ blob_response.data = f.read(digest.size_bytes)
+ except FileNotFoundError:
+ blob_response.status.code = code_pb2.NOT_FOUND
+
+ return response
+
+ def BatchUpdateBlobs(self, request, context):
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+ if not self.enable_push:
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+ return response
+
+ batch_size = 0
+
+ for blob_request in request.requests:
+ digest = blob_request.digest
+
+ batch_size += digest.size_bytes
+ if batch_size > _MAX_PAYLOAD_BYTES:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ return response
+
+ blob_response = response.responses.add()
+ blob_response.digest.hash = digest.hash
+ blob_response.digest.size_bytes = digest.size_bytes
+
+ if len(blob_request.data) != digest.size_bytes:
+ blob_response.status.code = code_pb2.FAILED_PRECONDITION
+ continue
+
+ try:
+ self.cache_cleaner.clean_up(digest.size_bytes)
+
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+ out.write(blob_request.data)
+ out.flush()
+ server_digest = self.cas.add_object(path=out.name)
+ if server_digest.hash != digest.hash:
+ blob_response.status.code = code_pb2.FAILED_PRECONDITION
+
+ except ArtifactTooLargeException:
+ blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
+
+ return response
+
+
+class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
+ def GetCapabilities(self, request, context):
+ response = remote_execution_pb2.ServerCapabilities()
+
+ cache_capabilities = response.cache_capabilities
+ cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
+ cache_capabilities.action_cache_update_capabilities.update_enabled = False
+ cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
+ cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
+
+ response.deprecated_api_version.major = 2
+ response.low_api_version.major = 2
+ response.high_api_version.major = 2
+
+ return response
+
+
+class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
+ def __init__(self, cas, *, enable_push):
+ super().__init__()
+ self.cas = cas
+ self.enable_push = enable_push
+
+ def GetReference(self, request, context):
+ response = buildstream_pb2.GetReferenceResponse()
+
+ try:
+ tree = self.cas.resolve_ref(request.key, update_mtime=True)
+ try:
+ self.cas.update_tree_mtime(tree)
+ except FileNotFoundError:
+ self.cas.remove(request.key, defer_prune=True)
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return response
+
+ response.digest.hash = tree.hash
+ response.digest.size_bytes = tree.size_bytes
+ except CASError:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+
+ return response
+
+ def UpdateReference(self, request, context):
+ response = buildstream_pb2.UpdateReferenceResponse()
+
+ if not self.enable_push:
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+ return response
+
+ for key in request.keys:
+ self.cas.set_ref(key, request.digest)
+
+ return response
+
+ def Status(self, request, context):
+ response = buildstream_pb2.StatusResponse()
+
+ response.allow_updates = self.enable_push
+
+ return response
+
+
+class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
+
+ def __init__(self, cas, artifactdir):
+ super().__init__()
+ self.cas = cas
+ self.artifactdir = artifactdir
+ os.makedirs(artifactdir, exist_ok=True)
+
+ def GetArtifact(self, request, context):
+ artifact_path = os.path.join(self.artifactdir, request.cache_key)
+ if not os.path.exists(artifact_path):
+ context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
+
+ artifact = artifact_pb2.Artifact()
+ with open(artifact_path, 'rb') as f:
+ artifact.ParseFromString(f.read())
+
+ # Now update mtimes of files present.
+ try:
+
+ if str(artifact.files):
+ self.cas.update_tree_mtime(artifact.files)
+
+ if str(artifact.buildtree):
+ # buildtrees might not be there
+ try:
+ self.cas.update_tree_mtime(artifact.buildtree)
+ except FileNotFoundError:
+ pass
+
+ if str(artifact.public_data):
+ os.utime(self.cas.objpath(artifact.public_data))
+
+ for log_file in artifact.logs:
+ os.utime(self.cas.objpath(log_file.digest))
+
+ except FileNotFoundError:
+ os.unlink(artifact_path)
+ context.abort(grpc.StatusCode.NOT_FOUND,
+ "Artifact files incomplete")
+ except DecodeError:
+ context.abort(grpc.StatusCode.NOT_FOUND,
+ "Artifact files not valid")
+
+ return artifact
+
+ def UpdateArtifact(self, request, context):
+ artifact = request.artifact
+
+ # Check that the files specified are in the CAS
+ self._check_directory("files", artifact.files, context)
+
+ # Unset protocol buffers don't evaluated to False but do return empty
+ # strings, hence str()
+ if str(artifact.public_data):
+ self._check_file("public data", artifact.public_data, context)
+
+ for log_file in artifact.logs:
+ self._check_file("log digest", log_file.digest, context)
+
+ # Add the artifact proto to the cas
+ artifact_path = os.path.join(self.artifactdir, request.cache_key)
+ os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+ with open(artifact_path, 'wb') as f:
+ f.write(artifact.SerializeToString())
+
+ return artifact
+
+ def _check_directory(self, name, digest, context):
+ try:
+ directory = remote_execution_pb2.Directory()
+ with open(self.cas.objpath(digest), 'rb') as f:
+ directory.ParseFromString(f.read())
+ except FileNotFoundError:
+ context.abort(grpc.StatusCode.FAILED_PRECONDITION,
+ "Artifact {} specified but no files found".format(name))
+ except DecodeError:
+ context.abort(grpc.StatusCode.FAILED_PRECONDITION,
+ "Artifact {} specified but directory not found".format(name))
+
+ def _check_file(self, name, digest, context):
+ if not os.path.exists(self.cas.objpath(digest)):
+ context.abort(grpc.StatusCode.FAILED_PRECONDITION,
+ "Artifact {} specified but not found".format(name))
+
+
+def _digest_from_download_resource_name(resource_name):
+ parts = resource_name.split('/')
+
+ # Accept requests from non-conforming BuildStream 1.1.x clients
+ if len(parts) == 2:
+ parts.insert(0, 'blobs')
+
+ if len(parts) != 3 or parts[0] != 'blobs':
+ return None
+
+ try:
+ digest = remote_execution_pb2.Digest()
+ digest.hash = parts[1]
+ digest.size_bytes = int(parts[2])
+ return digest
+ except ValueError:
+ return None
+
+
+def _digest_from_upload_resource_name(resource_name):
+ parts = resource_name.split('/')
+
+ # Accept requests from non-conforming BuildStream 1.1.x clients
+ if len(parts) == 2:
+ parts.insert(0, 'uploads')
+ parts.insert(1, str(uuid.uuid4()))
+ parts.insert(2, 'blobs')
+
+ if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+ return None
+
+ try:
+ uuid_ = uuid.UUID(hex=parts[1])
+ if uuid_.version != 4:
+ return None
+
+ digest = remote_execution_pb2.Digest()
+ digest.hash = parts[3]
+ digest.size_bytes = int(parts[4])
+ return digest
+ except ValueError:
+ return None
+
+
+class _CacheCleaner:
+
+ __cleanup_cache_lock = threading.Lock()
+
+ def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+ self.__cas = cas
+ self.__max_head_size = max_head_size
+ self.__min_head_size = min_head_size
+
+ def __has_space(self, object_size):
+ stats = os.statvfs(self.__cas.casdir)
+ free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+ total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+ if object_size > total_disk_space:
+ raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+ "the filesystem which mounts the remote "
+ "cache".format(object_size))
+
+ return object_size <= free_disk_space
+
+ # _clean_up_cache()
+ #
+ # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+ # is enough space for the incoming artifact
+ #
+ # Args:
+ # object_size: The size of the object being received in bytes
+ #
+ # Returns:
+ # int: The total bytes removed on the filesystem
+ #
+ def clean_up(self, object_size):
+ if self.__has_space(object_size):
+ return 0
+
+ with _CacheCleaner.__cleanup_cache_lock:
+ if self.__has_space(object_size):
+ # Another thread has done the cleanup for us
+ return 0
+
+ stats = os.statvfs(self.__cas.casdir)
+ target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+ # obtain a list of LRP artifacts
+ LRP_objects = self.__cas.list_objects()
+
+ removed_size = 0 # in bytes
+ last_mtime = 0
+
+ while object_size - removed_size > target_disk_space:
+ try:
+ last_mtime, to_remove = LRP_objects.pop(0) # The first element in the list is the LRP artifact
+ except IndexError:
+ # This exception is caught if there are no more artifacts in the list
+ # LRP_artifacts. This means the the artifact is too large for the filesystem
+ # so we abort the process
+ raise ArtifactTooLargeException("Artifact of size {} is too large for "
+ "the filesystem which mounts the remote "
+ "cache".format(object_size))
+
+ try:
+ size = os.stat(to_remove).st_size
+ os.unlink(to_remove)
+ removed_size += size
+ except FileNotFoundError:
+ pass
+
+ self.__cas.clean_up_refs_until(last_mtime)
+
+ if removed_size > 0:
+ logging.info("Successfully removed %d bytes from the cache", removed_size)
+ else:
+ logging.info("No artifacts were removed from the cache.")
+
+ return removed_size