| author | Chandan Singh <csingh43@bloomberg.net> | 2019-04-24 22:53:19 +0100 |
|---|---|---|
| committer | Chandan Singh <csingh43@bloomberg.net> | 2019-05-21 12:41:18 +0100 |
| commit | 070d053e5cc47e572e9f9e647315082bd7a15c63 (patch) | |
| tree | 7fb0fdff52f9b5f8a18ec8fe9c75b661f9e0839e /src/buildstream/_cas/casserver.py | |
| parent | 6c59e7901a52be961c2a1b671cf2b30f90bc4d0a (diff) | |
| download | buildstream-070d053e5cc47e572e9f9e647315082bd7a15c63.tar.gz | |
Move source from 'buildstream' to 'src/buildstream'
This was discussed in #1008.
Fixes #1009.
Diffstat (limited to 'src/buildstream/_cas/casserver.py')
-rw-r--r-- | src/buildstream/_cas/casserver.py | 619
1 file changed, 619 insertions, 0 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
new file mode 100644
index 000000000..c08a4d577
--- /dev/null
+++ b/src/buildstream/_cas/casserver.py
@@ -0,0 +1,619 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+#        Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+from concurrent import futures
+import logging
+import os
+import signal
+import sys
+import tempfile
+import uuid
+import errno
+import threading
+
+import grpc
+from google.protobuf.message import DecodeError
+import click
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from .._protos.google.rpc import code_pb2
+from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+    artifact_pb2, artifact_pb2_grpc
+
+from .._exceptions import CASError
+
+from .cascache import CASCache
+
+
+# The default limit for gRPC messages is 4 MiB.
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
+_MAX_PAYLOAD_BYTES = 1024 * 1024
+
+
+# Trying to push an artifact that is too large
+class ArtifactTooLargeException(Exception):
+    pass
+
+
+# create_server():
+#
+# Create gRPC CAS artifact server as specified in the Remote Execution API.
+#
+# Args:
+#     repo (str): Path to CAS repository
+#     enable_push (bool): Whether to allow blob uploads and artifact updates
+#
+def create_server(repo, *, enable_push,
+                  max_head_size=int(10e9),
+                  min_head_size=int(2e9)):
+    cas = CASCache(os.path.abspath(repo))
+    artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
+
+    # Use max_workers default from Python 3.5+
+    max_workers = (os.cpu_count() or 1) * 5
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+
+    cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size)
+
+    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+        _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+        _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+        _CapabilitiesServicer(), server)
+
+    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+
+    artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
+        _ArtifactServicer(cas, artifactdir), server)
+
+    return server
+
+
+@click.command(short_help="CAS Artifact Server")
+@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
+@click.option('--server-key', help="Private server key for TLS (PEM-encoded)")
+@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)")
+@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
+@click.option('--enable-push', default=False, is_flag=True,
+              help="Allow clients to upload blobs and update artifact cache")
+@click.option('--head-room-min', type=click.INT,
+              help="Disk head room minimum in bytes",
+              default=2e9)
+@click.option('--head-room-max', type=click.INT,
+              help="Disk head room maximum in bytes",
+              default=10e9)
+@click.argument('repo')
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
+                head_room_min, head_room_max):
+    server = create_server(repo,
+                           max_head_size=head_room_max,
+                           min_head_size=head_room_min,
+                           enable_push=enable_push)
+
+    use_tls = bool(server_key)
+
+    if bool(server_cert) != use_tls:
+        click.echo("ERROR: --server-key and --server-cert are both required for TLS", err=True)
+        sys.exit(-1)
+
+    if client_certs and not use_tls:
+        click.echo("ERROR: --client-certs can only be used with --server-key", err=True)
+        sys.exit(-1)
+
+    if use_tls:
+        # Read public/private key pair
+        with open(server_key, 'rb') as f:
+            server_key_bytes = f.read()
+        with open(server_cert, 'rb') as f:
+            server_cert_bytes = f.read()
+
+        if client_certs:
+            with open(client_certs, 'rb') as f:
+                client_certs_bytes = f.read()
+        else:
+            client_certs_bytes = None
+
+        credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
+                                                  root_certificates=client_certs_bytes,
+                                                  require_client_auth=bool(client_certs))
+        server.add_secure_port('[::]:{}'.format(port), credentials)
+    else:
+        server.add_insecure_port('[::]:{}'.format(port))
+
+    # Run artifact server
+    server.start()
+    try:
+        while True:
+            signal.pause()
+    except KeyboardInterrupt:
+        server.stop(0)
+
+
+class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
+        super().__init__()
+        self.cas = cas
+        self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
+
+    def Read(self, request, context):
+        resource_name = request.resource_name
+        client_digest = _digest_from_download_resource_name(resource_name)
+        if client_digest is None:
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+            return
+
+        if request.read_offset > client_digest.size_bytes:
+            context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+            return
+
+        try:
+            with open(self.cas.objpath(client_digest), 'rb') as f:
+                if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.NOT_FOUND)
+                    return
+
+                if request.read_offset > 0:
+                    f.seek(request.read_offset)
+
+                remaining = client_digest.size_bytes - request.read_offset
+                while remaining > 0:
+                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
+                    remaining -= chunk_size
+
+                    response = bytestream_pb2.ReadResponse()
+                    # Stream the blob in chunks of at most _MAX_PAYLOAD_BYTES
+                    response.data = f.read(chunk_size)
+                    yield response
+        except FileNotFoundError:
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+
+    def Write(self, request_iterator, context):
+        response = bytestream_pb2.WriteResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        offset = 0
+        finished = False
+        resource_name = None
+        with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+            for request in request_iterator:
+                if finished or request.write_offset != offset:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
+                if resource_name is None:
+                    # First request
+                    resource_name = request.resource_name
+                    client_digest = _digest_from_upload_resource_name(resource_name)
+                    if client_digest is None:
+                        context.set_code(grpc.StatusCode.NOT_FOUND)
+                        return response
+
+                    while True:
+                        if client_digest.size_bytes == 0:
+                            break
+                        try:
+                            self.cache_cleaner.clean_up(client_digest.size_bytes)
+                        except ArtifactTooLargeException as e:
+                            context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
+                            context.set_details(str(e))
+                            return response
+
+                        try:
+                            os.posix_fallocate(out.fileno(), 0, client_digest.size_bytes)
+                            break
+                        except OSError as e:
+                            # Multiple uploads can happen at the same time
+                            if e.errno != errno.ENOSPC:
+                                raise
+
+                elif request.resource_name:
+                    # If it is set on subsequent calls, it **must** match the value of the first request.
+                    if request.resource_name != resource_name:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
+
+                if (offset + len(request.data)) > client_digest.size_bytes:
+                    context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                    return response
+
+                out.write(request.data)
+                offset += len(request.data)
+                if request.finish_write:
+                    if client_digest.size_bytes != offset:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
+                    out.flush()
+                    digest = self.cas.add_object(path=out.name, link_directly=True)
+                    if digest.hash != client_digest.hash:
+                        context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+                        return response
+                    finished = True
+
+        assert finished
+
+        response.committed_size = offset
+        return response
+
+
+class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
+    def __init__(self, cas, cache_cleaner, *, enable_push):
+        super().__init__()
+        self.cas = cas
+        self.enable_push = enable_push
+        self.cache_cleaner = cache_cleaner
+
+    def FindMissingBlobs(self, request, context):
+        response = remote_execution_pb2.FindMissingBlobsResponse()
+        for digest in request.blob_digests:
+            objpath = self.cas.objpath(digest)
+            try:
+                os.utime(objpath)
+            except OSError as e:
+                if e.errno != errno.ENOENT:
+                    raise
+                else:
+                    d = response.missing_blob_digests.add()
+                    d.hash = digest.hash
+                    d.size_bytes = digest.size_bytes
+
+        return response
+
+    def BatchReadBlobs(self, request, context):
+        response = remote_execution_pb2.BatchReadBlobsResponse()
+        batch_size = 0
+
+        for digest in request.digests:
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+            try:
+                with open(self.cas.objpath(digest), 'rb') as f:
+                    if os.fstat(f.fileno()).st_size != digest.size_bytes:
+                        blob_response.status.code = code_pb2.NOT_FOUND
+                        continue
+
+                    blob_response.data = f.read(digest.size_bytes)
+            except FileNotFoundError:
+                blob_response.status.code = code_pb2.NOT_FOUND
+
+        return response
+
+    def BatchUpdateBlobs(self, request, context):
+        response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        batch_size = 0
+
+        for blob_request in request.requests:
+            digest = blob_request.digest
+
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_PAYLOAD_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+
+            if len(blob_request.data) != digest.size_bytes:
+                blob_response.status.code = code_pb2.FAILED_PRECONDITION
+                continue
+
+            try:
+                self.cache_cleaner.clean_up(digest.size_bytes)
+
+                with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+                    out.write(blob_request.data)
+                    out.flush()
+                    server_digest = self.cas.add_object(path=out.name)
+                    if server_digest.hash != digest.hash:
+                        blob_response.status.code = code_pb2.FAILED_PRECONDITION
+
+            except ArtifactTooLargeException:
+                blob_response.status.code = code_pb2.RESOURCE_EXHAUSTED
+
+        return response
+
+
+class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
+    def GetCapabilities(self, request, context):
+        response = remote_execution_pb2.ServerCapabilities()
+
+        cache_capabilities = response.cache_capabilities
+        cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
+        cache_capabilities.action_cache_update_capabilities.update_enabled = False
+        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
+        cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
+
+        response.deprecated_api_version.major = 2
+        response.low_api_version.major = 2
+        response.high_api_version.major = 2
+
+        return response
+
+
+class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
+    def __init__(self, cas, *, enable_push):
+        super().__init__()
+        self.cas = cas
+        self.enable_push = enable_push
+
+    def GetReference(self, request, context):
+        response = buildstream_pb2.GetReferenceResponse()
+
+        try:
+            tree = self.cas.resolve_ref(request.key, update_mtime=True)
+            try:
+                self.cas.update_tree_mtime(tree)
+            except FileNotFoundError:
+                self.cas.remove(request.key, defer_prune=True)
+                context.set_code(grpc.StatusCode.NOT_FOUND)
+                return response
+
+            response.digest.hash = tree.hash
+            response.digest.size_bytes = tree.size_bytes
+        except CASError:
+            context.set_code(grpc.StatusCode.NOT_FOUND)
+
+        return response
+
+    def UpdateReference(self, request, context):
+        response = buildstream_pb2.UpdateReferenceResponse()
+
+        if not self.enable_push:
+            context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+            return response
+
+        for key in request.keys:
+            self.cas.set_ref(key, request.digest)
+
+        return response
+
+    def Status(self, request, context):
+        response = buildstream_pb2.StatusResponse()
+
+        response.allow_updates = self.enable_push
+
+        return response
+
+
+class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
+
+    def __init__(self, cas, artifactdir):
+        super().__init__()
+        self.cas = cas
+        self.artifactdir = artifactdir
+        os.makedirs(artifactdir, exist_ok=True)
+
+    def GetArtifact(self, request, context):
+        artifact_path = os.path.join(self.artifactdir, request.cache_key)
+        if not os.path.exists(artifact_path):
+            context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
+
+        artifact = artifact_pb2.Artifact()
+        with open(artifact_path, 'rb') as f:
+            artifact.ParseFromString(f.read())
+
+        # Now update mtimes of files present.
+        try:
+            if str(artifact.files):
+                self.cas.update_tree_mtime(artifact.files)
+
+            if str(artifact.buildtree):
+                # buildtrees might not be there
+                try:
+                    self.cas.update_tree_mtime(artifact.buildtree)
+                except FileNotFoundError:
+                    pass
+
+            if str(artifact.public_data):
+                os.utime(self.cas.objpath(artifact.public_data))
+
+            for log_file in artifact.logs:
+                os.utime(self.cas.objpath(log_file.digest))
+
+        except FileNotFoundError:
+            os.unlink(artifact_path)
+            context.abort(grpc.StatusCode.NOT_FOUND,
+                          "Artifact files incomplete")
+        except DecodeError:
+            context.abort(grpc.StatusCode.NOT_FOUND,
+                          "Artifact files not valid")
+
+        return artifact
+
+    def UpdateArtifact(self, request, context):
+        artifact = request.artifact
+
+        # Check that the files specified are in the CAS
+        self._check_directory("files", artifact.files, context)
+
+        # Unset protocol buffer fields don't evaluate to False but do return
+        # empty strings, hence str()
+        if str(artifact.public_data):
+            self._check_file("public data", artifact.public_data, context)
+
+        for log_file in artifact.logs:
+            self._check_file("log digest", log_file.digest, context)
+
+        # Add the artifact proto to the cas
+        artifact_path = os.path.join(self.artifactdir, request.cache_key)
+        os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+        with open(artifact_path, 'wb') as f:
+            f.write(artifact.SerializeToString())
+
+        return artifact
+
+    def _check_directory(self, name, digest, context):
+        try:
+            directory = remote_execution_pb2.Directory()
+            with open(self.cas.objpath(digest), 'rb') as f:
+                directory.ParseFromString(f.read())
+        except FileNotFoundError:
+            context.abort(grpc.StatusCode.FAILED_PRECONDITION,
+                          "Artifact {} specified but no files found".format(name))
+        except DecodeError:
+            context.abort(grpc.StatusCode.FAILED_PRECONDITION,
+                          "Artifact {} specified but directory not found".format(name))
+
+    def _check_file(self, name, digest, context):
+        if not os.path.exists(self.cas.objpath(digest)):
+            context.abort(grpc.StatusCode.FAILED_PRECONDITION,
+                          "Artifact {} specified but not found".format(name))
+
+
+def _digest_from_download_resource_name(resource_name):
+    parts = resource_name.split('/')
+
+    # Accept requests from non-conforming BuildStream 1.1.x clients
+    if len(parts) == 2:
+        parts.insert(0, 'blobs')
+
+    if len(parts) != 3 or parts[0] != 'blobs':
+        return None
+
+    try:
+        digest = remote_execution_pb2.Digest()
+        digest.hash = parts[1]
+        digest.size_bytes = int(parts[2])
+        return digest
+    except ValueError:
+        return None
+
+
+def _digest_from_upload_resource_name(resource_name):
+    parts = resource_name.split('/')
+
+    # Accept requests from non-conforming BuildStream 1.1.x clients
+    if len(parts) == 2:
+        parts.insert(0, 'uploads')
+        parts.insert(1, str(uuid.uuid4()))
+        parts.insert(2, 'blobs')
+
+    if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+        return None
+
+    try:
+        uuid_ = uuid.UUID(hex=parts[1])
+        if uuid_.version != 4:
+            return None
+
+        digest = remote_execution_pb2.Digest()
+        digest.hash = parts[3]
+        digest.size_bytes = int(parts[4])
+        return digest
+    except ValueError:
+        return None
+
+
+class _CacheCleaner:
+
+    __cleanup_cache_lock = threading.Lock()
+
+    def __init__(self, cas, max_head_size, min_head_size=int(2e9)):
+        self.__cas = cas
+        self.__max_head_size = max_head_size
+        self.__min_head_size = min_head_size
+
+    def __has_space(self, object_size):
+        stats = os.statvfs(self.__cas.casdir)
+        free_disk_space = (stats.f_bavail * stats.f_bsize) - self.__min_head_size
+        total_disk_space = (stats.f_blocks * stats.f_bsize) - self.__min_head_size
+
+        if object_size > total_disk_space:
+            raise ArtifactTooLargeException("Artifact of size: {} is too large for "
+                                            "the filesystem which mounts the remote "
+                                            "cache".format(object_size))
+
+        return object_size <= free_disk_space
+
+    # clean_up()
+    #
+    # Keep removing Least Recently Pushed (LRP) artifacts in a cache until there
+    # is enough space for the incoming artifact
+    #
+    # Args:
+    #   object_size: The size of the object being received in bytes
+    #
+    # Returns:
+    #   int: The total bytes removed on the filesystem
+    #
+    def clean_up(self, object_size):
+        if self.__has_space(object_size):
+            return 0
+
+        with _CacheCleaner.__cleanup_cache_lock:
+            if self.__has_space(object_size):
+                # Another thread has done the cleanup for us
+                return 0
+
+            stats = os.statvfs(self.__cas.casdir)
+            target_disk_space = (stats.f_bavail * stats.f_bsize) - self.__max_head_size
+
+            # obtain a list of LRP artifacts
+            LRP_objects = self.__cas.list_objects()
+
+            removed_size = 0  # in bytes
+            last_mtime = 0
+
+            while object_size - removed_size > target_disk_space:
+                try:
+                    last_mtime, to_remove = LRP_objects.pop(0)  # The first element in the list is the LRP artifact
+                except IndexError:
+                    # This exception is caught if there are no more artifacts in the list
+                    # LRP_objects. This means the artifact is too large for the filesystem
+                    # so we abort the process
+                    raise ArtifactTooLargeException("Artifact of size {} is too large for "
+                                                    "the filesystem which mounts the remote "
+                                                    "cache".format(object_size))
+
+                try:
+                    size = os.stat(to_remove).st_size
+                    os.unlink(to_remove)
+                    removed_size += size
+                except FileNotFoundError:
+                    pass
+
+            self.__cas.clean_up_refs_until(last_mtime)
+
+            if removed_size > 0:
+                logging.info("Successfully removed %d bytes from the cache", removed_size)
+            else:
+                logging.info("No artifacts were removed from the cache.")
+
+            return removed_size
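
For context, here is a minimal sketch of starting the server that this file adds, using the `create_server()` function from the diff above. The repository path and port are illustrative assumptions, not part of the commit.

```python
# Illustrative only: start the CAS artifact server added in this commit.
# '/srv/artifacts/cas' and port 11001 are assumed values for the example.
import signal

from buildstream._cas.casserver import create_server

server = create_server('/srv/artifacts/cas', enable_push=True)
server.add_insecure_port('[::]:11001')
server.start()

try:
    while True:
        signal.pause()   # block until interrupted, mirroring server_main() above
except KeyboardInterrupt:
    server.stop(0)
```

In normal use the same server is launched through the Click entry point `server_main()`, typically installed as a console script, e.g. `bst-artifact-server --port 11001 --enable-push /srv/artifacts/cas` (script name assumed from BuildStream's packaging).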