From d61e21448953942ce90d457dad7189c0dda61bc7 Mon Sep 17 00:00:00 2001 From: Raoul Hidalgo Charman Date: Fri, 21 Jun 2019 12:43:50 +0100 Subject: casserver.py: Add source service Part of #1038 --- src/buildstream/_cas/casserver.py | 40 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 774548216..11fee64c3 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -35,7 +35,7 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.google.rpc import code_pb2 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ - artifact_pb2, artifact_pb2_grpc + artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc from .. import utils from .._exceptions import CASError @@ -66,6 +66,7 @@ def create_server(repo, *, enable_push, min_head_size=int(2e9)): cas = CASCache(os.path.abspath(repo)) artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') + sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 @@ -88,6 +89,9 @@ def create_server(repo, *, enable_push, artifact_pb2_grpc.add_ArtifactServiceServicer_to_server( _ArtifactServicer(cas, artifactdir), server) + source_pb2_grpc.add_SourceServiceServicer_to_server( + _SourceServicer(sourcedir), server) + # Create up reference storage and artifact capabilities artifact_capabilities = buildstream_pb2.ArtifactCapabilities( allow_updates=enable_push) @@ -518,6 +522,40 @@ class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer return response +class _SourceServicer(source_pb2_grpc.SourceServiceServicer): + def __init__(self, sourcedir): + self.sourcedir = sourcedir + + def GetSource(self, request, context): + try: + source_proto = self._get_source(request.cache_key) + except FileNotFoundError: + context.abort(grpc.StatusCode.NOT_FOUND, "Source not found") + except DecodeError: + context.abort(grpc.StatusCode.NOT_FOUND, + "Sources gives invalid directory") + + return source_proto + + def UpdateSource(self, request, context): + self._set_source(request.cache_key, request.source) + return request.source + + def _get_source(self, cache_key): + path = os.path.join(self.sourcedir, cache_key) + source_proto = source_pb2.Source() + with open(path, 'r+b') as f: + source_proto.ParseFromString(f.read()) + os.utime(path) + return source_proto + + def _set_source(self, cache_key, source_proto): + path = os.path.join(self.sourcedir, cache_key) + os.makedirs(os.path.dirname(path), exist_ok=True) + with utils.save_file_atomic(path, 'w+b') as f: + f.write(source_proto.SerializeToString()) + + def _digest_from_download_resource_name(resource_name): parts = resource_name.split('/') -- cgit v1.2.1