diff options
author | Jürg Billeter <j@bitron.ch> | 2018-05-11 17:14:33 +0200 |
---|---|---|
committer | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-05-25 13:10:24 +0100 |
commit | d61cb7bacc8c03f8326e82ac5f99ef93f570c85b (patch) | |
tree | 7bd87ef559faff3dfe2642fc27bbfd4f0a7d2de6 | |
parent | 33f1c2b355078ffa613e9fd1cb2bd95d789b6c78 (diff) | |
download | buildstream-d61cb7bacc8c03f8326e82ac5f99ef93f570c85b.tar.gz |
_artifactcache: Add GoogleCAS server
-rw-r--r-- | buildstream/_artifactcache/googlecas.py | 36 | ||||
-rw-r--r-- | buildstream/_artifactcache/googlecasserver.py | 173 | ||||
-rwxr-xr-x | setup.py | 3 |
3 files changed, 210 insertions, 2 deletions
# not using utils version (with _signals) as that doesn't work in threads
@contextmanager
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
                     errors=None, newline=None, closefd=True, opener=None):
    """Context manager to save a file atomically.

    Writes go to a temporary file in the same directory; on a clean exit
    the temporary file is atomically renamed over *filename* via
    os.replace() (atomic on the platforms we care about, see
    https://bugs.python.org/issue8828). On any failure the temporary
    file is removed and nothing is left at *filename*.

    Args:
        filename (str): Absolute path of the destination file
        mode (str): Mode passed through to open()
        buffering, encoding, errors, newline, closefd, opener:
            Passed through to open() unchanged

    Yields:
        The open file object; its ``real_filename`` attribute holds the
        final destination path.

    Raises:
        utils.UtilError: If cleanup of the temporary file fails.
    """
    assert os.path.isabs(filename), "The save_file_atomic() parameter ``filename`` must be an absolute path"
    dirname = os.path.dirname(filename)
    fd, tempname = tempfile.mkstemp(dir=dirname)
    os.close(fd)

    try:
        f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
                 errors=errors, newline=newline, closefd=closefd, opener=opener)
    except Exception:
        # open() itself can fail (e.g. bad mode string); the original code
        # leaked the mkstemp() file in that case — remove it before
        # re-raising so no orphan temp files accumulate.
        try:
            os.remove(tempname)
        except OSError:
            pass
        raise

    try:
        # Expose the final destination so callers can refer to it while
        # still writing to the temporary file.
        f.real_filename = filename
        yield f
        f.close()
        # This operation is atomic, at least on platforms we care about:
        # https://bugs.python.org/issue8828
        os.replace(tempname, filename)
    except Exception:
        # Close first so the unlink succeeds on platforms that forbid
        # removing open files, then remove the temporary file.
        f.close()
        try:
            os.remove(tempname)
        except FileNotFoundError:
            pass
        except OSError as e:
            raise utils.UtilError("Failed to cleanup temporary file {}: {}".format(tempname, e)) from e
        raise
#!/usr/bin/env python3
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#        Jürg Billeter <juerg.billeter@codethink.co.uk>

"""gRPC server exposing a GoogleCASCache repository.

Implements the Google ByteStream and remote-execution
ContentAddressableStorage services, plus the BuildStream artifact
reference service, on top of a local CAS repository.
"""

from concurrent import futures
import os
import tempfile
import time

import click
import grpc

from google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
from buildstream import buildstream_pb2, buildstream_pb2_grpc

from .._exceptions import ArtifactError
from .._context import Context

from .googlecas import GoogleCASCache

# Blobs are streamed to clients in chunks of at most this many bytes
_CHUNK_SIZE = 64 * 1024


