Diffstat (limited to 'buildstream/_artifactcache/cascache.py')
-rw-r--r--  buildstream/_artifactcache/cascache.py  78
1 file changed, 74 insertions, 4 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 936cb780b..3a3181bfb 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
             missing_blobs[d.hash] = d
 
         # Upload any blobs missing on the server
-        for blob_digest in missing_blobs.values():
-            with open(self.objpath(blob_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
-                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
 
 
 # Represents a single remote CAS cache.
@@ -1126,6 +1145,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise
 
+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
 
 
@@ -1173,6 +1203,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)
 
 
+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
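The change replaces one ByteStream request per blob with a single BatchUpdateBlobs request per group of small blobs. The core of _send_blobs() is an accumulate-and-flush loop: try to add each blob to the current batch, flush and start a fresh batch when the blob no longer fits, and fall back to an individual upload for blobs that could never fit in any batch. Below is a minimal self-contained sketch of that pattern, not BuildStream code: the Batch class, send_blobs() helper, and MAX_BATCH_TOTAL_SIZE_BYTES constant are hypothetical stand-ins (the real code sends gRPC BatchUpdateBlobsRequest messages and reads the cap from remote.max_batch_total_size_bytes).

    # Hypothetical sketch of the accumulate-and-flush batching pattern
    # used by _send_blobs()/_CASBatchUpdate in the diff above.
    # All names here are illustrative stand-ins, not BuildStream APIs.

    MAX_BATCH_TOTAL_SIZE_BYTES = 64 * 1024  # assumed cap, mirrors
                                            # remote.max_batch_total_size_bytes

    class Batch():
        def __init__(self):
            self._items = []
            self._size = 0

        def add(self, data):
            # Refuse the item if it would push the batch past the cap,
            # mirroring _CASBatchUpdate.add() returning False.
            if self._size + len(data) > MAX_BATCH_TOTAL_SIZE_BYTES:
                return False
            self._items.append(data)
            self._size += len(data)
            return True

        def send(self):
            # Empty batches are a no-op, as in _CASBatchUpdate.send().
            if self._items:
                print("sending batch of {} items, {} bytes"
                      .format(len(self._items), self._size))

    def send_blobs(blobs):
        batch = Batch()
        for data in blobs:
            if len(data) >= MAX_BATCH_TOTAL_SIZE_BYTES:
                # Too large for any batch: send in an independent request,
                # like the single-blob fallback in _send_blobs().
                print("sending {} bytes individually".format(len(data)))
            elif not batch.add(data):
                # Current batch is full: flush it and start a new one.
                batch.send()
                batch = Batch()
                batch.add(data)
        # Flush whatever is left over.
        batch.send()

    send_blobs([b"a" * 40000, b"b" * 30000, b"c" * 100000, b"d" * 10])

Keeping the "does it fit" decision inside add() (returning False rather than raising) lets the caller choose between flushing and falling back to an individual upload, which is exactly how _CASBatchUpdate.add() is used by _send_blobs() above.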