summaryrefslogtreecommitdiff
path: root/buildstream/_cas
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2019-03-13 14:00:32 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-03-27 16:24:32 +0000
commita8837eab46ebe40231538171fad7808105758f48 (patch)
tree66eed375ff9f7c317caeb7dd15d2000723a93f22 /buildstream/_cas
parent954718fa02bdddb23f3753a7bfe1abe39d555e94 (diff)
downloadbuildstream-a8837eab46ebe40231538171fad7808105758f48.tar.gz
cascache.py: Add fetch_blobs() method
Diffstat (limited to 'buildstream/_cas')
-rw-r--r--buildstream/_cas/cascache.py44
1 files changed, 42 insertions, 2 deletions
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index ce51fb25e..99d78cff5 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -948,8 +948,8 @@ class CASCache():
return objpath
- def _batch_download_complete(self, batch):
- for digest, data in batch.send():
+ def _batch_download_complete(self, batch, *, missing_blobs=None):
+ for digest, data in batch.send(missing_blobs=missing_blobs):
with self._temporary_object() as f:
f.write(data)
f.flush()
@@ -1064,6 +1064,46 @@ class CASCache():
return dirdigest
+ # fetch_blobs():
+ #
+ # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
+ #
+ # Args:
+ # remote (CASRemote): The remote repository to fetch from
+ # digests (list): The Digests of blobs to fetch
+ #
+ # Returns: The Digests of the blobs that were not available on the remote CAS
+ #
+ def fetch_blobs(self, remote, digests):
+ missing_blobs = []
+
+ batch = _CASBatchRead(remote)
+
+ for digest in digests:
+ if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+ not remote.batch_read_supported):
+ # Too large for batch request, download in independent request.
+ try:
+ self._ensure_blob(remote, digest)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ missing_blobs.append(digest)
+ else:
+ raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
+ else:
+ if not batch.add(digest):
+ # Not enough space left in batch request.
+ # Complete pending batch first.
+ self._batch_download_complete(batch, missing_blobs=missing_blobs)
+
+ batch = _CASBatchRead(remote)
+ batch.add(digest)
+
+ # Complete last pending batch
+ self._batch_download_complete(batch, missing_blobs=missing_blobs)
+
+ return missing_blobs
+
def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)