diff options
Diffstat (limited to 'buildstream/_artifactcache/casserver.py')
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index b51572755..31b05ce0f 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -68,7 +68,7 @@ def create_server(repo, *, enable_push): _ByteStreamServicer(artifactcache, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache), server) + _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) @@ -222,9 +222,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer): - def __init__(self, cas): + def __init__(self, cas, *, enable_push): super().__init__() self.cas = cas + self.enable_push = enable_push def FindMissingBlobs(self, request, context): response = remote_execution_pb2.FindMissingBlobsResponse() @@ -260,6 +261,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres return response + def BatchUpdateBlobs(self, request, context): + response = remote_execution_pb2.BatchUpdateBlobsResponse() + + if not self.enable_push: + context.set_code(grpc.StatusCode.PERMISSION_DENIED) + return response + + batch_size = 0 + + for blob_request in request.requests: + digest = blob_request.digest + + batch_size += digest.size_bytes + if batch_size > _MAX_PAYLOAD_BYTES: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return response + + blob_response = response.responses.add() + blob_response.digest.hash = digest.hash + blob_response.digest.size_bytes = digest.size_bytes + + if len(blob_request.data) != digest.size_bytes: + blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION + continue + + try: + _clean_up_cache(self.cas, digest.size_bytes) + + with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out: + out.write(blob_request.data) + out.flush() + server_digest = self.cas.add_object(path=out.name) + if server_digest.hash != digest.hash: + blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION + + except ArtifactTooLargeException: + blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED + + return response + class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): def GetCapabilities(self, request, context): |