From ca3cce5e0601c8a5e3a10c94463499ec08d65557 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Mon, 10 Sep 2018 11:25:39 +0200
Subject: _artifactcache/casserver.py: Implement Capabilities service

---
 buildstream/_artifactcache/casserver.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 0af65729b..f89421d88 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -38,6 +38,10 @@ from .._context import Context
 from .cascache import CASCache
 
 
+# The default limit for gRPC messages is 4 MiB
+_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
+
+
 # Trying to push an artifact that is too large
 class ArtifactTooLargeException(Exception):
     pass
@@ -67,6 +71,9 @@ def create_server(repo, *, enable_push):
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
         _ContentAddressableStorageServicer(artifactcache), server)
 
+    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+        _CapabilitiesServicer(), server)
+
     buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
         _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
 
@@ -230,6 +237,23 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
         return response
 
 
+class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
+    def GetCapabilities(self, request, context):
+        response = remote_execution_pb2.ServerCapabilities()
+
+        cache_capabilities = response.cache_capabilities
+        cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
+        cache_capabilities.action_cache_update_capabilities.update_enabled = False
+        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
+        cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
+
+        response.deprecated_api_version.major = 2
+        response.low_api_version.major = 2
+        response.high_api_version.major = 2
+
+        return response
+
+
 class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
     def __init__(self, cas, *, enable_push):
         super().__init__()
-- 
cgit v1.2.1
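
For reference, a client can exercise the Capabilities service added above
roughly as follows. This is a minimal sketch, not part of the patch: it
assumes the cache server listens on localhost:50051 and that the REAPI
bindings are importable from BuildStream's generated protos; adjust both
for your environment.

    import grpc

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    # Open a plain (unencrypted) channel to the artifact cache server.
    channel = grpc.insecure_channel('localhost:50051')
    capabilities = remote_execution_pb2_grpc.CapabilitiesStub(channel)

    # GetCapabilities takes a (here empty) request and reports what the
    # server supports.
    response = capabilities.GetCapabilities(
        remote_execution_pb2.GetCapabilitiesRequest())

    assert remote_execution_pb2.SHA256 in \
        response.cache_capabilities.digest_function
    print(response.cache_capabilities.max_batch_total_size_bytes)  # 4194304
    print(response.high_api_version.major)                         # 2

The 4 MiB advertised in max_batch_total_size_bytes matches
_MAX_BATCH_TOTAL_SIZE_BYTES, i.e. the default gRPC message size limit the
comment in the patch refers to.
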
From 8f0bb875629767bb02968ae99af4fefaf4933d43 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Thu, 6 Sep 2018 15:48:54 +0200
Subject: _artifactcache/casserver.py: Implement BatchReadBlobs

Fixes #632.
---
 buildstream/_artifactcache/casserver.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index f89421d88..8c3ece27d 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -236,6 +236,31 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
             d.size_bytes = digest.size_bytes
         return response
 
+    def BatchReadBlobs(self, request, context):
+        response = remote_execution_pb2.BatchReadBlobsResponse()
+        batch_size = 0
+
+        for digest in request.digests:
+            batch_size += digest.size_bytes
+            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
+                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+                return response
+
+            blob_response = response.responses.add()
+            blob_response.digest.hash = digest.hash
+            blob_response.digest.size_bytes = digest.size_bytes
+            try:
+                with open(self.cas.objpath(digest), 'rb') as f:
+                    if os.fstat(f.fileno()).st_size != digest.size_bytes:
+                        blob_response.status.code = grpc.StatusCode.NOT_FOUND
+                        continue
+
+                    blob_response.data = f.read(digest.size_bytes)
+            except FileNotFoundError:
+                blob_response.status.code = grpc.StatusCode.NOT_FOUND
+
+        return response
+
 
 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
     def GetCapabilities(self, request, context):
-- 
cgit v1.2.1
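
A client-side sketch of the new RPC, again not part of the patch and with
placeholder digests (real callers take them from Directory or Tree
messages fetched from the same server):

    import grpc

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    channel = grpc.insecure_channel('localhost:50051')
    cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    # Request two blobs at once; hashes and sizes below are placeholders.
    request = remote_execution_pb2.BatchReadBlobsRequest()
    for blob_hash, size in [('aabb...', 123), ('ccdd...', 456)]:
        digest = request.digests.add()
        digest.hash = blob_hash
        digest.size_bytes = size

    # A single round trip; the server rejects batches whose total size
    # exceeds _MAX_BATCH_TOTAL_SIZE_BYTES with INVALID_ARGUMENT.
    response = cas.BatchReadBlobs(request)
    for blob_response in response.responses:
        if blob_response.status.code == 0:  # google.rpc code OK
            data = blob_response.data

Per the REAPI, status.code in each per-blob response is a google.rpc.Code
integer, which is why the check above compares against the numeric OK
value.
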