diff options
author | Jürg Billeter <j@bitron.ch> | 2019-09-10 08:03:07 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2019-09-10 08:12:05 +0200 |
commit | 0fac0b4416d1becb7c5bddf076c02773d1ce05c4 (patch) | |
tree | bb5135d305f15a914e5def80a12fd0ce6a9422bb | |
parent | d7dca0d13f4ce6e3f3922e47a50df8cbd6298496 (diff) | |
download | buildstream-0fac0b4416d1becb7c5bddf076c02773d1ce05c4.tar.gz |
casremote.py: Limit request size for batch download and upload
Fixes #1129.
-rw-r--r-- | src/buildstream/_cas/casremote.py | 75 |
1 file changed, 47 insertions(+), 28 deletions(-)
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 1efed22e6..43e215c63 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -12,6 +12,11 @@ from .._exceptions import CASRemoteError
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
+# How many digests to put in a single gRPC message.
+# A 256-bit hash requires 64 bytes of space (hexadecimal encoding).
+# 80 bytes provide sufficient space for hash, size, and protobuf overhead.
+_MAX_DIGESTS = _MAX_PAYLOAD_BYTES / 80
+
 
 class BlobNotFound(CASRemoteError):
@@ -157,13 +162,18 @@ class CASRemote(BaseRemote):
 class _CASBatchRead():
     def __init__(self, remote):
         self._remote = remote
-        self._request = local_cas_pb2.FetchMissingBlobsRequest()
-        self._request.instance_name = remote.local_cas_instance_name
+        self._requests = []
+        self._request = None
         self._sent = False
 
     def add(self, digest):
         assert not self._sent
 
+        if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS:
+            self._request = local_cas_pb2.FetchMissingBlobsRequest()
+            self._request.instance_name = self._remote.local_cas_instance_name
+            self._requests.append(self._request)
+
         request_digest = self._request.blob_digests.add()
         request_digest.CopyFrom(digest)
 
@@ -171,26 +181,28 @@ class _CASBatchRead():
         assert not self._sent
         self._sent = True
 
-        if not self._request.blob_digests:
+        if not self._requests:
             return
 
         local_cas = self._remote.cascache._get_local_cas()
-        batch_response = local_cas.FetchMissingBlobs(self._request)
-        for response in batch_response.responses:
-            if response.status.code == code_pb2.NOT_FOUND:
-                if missing_blobs is None:
-                    raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
-                        response.digest.hash, response.status.code))
+        for request in self._requests:
+            batch_response = local_cas.FetchMissingBlobs(request)
 
-                missing_blobs.append(response.digest)
+            for response in batch_response.responses:
+                if response.status.code == code_pb2.NOT_FOUND:
+                    if missing_blobs is None:
+                        raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+                            response.digest.hash, response.status.code))
 
-            if response.status.code != code_pb2.OK:
-                raise CASRemoteError("Failed to download blob {}: {}".format(
-                    response.digest.hash, response.status.code))
-            if response.digest.size_bytes != len(response.data):
-                raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
-                    response.digest.hash, response.digest.size_bytes, len(response.data)))
+                    missing_blobs.append(response.digest)
+
+                if response.status.code != code_pb2.OK:
+                    raise CASRemoteError("Failed to download blob {}: {}".format(
+                        response.digest.hash, response.status.code))
+                if response.digest.size_bytes != len(response.data):
+                    raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+                        response.digest.hash, response.digest.size_bytes, len(response.data)))
 
 
 # Represents a batch of blobs queued for upload.
@@ -198,13 +210,18 @@ class _CASBatchRead():
 class _CASBatchUpdate():
     def __init__(self, remote):
         self._remote = remote
-        self._request = local_cas_pb2.UploadMissingBlobsRequest()
-        self._request.instance_name = remote.local_cas_instance_name
+        self._requests = []
+        self._request = None
         self._sent = False
 
     def add(self, digest):
         assert not self._sent
 
+        if not self._request or len(self._request.blob_digests) >= _MAX_DIGESTS:
+            self._request = local_cas_pb2.UploadMissingBlobsRequest()
+            self._request.instance_name = self._remote.local_cas_instance_name
+            self._requests.append(self._request)
+
         request_digest = self._request.blob_digests.add()
         request_digest.CopyFrom(digest)
 
@@ -212,18 +229,20 @@
         assert not self._sent
         self._sent = True
 
-        if not self._request.blob_digests:
+        if not self._requests:
             return
 
        local_cas = self._remote.cascache._get_local_cas()
-        batch_response = local_cas.UploadMissingBlobs(self._request)
-        for response in batch_response.responses:
-            if response.status.code != code_pb2.OK:
-                if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
-                    reason = "cache-too-full"
-                else:
-                    reason = None
+        for request in self._requests:
+            batch_response = local_cas.UploadMissingBlobs(request)
+
+            for response in batch_response.responses:
+                if response.status.code != code_pb2.OK:
+                    if response.status.code == code_pb2.RESOURCE_EXHAUSTED:
+                        reason = "cache-too-full"
+                    else:
+                        reason = None
 
-                raise CASRemoteError("Failed to upload blob {}: {}".format(
-                    response.digest.hash, response.status.code), reason=reason)
+                    raise CASRemoteError("Failed to upload blob {}: {}".format(
+                        response.digest.hash, response.status.code), reason=reason)