def create_server(repo):
    """Create (but do not start) a gRPC server serving the CAS at *repo*.

    Args:
        repo (str): Path to the artifact/CAS repository directory

    Returns:
        grpc.Server: The configured, unstarted server
    """

    def _digest_from_resource_name(resource_name):
        # ByteStream resource names have the form "<hash>/<size_bytes>".
        # Returns None for malformed names rather than asserting: this is
        # untrusted client input, and an AssertionError would crash the RPC
        # (asserts are also stripped entirely under `python -O`).
        parts = resource_name.split('/')
        if len(parts) != 2:
            return None
        digest = remote_execution_pb2.Digest()
        digest.hash = parts[0]
        try:
            digest.size_bytes = int(parts[1])
        except ValueError:
            return None
        return digest

    class ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
        def __init__(self, cas):
            super().__init__()
            self.cas = cas

        def Read(self, request, context):
            # Stream the requested blob back in fixed-size chunks.
            client_digest = _digest_from_resource_name(request.resource_name)
            if client_digest is None or not 0 <= request.read_offset <= client_digest.size_bytes:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return

            try:
                f = open(self.cas._objpath(client_digest), 'rb')
            except FileNotFoundError:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                return

            with f:
                # Local store invariant: stored object size matches its digest
                assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
                if request.read_offset > 0:
                    f.seek(request.read_offset)

                remaining = client_digest.size_bytes - request.read_offset
                while remaining > 0:
                    # max. 64 kB chunks
                    chunk_size = min(remaining, _CHUNK_SIZE)
                    remaining -= chunk_size

                    response = bytestream_pb2.ReadResponse()
                    response.data = f.read(chunk_size)
                    yield response

        def Write(self, request_iterator, context):
            # Receive a blob as a stream of chunks into a temporary file,
            # then verify its digest against the resource name before
            # importing it into the CAS.
            offset = 0
            finished = False
            resource_name = None
            client_digest = None

            def _invalid():
                # Reject bad client input with a proper gRPC status instead
                # of an assert (which would crash the handler and is a no-op
                # under `python -O`).
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return bytestream_pb2.WriteResponse()

            with tempfile.NamedTemporaryFile(dir=os.path.join(self.cas.casdir, 'tmp')) as out:
                for request in request_iterator:
                    if finished or request.write_offset != offset:
                        return _invalid()
                    if resource_name is None:
                        # First request carries the resource name
                        resource_name = request.resource_name
                        client_digest = _digest_from_resource_name(resource_name)
                        if client_digest is None:
                            return _invalid()
                    elif request.resource_name and request.resource_name != resource_name:
                        # If set on subsequent requests, it must match the
                        # value of the first request
                        return _invalid()
                    out.write(request.data)
                    offset += len(request.data)
                    if request.finish_write:
                        if client_digest.size_bytes != offset:
                            return _invalid()
                        out.flush()
                        digest = self.cas._hash_object(path=out.name)
                        if digest.hash != client_digest.hash:
                            return _invalid()
                        finished = True

                if not finished:
                    return _invalid()

            response = bytestream_pb2.WriteResponse()
            response.committed_size = offset
            return response

    class ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
        def __init__(self, cas):
            super().__init__()
            self.cas = cas

        def FindMissingBlobs(self, request, context):
            # Report which of the requested digests are absent from the store
            response = remote_execution_pb2.FindMissingBlobsResponse()
            for digest in request.blob_digests:
                if not self.cas._has_object(digest):
                    d = response.missing_blob_digests.add()
                    d.hash = digest.hash
                    d.size_bytes = digest.size_bytes
            return response

    class ArtifactCacheServicer(buildstream_pb2_grpc.ArtifactCacheServicer):
        def __init__(self, cas):
            super().__init__()
            self.cas = cas

        def GetArtifact(self, request, context):
            # Resolve an artifact reference to the digest of its tree
            response = buildstream_pb2.GetArtifactResponse()

            try:
                tree = self.cas._resolve_ref(request.key)

                response.artifact.hash = tree.hash
                response.artifact.size_bytes = tree.size_bytes
            except ArtifactError:
                context.set_code(grpc.StatusCode.NOT_FOUND)

            return response

        def UpdateArtifact(self, request, context):
            # Point every given key at the uploaded artifact tree
            for key in request.keys:
                self.cas._set_ref(key, request.artifact)

            response = buildstream_pb2.UpdateArtifactResponse()
            return response

    context = Context()
    context.artifactdir = repo

    artifactcache = GoogleCASCache(context)

    server = grpc.server(futures.ThreadPoolExecutor())

    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
        ByteStreamServicer(artifactcache), server)

    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
        ContentAddressableStorageServicer(artifactcache), server)

    buildstream_pb2_grpc.add_ArtifactCacheServicer_to_server(
        ArtifactCacheServicer(artifactcache), server)

    return server


@click.command(short_help="Google CAS Server")
@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
@click.argument('repo')
def server_main(port, repo):
    """Serve the CAS repository at REPO over insecure gRPC on localhost.

    Runs until interrupted (Ctrl+C stops the server cleanly).
    """
    server = create_server(repo)

    server.add_insecure_port('localhost:{}'.format(port))
    server.start()
    try:
        while True:
            time.sleep(60 * 60 * 24)
    except KeyboardInterrupt:
        server.stop(0)