summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2018-05-11 17:14:33 +0200
committerJim MacArthur <jim.macarthur@codethink.co.uk>2018-05-25 13:10:24 +0100
commitd61cb7bacc8c03f8326e82ac5f99ef93f570c85b (patch)
tree7bd87ef559faff3dfe2642fc27bbfd4f0a7d2de6
parent33f1c2b355078ffa613e9fd1cb2bd95d789b6c78 (diff)
downloadbuildstream-d61cb7bacc8c03f8326e82ac5f99ef93f570c85b.tar.gz
_artifactcache: Add GoogleCAS server
-rw-r--r--buildstream/_artifactcache/googlecas.py36
-rw-r--r--buildstream/_artifactcache/googlecasserver.py173
-rwxr-xr-xsetup.py3
3 files changed, 210 insertions, 2 deletions
diff --git a/buildstream/_artifactcache/googlecas.py b/buildstream/_artifactcache/googlecas.py
index 68fd51dc8..608c75249 100644
--- a/buildstream/_artifactcache/googlecas.py
+++ b/buildstream/_artifactcache/googlecas.py
@@ -18,6 +18,7 @@
# Authors:
# Jürg Billeter <juerg.billeter@codethink.co.uk>
+from contextlib import contextmanager
import hashlib
import os
import stat
@@ -413,7 +414,7 @@ class GoogleCASCache(ArtifactCache):
def _set_ref(self, ref, tree):
refpath = self.__refpath(ref)
os.makedirs(os.path.dirname(refpath), exist_ok=True)
- with utils.save_file_atomic(refpath, 'wb') as f:
+ with save_file_atomic(refpath, 'wb') as f:
f.write(tree.SerializeToString())
def __refpath(self, ref):
@@ -577,3 +578,36 @@ class GoogleCASCache(ArtifactCache):
# all referenced blobs to avoid dangling references in the repository
digest = self._hash_object(path=out.name)
assert digest.hash == tree.hash
+
+
# not using utils version (with _signals) as that doesn't work in threads
@contextmanager
def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
                     errors=None, newline=None, closefd=True, opener=None):
    """Context manager for atomically writing a file.

    Data is written to a temporary file in the same directory and only
    moved over *filename* (via the atomic ``os.replace()``) after the
    with-block completes without raising.  On any failure the temporary
    file is removed and *filename* is left untouched.

    Args:
        filename (str): Absolute path of the file to write
        mode, buffering, encoding, errors, newline, closefd, opener:
            forwarded unchanged to the built-in ``open()``

    Yields:
        The open file object; its eventual destination path is exposed
        as the ``real_filename`` attribute.

    NOTE(review): mkstemp() creates the file with 0600 permissions and
    os.replace() preserves them, so the final file does not honour the
    process umask — confirm this is acceptable for ref files.
    """
    assert os.path.isabs(filename), "The save_file_atomic() parameter ``filename`` must be an absolute path"
    dirname = os.path.dirname(filename)
    fd, tempname = tempfile.mkstemp(dir=dirname)
    os.close(fd)

    try:
        # Re-open with the caller's mode/encoding; the raw fd from
        # mkstemp() was closed above.
        f = open(tempname, mode=mode, buffering=buffering, encoding=encoding,
                 errors=errors, newline=newline, closefd=closefd, opener=opener)
    except Exception:
        # open() itself can fail (invalid mode, unknown encoding, ...);
        # without this the freshly created temporary file would leak.
        try:
            os.remove(tempname)
        except OSError:
            pass
        raise

    def cleanup_tempfile():
        # Remove the temporary file after a failure inside the with-block.
        f.close()
        try:
            os.remove(tempname)
        except FileNotFoundError:
            pass
        except OSError as e:
            raise utils.UtilError("Failed to cleanup temporary file {}: {}".format(tempname, e)) from e

    try:
        f.real_filename = filename
        yield f
        f.close()
        # This operation is atomic, at least on platforms we care about:
        # https://bugs.python.org/issue8828
        os.replace(tempname, filename)
    except Exception:
        cleanup_tempfile()
        raise
