summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2019-06-21 12:43:50 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-08 10:40:28 +0000
commitd61e21448953942ce90d457dad7189c0dda61bc7 (patch)
treee0a44453d5839857bc453eeb71404c62d58b6008
parent6b6e04ddb1c03f683e3e5591f057207d1303e6b6 (diff)
downloadbuildstream-d61e21448953942ce90d457dad7189c0dda61bc7.tar.gz
casserver.py: Add source service
Part of #1038
-rw-r--r--src/buildstream/_cas/casserver.py40
1 files changed, 39 insertions, 1 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 774548216..11fee64c3 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -35,7 +35,7 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
- artifact_pb2, artifact_pb2_grpc
+ artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
from .. import utils
from .._exceptions import CASError
@@ -66,6 +66,7 @@ def create_server(repo, *, enable_push,
min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
+ sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
@@ -88,6 +89,9 @@ def create_server(repo, *, enable_push,
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
_ArtifactServicer(cas, artifactdir), server)
+ source_pb2_grpc.add_SourceServiceServicer_to_server(
+ _SourceServicer(sourcedir), server)
+
# Create up reference storage and artifact capabilities
artifact_capabilities = buildstream_pb2.ArtifactCapabilities(
allow_updates=enable_push)
@@ -518,6 +522,40 @@ class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer
return response
+class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
+ def __init__(self, sourcedir):
+ self.sourcedir = sourcedir
+
+ def GetSource(self, request, context):
+ try:
+ source_proto = self._get_source(request.cache_key)
+ except FileNotFoundError:
+ context.abort(grpc.StatusCode.NOT_FOUND, "Source not found")
+ except DecodeError:
+ context.abort(grpc.StatusCode.NOT_FOUND,
+ "Sources gives invalid directory")
+
+ return source_proto
+
+ def UpdateSource(self, request, context):
+ self._set_source(request.cache_key, request.source)
+ return request.source
+
+ def _get_source(self, cache_key):
+ path = os.path.join(self.sourcedir, cache_key)
+ source_proto = source_pb2.Source()
+ with open(path, 'r+b') as f:
+ source_proto.ParseFromString(f.read())
+ os.utime(path)
+ return source_proto
+
+ def _set_source(self, cache_key, source_proto):
+ path = os.path.join(self.sourcedir, cache_key)
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with utils.save_file_atomic(path, 'w+b') as f:
+ f.write(source_proto.SerializeToString())
+
+
def _digest_from_download_resource_name(resource_name):
parts = resource_name.split('/')