author     Jürg Billeter <j@bitron.ch>                          2018-09-30 17:50:32 +0200
committer  Tristan Van Berkom <tristan.van.berkom@gmail.com>    2018-10-03 07:35:51 +0000
commit     f585b23314535af51fbd1eb80d695550188cfa99 (patch)
tree       676165f4d633eaab70a5dddd42f0b715a046999e
parent     4a67e4e37a734e5dda5f2613d0e41eb091e05533 (diff)
download   buildstream-juerg/cas-batch-1.2.tar.gz
_artifactcache/cascache.py: Use BatchUpdateBlobs (juerg/cas-batch-1.2)
This uses BatchUpdateBlobs instead of individual blob uploads to speed up artifact pushing when the server supports it. Fixes #677.
-rw-r--r--  buildstream/_artifactcache/cascache.py  |  78
1 file changed, 74 insertions(+), 4 deletions(-)
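
For illustration, the batching strategy this commit introduces boils down to: stream blobs that are too large for any batch individually, pack everything else greedily into a batch request, and flush the batch whenever the next blob would push it over the server's size limit. Below is a minimal, self-contained sketch of that loop; Batch, send_blobs and MAX_BATCH_TOTAL_SIZE are illustrative stand-ins for _CASBatchUpdate, _send_blobs and remote.max_batch_total_size_bytes, and the server-support fallback is omitted for brevity.

MAX_BATCH_TOTAL_SIZE = 4 * 1024 * 1024  # hypothetical server limit

class Batch:
    def __init__(self, limit):
        self._limit = limit
        self._names = []
        self._size = 0

    def add(self, name, size):
        if self._size + size > self._limit:
            return False  # would exceed the batch size limit
        self._names.append(name)
        self._size += size
        return True

    def send(self):
        if self._names:
            print("batch upload:", self._names, "({} bytes)".format(self._size))

def send_blobs(blobs):
    batch = Batch(MAX_BATCH_TOTAL_SIZE)
    for name, size in blobs:
        if size >= MAX_BATCH_TOTAL_SIZE:
            # Too large for any batch request: upload individually
            print("streaming:", name, "({} bytes)".format(size))
        elif not batch.add(name, size):
            # Batch full: flush it and start a new one with this blob
            batch.send()
            batch = Batch(MAX_BATCH_TOTAL_SIZE)
            batch.add(name, size)
    # Flush the final, possibly partial batch
    batch.send()

send_blobs([("a", 1024), ("b", 3 * 1024 * 1024),
            ("c", 2 * 1024 * 1024), ("d", 8 * 1024 * 1024)])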
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index ec9d78026..14932fba2 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -934,10 +934,29 @@ class CASCache(ArtifactCache):
             missing_blobs[d.hash] = d
 
         # Upload any blobs missing on the server
-        for blob_digest in missing_blobs.values():
-            with open(self.objpath(blob_digest), 'rb') as f:
-                assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes
-                self._send_blob(remote, blob_digest, f, u_uid=u_uid)
+        self._send_blobs(remote, missing_blobs.values(), u_uid)
+
+    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+        batch = _CASBatchUpdate(remote)
+
+        for digest in digests:
+            with open(self.objpath(digest), 'rb') as f:
+                assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+                if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                        not remote.batch_update_supported):
+                    # Too large for batch request, upload in independent request.
+                    self._send_blob(remote, digest, f, u_uid=u_uid)
+                else:
+                    if not batch.add(digest, f):
+                        # Not enough space left in batch request.
+                        # Complete pending batch first.
+                        batch.send()
+                        batch = _CASBatchUpdate(remote)
+                        batch.add(digest, f)
+
+        # Send final batch
+        batch.send()
 
 
 # Represents a single remote CAS cache.
@@ -1012,6 +1031,17 @@ class _CASRemote():
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
                     raise
 
+            # Check whether the server supports BatchUpdateBlobs()
+            self.batch_update_supported = False
+            try:
+                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                response = self.cas.BatchUpdateBlobs(request)
+                self.batch_update_supported = True
+            except grpc.RpcError as e:
+                if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
+                        e.code() != grpc.StatusCode.PERMISSION_DENIED):
+                    raise
+
             self._initialized = True
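
As an aside, the capability check above works by issuing an empty BatchUpdateBlobsRequest as a no-op probe and treating UNIMPLEMENTED, and also PERMISSION_DENIED, as "not supported" rather than a hard error. A rough sketch of the same probe pattern follows; FakeRpcError, FakeStub and probe_batch_update are illustrative stand-ins, not part of this change.

import grpc

class FakeRpcError(grpc.RpcError):
    def __init__(self, code):
        super().__init__()
        self._code = code

    def code(self):
        return self._code

class FakeStub:
    # Stands in for a CAS server without batch upload support
    def BatchUpdateBlobs(self, request):
        raise FakeRpcError(grpc.StatusCode.UNIMPLEMENTED)

def probe_batch_update(stub):
    try:
        stub.BatchUpdateBlobs(None)  # empty request acts as a no-op probe
        return True
    except grpc.RpcError as e:
        if e.code() in (grpc.StatusCode.UNIMPLEMENTED,
                        grpc.StatusCode.PERMISSION_DENIED):
            return False
        raise

print(probe_batch_update(FakeStub()))  # False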
@@ -1059,6 +1089,46 @@ class _CASBatchRead():
             yield (response.digest, response.data)
 
 
+# Represents a batch of blobs queued for upload.
+#
+class _CASBatchUpdate():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest, stream):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        blob_request = self._request.requests.add()
+        blob_request.digest.hash = digest.hash
+        blob_request.digest.size_bytes = digest.size_bytes
+        blob_request.data = stream.read(digest.size_bytes)
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.requests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to upload blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+
+
 def _grouper(iterable, n):
     while True:
         try:
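
One detail worth noting in _CASBatchUpdate.send(): each per-blob response carries a google.rpc.Status whose code field is a plain integer, whereas grpc.StatusCode members wrap a (numeric code, name) pair, hence the .value[0] in the comparison. A quick check:

import grpc

print(grpc.StatusCode.OK.value)     # (0, 'ok')
print(grpc.StatusCode.OK.value[0])  # 0, the integer form used in google.rpc.Status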