diff options
author | Jürg Billeter <j@bitron.ch> | 2018-09-30 17:32:20 +0200 |
---|---|---|
committer | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2018-10-03 07:35:51 +0000 |
commit | 4a67e4e37a734e5dda5f2613d0e41eb091e05533 (patch) | |
tree | e9e173d16c637d570602434e08a9eb72586df530 /buildstream | |
parent | 9568824f3780be032b500694bd2c78d6dd526fa4 (diff) | |
download | buildstream-4a67e4e37a734e5dda5f2613d0e41eb091e05533.tar.gz |
_artifactcache/casserver.py: Implement BatchUpdateBlobs
Fixes #676.
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index d833878d5..62d06f3ce 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -70,7 +70,7 @@ def create_server(repo, *, enable_push): _ByteStreamServicer(artifactcache, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache), server) + _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) @@ -224,9 +224,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer): - def __init__(self, cas): + def __init__(self, cas, *, enable_push): super().__init__() self.cas = cas + self.enable_push = enable_push def FindMissingBlobs(self, request, context): response = remote_execution_pb2.FindMissingBlobsResponse() @@ -262,6 +263,46 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres return response + def BatchUpdateBlobs(self, request, context): + response = remote_execution_pb2.BatchUpdateBlobsResponse() + + if not self.enable_push: + context.set_code(grpc.StatusCode.PERMISSION_DENIED) + return response + + batch_size = 0 + + for blob_request in request.requests: + digest = blob_request.digest + + batch_size += digest.size_bytes + if batch_size > _MAX_PAYLOAD_BYTES: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return response + + blob_response = response.responses.add() + blob_response.digest.hash = digest.hash + blob_response.digest.size_bytes = digest.size_bytes + + if len(blob_request.data) != digest.size_bytes: + blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION + continue + + try: + _clean_up_cache(self.cas, digest.size_bytes) + + with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out: + out.write(blob_request.data) + out.flush() + server_digest = self.cas.add_object(path=out.name) + if server_digest.hash != digest.hash: + blob_response.status.code = grpc.StatusCode.FAILED_PRECONDITION + + except ArtifactTooLargeException: + blob_response.status.code = grpc.StatusCode.RESOURCE_EXHAUSTED + + return response + class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): def GetCapabilities(self, request, context): |