summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2018-09-30 17:32:20 +0200
committerJürg Billeter <j@bitron.ch>2018-10-01 14:58:06 +0000
commit26e1a3c7ee221745d32bc3c13a0112dbc319e3de (patch)
treec28febbf560e951213eee3d089473cc543560c68
parentfafa81367d505cebe564d8f2dbf6cbee5bfcc118 (diff)
downloadbuildstream-26e1a3c7ee221745d32bc3c13a0112dbc319e3de.tar.gz
_artifactcache/casserver.py: Implement BatchUpdateBlobs
Fixes #676.
-rw-r--r--buildstream/_artifactcache/casserver.py45
1 file changed, 43 insertions(+), 2 deletions(-)
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index b51572755..31b05ce0f 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -68,7 +68,7 @@ def create_server(repo, *, enable_push):
_ByteStreamServicer(artifactcache, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(artifactcache), server)
+ _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
_CapabilitiesServicer(), server)
@@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
- def __init__(self, cas):
+ def __init__(self, cas, *, enable_push):
super().__init__()
self.cas = cas
+ self.enable_push = enable_push
def FindMissingBlobs(self, request, context):
response = remote_execution_pb2.FindMissingBlobsResponse()
@@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
return response
+ def BatchUpdateBlobs(self, request, context):
+ response = remote_execution_pb2.BatchUpdateBlobsResponse()
+
+ if not self.enable_push:
+ context.set_code(grpc.StatusCode.PERMISSION_DENIED)
+ return response
+
+ batch_size = 0
+
+ for blob_request in request.requests:
+ digest = blob_request.digest
+
+ batch_size += digest.size_bytes
+ if batch_size > _MAX_PAYLOAD_BYTES:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ return response
+
+ blob_response = response.responses.add()
+ blob_response.digest.hash = digest.hash
+ blob_response.digest.size_bytes = digest.size_bytes
+
+ if len(blob_request.data) != digest.size_bytes:
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+ continue
+
+ try:
+ _clean_up_cache(self.cas, digest.size_bytes)
+
+ with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
+ out.write(blob_request.data)
+ out.flush()
+ server_digest = self.cas.add_object(path=out.name)
+ if server_digest.hash != digest.hash:
+ blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION
+
+ except ArtifactTooLargeException:
+ blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED
+
+ return response
+
class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
def GetCapabilities(self, request, context):