diff --git a/buildstream/_artifactcache/googlecasserver.py b/buildstream/_artifactcache/googlecasserver.py
new file mode 100644
index 000000000..17dced458
--- /dev/null
+++ b/buildstream/_artifactcache/googlecasserver.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+from concurrent import futures
+import os
+import tempfile
+import time
+
+import click
+import grpc
+
+from google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
+from buildstream import buildstream_pb2, buildstream_pb2_grpc
+
+from .._exceptions import ArtifactError
+from .._context import Context
+
+from .googlecas import GoogleCASCache
+
+
def create_server(repo):
    """Create a gRPC server exposing the artifact cache at *repo*.

    Instantiates a GoogleCASCache over *repo* and registers three
    servicers on a fresh thread-pool server: ByteStream (blob
    download/upload), ContentAddressableStorage (FindMissingBlobs) and
    the BuildStream ArtifactCache (ref lookup/update).  The caller is
    responsible for binding a port and starting the returned server.
    """
    def _digest_from_resource_name(resource_name):
        # Resource names encode a digest as "<hash>/<size_bytes>".
        parts = resource_name.split('/')
        assert len(parts) == 2
        digest = remote_execution_pb2.Digest()
        digest.hash = parts[0]
        digest.size_bytes = int(parts[1])
        return digest

    class ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
        """Serves blob contents out of the local CAS object store."""

        def __init__(self, cas):
            super().__init__()
            self.cas = cas  # GoogleCASCache instance backing this server

        def Read(self, request, context):
            """Stream the blob named by request.resource_name in chunks.

            NOTE(review): request.read_limit is never consulted — the
            whole remainder of the blob is always streamed; confirm the
            clients used here never set a limit.
            """
            resource_name = request.resource_name
            client_digest = _digest_from_resource_name(resource_name)
            assert request.read_offset <= client_digest.size_bytes

            with open(self.cas._objpath(client_digest), 'rb') as f:
                # Sanity-check the on-disk object against the requested digest
                assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
                if request.read_offset > 0:
                    f.seek(request.read_offset)

                remaining = client_digest.size_bytes - request.read_offset
                while remaining > 0:
                    chunk_size = min(remaining, 64 * 1024)
                    remaining -= chunk_size

                    response = bytestream_pb2.ReadResponse()
                    # max. 64 kB chunks
                    response.data = f.read(chunk_size)
                    yield response

        def Write(self, request_iterator, context):
            """Receive a streamed blob upload and commit it to the CAS.

            The first request must carry resource_name (and thereby the
            expected digest); write_offset of each request must equal the
            number of bytes received so far; the last request sets
            finish_write.
            """
            offset = 0
            finished = False
            resource_name = None
            # Spool into a temp file under the CAS tmp dir; it is deleted
            # automatically when the with-block exits.
            with tempfile.NamedTemporaryFile(dir=os.path.join(self.cas.casdir, 'tmp')) as out:
                for request in request_iterator:
                    assert not finished
                    assert request.write_offset == offset
                    if resource_name is None:
                        # First request
                        resource_name = request.resource_name
                        client_digest = _digest_from_resource_name(resource_name)
                    elif request.resource_name:
                        # If it is set on subsequent calls, it **must** match the value of the first request.
                        assert request.resource_name == resource_name
                    out.write(request.data)
                    offset += len(request.data)
                    if request.finish_write:
                        assert client_digest.size_bytes == offset
                        out.flush()
                        # NOTE(review): presumably _hash_object() also links
                        # the temp file into the object store — the temp file
                        # is deleted on context exit, so hashing alone would
                        # lose the blob; verify against googlecas.py.
                        digest = self.cas._hash_object(path=out.name)
                        assert digest.hash == client_digest.hash
                        finished = True

            assert finished

            response = bytestream_pb2.WriteResponse()
            response.committed_size = offset
            return response

    class ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
        """Answers queries about which blobs the CAS does not yet hold."""

        def __init__(self, cas):
            super().__init__()
            self.cas = cas  # GoogleCASCache instance backing this server

        def FindMissingBlobs(self, request, context):
            """Return the subset of request.blob_digests not in the store."""
            response = remote_execution_pb2.FindMissingBlobsResponse()
            for digest in request.blob_digests:
                if not self.cas._has_object(digest):
                    d = response.missing_blob_digests.add()
                    d.hash = digest.hash
                    d.size_bytes = digest.size_bytes
            return response

    class ArtifactCacheServicer(buildstream_pb2_grpc.ArtifactCacheServicer):
        """Maps BuildStream artifact refs to CAS tree digests."""

        def __init__(self, cas):
            super().__init__()
            self.cas = cas  # GoogleCASCache instance backing this server

        def GetArtifact(self, request, context):
            """Resolve request.key to its tree digest; NOT_FOUND if absent."""
            response = buildstream_pb2.GetArtifactResponse()

            try:
                tree = self.cas._resolve_ref(request.key)

                response.artifact.hash = tree.hash
                response.artifact.size_bytes = tree.size_bytes
            except ArtifactError:
                # Unknown ref: signal NOT_FOUND and return an empty response
                context.set_code(grpc.StatusCode.NOT_FOUND)

            return response

        def UpdateArtifact(self, request, context):
            """Point every key in request.keys at request.artifact."""
            for key in request.keys:
                self.cas._set_ref(key, request.artifact)

            response = buildstream_pb2.UpdateArtifactResponse()
            return response

    # Minimal Context whose artifactdir is the repository served here.
    # (This `context` is the BuildStream Context, distinct from the gRPC
    # `context` parameter seen inside the servicer methods.)
    context = Context()
    context.artifactdir = repo

    artifactcache = GoogleCASCache(context)

    server = grpc.server(futures.ThreadPoolExecutor())

    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
        ByteStreamServicer(artifactcache), server)

    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
        ContentAddressableStorageServicer(artifactcache), server)

    buildstream_pb2_grpc.add_ArtifactCacheServicer_to_server(
        ArtifactCacheServicer(artifactcache), server)

    return server
+
+
@click.command(short_help="Google CAS Server")
@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
@click.argument('repo')
def server_main(port, repo):
    """Serve the artifact cache in REPO over gRPC on localhost:PORT."""
    server = create_server(repo)

    server.add_insecure_port('localhost:{}'.format(port))
    server.start()

    # grpc servers run on background threads; keep the main thread parked
    # until the operator interrupts us, then shut down immediately.
    _ONE_DAY = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY)
    except KeyboardInterrupt:
        server.stop(0)
diff --git a/setup.py b/setup.py
index fd9a22f4e..b3748d3fe 100755
--- a/setup.py
+++ b/setup.py
@@ -154,7 +154,8 @@ def list_man_pages():
# So screw it, lets just use an env var.
bst_install_entry_points = {
'console_scripts': [
- 'bst-artifact-receive = buildstream._artifactcache.pushreceive:receive_main'
+ 'bst-artifact-receive = buildstream._artifactcache.pushreceive:receive_main',
+ 'bst-googlecas-server = buildstream._artifactcache.googlecasserver:server_main'
],
}