author     Jürg Billeter <j@bitron.ch>    2019-07-02 13:25:24 +0100
committer  Jürg Billeter <j@bitron.ch>    2019-08-20 08:09:52 +0200
commit     dac43a80346576e957657d53fa215d0e358cd7ac (patch)
tree       024b6d16b81e39afa9a438ed00b4c13f98d45142
parent     5c68894dda00b3f1760b4f9b0c6bd4658aeb7ccb (diff)
download   buildstream-dac43a80346576e957657d53fa215d0e358cd7ac.tar.gz
casremote.py: Use FetchMissingBlobs in CASBatchRead
-rw-r--r--   src/buildstream/_cas/cascache.py    44
-rw-r--r--   src/buildstream/_cas/casremote.py   26
2 files changed, 11 insertions(+), 59 deletions(-)
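
This commit reroutes _CASBatchRead through the FetchMissingBlobs RPC of the local CAS service instead of calling BatchReadBlobs on the remote endpoint directly, so blob contents no longer pass through BuildStream and the client-side batch size bookkeeping can be dropped. A minimal usage sketch of the reworked batch, using only names that appear in the diff below; `remote`, `digests` and `missing_blobs` are assumed to be supplied by the caller, as in the loop in the last cascache.py hunk:

# Sketch: how the reworked _CASBatchRead is driven after this commit.
batch = _CASBatchRead(remote)

for digest in digests:
    # No size check or one-off fallback any more: every digest goes into the
    # single FetchMissingBlobsRequest.
    batch.add(digest)

missing_blobs = []
batch.send(missing_blobs=missing_blobs)

# After send() returns, every blob the local CAS service could fetch is in the
# local cache; digests it reported as NOT_FOUND are collected into
# missing_blobs (the NOT_FOUND handling itself sits in context not shown by
# this diff).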
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 5ebf54601..3cb4ba87b 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -824,18 +824,9 @@ class CASCache():

         return objpath

-    def _batch_download_complete(self, batch, *, missing_blobs=None):
-        for digest, data in batch.send(missing_blobs=missing_blobs):
-            with self._temporary_object() as f:
-                f.write(data)
-                f.flush()
-
-                added_digest = self.add_object(path=f.name, link_directly=True)
-                assert added_digest.hash == digest.hash
-
     # Helper function for _fetch_directory().
     def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
-        self._batch_download_complete(batch)
+        batch.send()

         # All previously scheduled directories are now locally available,
         # move them to the processing queue.
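
The helper removed above existed because _CASBatchRead.send() used to be a generator yielding (digest, data) pairs for the caller to write into the cache; the casremote.py hunks below drop that yield, so send() no longer hands out data and callers simply invoke it. A before/after sketch of the calling convention; store_blob_locally is a hypothetical stand-in for the _temporary_object()/add_object() steps in the removed helper:

# Before this commit (sketch): send() yielded each blob's contents and the
# caller persisted them into the local cache.
for digest, data in batch.send(missing_blobs=missing_blobs):
    store_blob_locally(digest, data)  # hypothetical name for the removed
                                      # _temporary_object()/add_object() steps

# After this commit (sketch): the local CAS service stores fetched blobs as
# part of FetchMissingBlobs, so a bare call is all that is needed.
batch.send(missing_blobs=missing_blobs)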
@@ -850,17 +841,8 @@ class CASCache():
         if in_local_cache:
             # Skip download, already in local cache.
             pass
-        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
-              not remote.batch_read_supported):
-            # Too large for batch request, download in independent request.
-            self._ensure_blob(remote, digest)
-            in_local_cache = True
         else:
-            if not batch.add(digest):
-                # Not enough space left in batch request.
-                # Complete pending batch first.
-                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-                batch.add(digest)
+            batch.add(digest)

         if recursive:
             if in_local_cache:
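
The guards removed in this hunk capped what went into a single BatchReadBlobs request and fell back to independent downloads for oversized blobs or for remotes without batch-read support; FetchMissingBlobs needs no such client-side cap here because the transfer is performed by the local CAS service, so the scheduling loop can queue every digest unconditionally. For contrast, the pre-patch fallback pattern, condensed from the removed lines above (the enclosing method is outside the shown context):

# Pre-patch pattern (condensed from the removed lines above):
if digest.size_bytes >= remote.max_batch_total_size_bytes or not remote.batch_read_supported:
    # Too large for a batch request (or batching unsupported): fetch it in an
    # independent request.
    self._ensure_blob(remote, digest)
    in_local_cache = True
elif not batch.add(digest):
    # Not enough space left in the pending batch: flush it, then add the
    # digest to a fresh batch.
    batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
    batch.add(digest)

# Post-patch, all of this collapses to an unconditional batch.add(digest).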
@@ -944,27 +926,9 @@ class CASCache():
         batch = _CASBatchRead(remote)

         for digest in digests:
-            if (digest.size_bytes >= remote.max_batch_total_size_bytes or
-                not remote.batch_read_supported):
-                # Too large for batch request, download in independent request.
-                try:
-                    self._ensure_blob(remote, digest)
-                except grpc.RpcError as e:
-                    if e.code() == grpc.StatusCode.NOT_FOUND:
-                        missing_blobs.append(digest)
-                    else:
-                        raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
-            else:
-                if not batch.add(digest):
-                    # Not enough space left in batch request.
-                    # Complete pending batch first.
-                    self._batch_download_complete(batch, missing_blobs=missing_blobs)
-
-                    batch = _CASBatchRead(remote)
-                    batch.add(digest)
+            batch.add(digest)

-        # Complete last pending batch
-        self._batch_download_complete(batch, missing_blobs=missing_blobs)
+        batch.send(missing_blobs=missing_blobs)

         return missing_blobs
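
Note how "blob not found" now surfaces differently. A sketch contrasting the two paths, both taken from hunks in this commit (the post-patch NOT_FOUND handling itself lives between the two casremote.py hunks and is not shown here):

# Before: a one-off download raised a gRPC error that the caller translated.
try:
    self._ensure_blob(remote, digest)
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.NOT_FOUND:
        missing_blobs.append(digest)
    else:
        raise CASCacheError("Failed to fetch blob: {}".format(e)) from e

# After: FetchMissingBlobs reports a per-blob google.rpc status instead, which
# _CASBatchRead.send() checks against code_pb2.NOT_FOUND; the missing_blobs
# list passed by the caller is how those digests are reported back (that
# handling is in diff context not shown here).
batch.send(missing_blobs=missing_blobs)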
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 2a6028bf8..972c4d818 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -316,35 +316,25 @@ class CASRemote():
 class _CASBatchRead():
     def __init__(self, remote):
         self._remote = remote
-        self._max_total_size_bytes = remote.max_batch_total_size_bytes
-        self._request = remote_execution_pb2.BatchReadBlobsRequest()
-        if remote.instance_name:
-            self._request.instance_name = remote.instance_name
-        self._size = 0
+        self._request = local_cas_pb2.FetchMissingBlobsRequest()
+        self._request.instance_name = remote.local_cas_instance_name
         self._sent = False

     def add(self, digest):
         assert not self._sent

-        new_batch_size = self._size + digest.size_bytes
-        if new_batch_size > self._max_total_size_bytes:
-            # Not enough space left in current batch
-            return False
-
-        request_digest = self._request.digests.add()
-        request_digest.hash = digest.hash
-        request_digest.size_bytes = digest.size_bytes
-        self._size = new_batch_size
-        return True
+        request_digest = self._request.blob_digests.add()
+        request_digest.CopyFrom(digest)

     def send(self, *, missing_blobs=None):
         assert not self._sent
         self._sent = True

-        if not self._request.digests:
+        if not self._request.blob_digests:
             return

-        batch_response = self._remote.cas.BatchReadBlobs(self._request)
+        local_cas = self._remote.cascache._get_local_cas()
+        batch_response = local_cas.FetchMissingBlobs(self._request)

         for response in batch_response.responses:
             if response.status.code == code_pb2.NOT_FOUND:
@@ -361,8 +351,6 @@ class _CASBatchRead():
                 raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
                     response.digest.hash, response.digest.size_bytes, len(response.data)))

-            yield (response.digest, response.data)
-

 # Represents a batch of blobs queued for upload.
 #
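
For reference, the shape of _CASBatchRead.send() after this commit, pieced together from the added and context lines above; the per-response error handling between the two casremote.py hunks is untouched by this commit and elided here:

# Post-patch _CASBatchRead.send(), reconstructed from the hunks above.
def send(self, *, missing_blobs=None):
    assert not self._sent
    self._sent = True

    if not self._request.blob_digests:
        return

    # Ask the local CAS service (via the stub returned by
    # CASCache._get_local_cas()) to fetch the requested blobs; blob data is
    # stored by that service and is not returned in the response.
    local_cas = self._remote.cascache._get_local_cas()
    batch_response = local_cas.FetchMissingBlobs(self._request)

    for response in batch_response.responses:
        if response.status.code == code_pb2.NOT_FOUND:
            ...  # not-found / error handling, outside the shown hunk context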