Diffstat (limited to 'buildstream/_artifactcache/cascache.py')
 -rw-r--r--  buildstream/_artifactcache/cascache.py | 78 ++++++++++++++++++++++++++++----
 1 file changed, 74 insertions(+), 4 deletions(-)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 936cb780b..3a3181bfb 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -1048,10 +1048,29 @@ class CASCache(ArtifactCache):
                 missing_blobs[d.hash] = d

         # Upload any blobs missing on the server
-        for blob_digest in missing_blobs.values():
-            with open(self.objpath(blob_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
-                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()


 # Represents a single remote CAS cache.
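
Note: the new _send_blobs() coalesces small blobs into a single batch request and only falls back to the one-request-per-blob path when a blob alone meets or exceeds the server's max_batch_total_size_bytes (or the server lacks batch support). A minimal, standalone sketch of that grouping strategy, using hypothetical (name, size) pairs in place of real digests:

    # Sketch only: mirrors the flush-and-retry batching logic above.
    def group_for_upload(blobs, max_batch_total_size_bytes):
        batch, batch_size = [], 0
        for name, size in blobs:
            if size >= max_batch_total_size_bytes:
                # Too large for any batch: independent upload.
                yield ('single', [(name, size)])
                continue
            if batch_size + size > max_batch_total_size_bytes:
                # No room left: flush the pending batch first.
                yield ('batch', batch)
                batch, batch_size = [], 0
            batch.append((name, size))
            batch_size += size
        if batch:
            yield ('batch', batch)

    # With a 5-byte budget: yields ('batch', [a, b]), ('single', [d]), ('batch', [c]).
    for kind, group in group_for_upload([('a', 2), ('b', 2), ('c', 2), ('d', 7)], 5):
        print(kind, group)
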
@@ -1126,6 +1145,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise

+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
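
Note: the capability probe sends an empty BatchUpdateBlobsRequest, which is a harmless no-op on servers that implement the RPC. UNIMPLEMENTED simply marks batch updates as unsupported, and PERMISSION_DENIED is tolerated the same way because a deployment may expose the CAS read-only; any other error still propagates. The general pattern, sketched with a hypothetical zero-argument stub_call wrapper (not BuildStream API):

    import grpc

    def rpc_supported(stub_call, tolerated=(grpc.StatusCode.UNIMPLEMENTED,
                                            grpc.StatusCode.PERMISSION_DENIED)):
        # Issue a cheap empty request and classify the outcome.
        try:
            stub_call()
            return True
        except grpc.RpcError as e:
            if e.code() not in tolerated:
                raise  # real failures (e.g. UNAVAILABLE) still surface
            return False
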
@@ -1173,6 +1203,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)


+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
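
Note: _CASBatchUpdate is a one-shot builder: add() copies blob payloads into the request until the byte budget would be exceeded, and send() may be called at most once, returning early for an empty batch (which is why the unconditional final batch.send() in _send_blobs() is safe). A usage sketch, assuming remote, digests and an objpath() helper shaped as in the diff above:

    batch = _CASBatchUpdate(remote)
    for digest in digests:
        with open(objpath(digest), 'rb') as f:
            if not batch.add(digest, f):      # batch full: flush and retry
                batch.send()
                batch = _CASBatchUpdate(remote)
                batch.add(digest, f)          # fresh batch has room (blob is under budget)
    batch.send()                              # empty batch is a silent no-op
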