From b480a2a5064330133cdd2ec7e14961d63a40c113 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Tue, 27 Oct 2020 17:46:20 +0100 Subject: cascache.py: Reimplement _fetch_directory() with FetchTree() This simplifies the code, delegating the logic to buildbox-casd. --- src/buildstream/_cas/cascache.py | 72 ++++++++++------------------------------ 1 file changed, 17 insertions(+), 55 deletions(-) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 3182ec050..42e2244c5 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -37,7 +37,7 @@ from ..types import FastEnum, SourceRef from .._exceptions import CASCacheError from .casdprocessmanager import CASDProcessManager -from .casremote import _CASBatchRead, _CASBatchUpdate +from .casremote import _CASBatchRead, _CASBatchUpdate, BlobNotFound _BUFFER_SIZE = 65536 @@ -505,37 +505,6 @@ class CASCache: return objpath - # Helper function for _fetch_directory(). - def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue): - batch.send() - - # All previously scheduled directories are now locally available, - # move them to the processing queue. - fetch_queue.extend(fetch_next_queue) - fetch_next_queue.clear() - return _CASBatchRead(remote) - - # Helper function for _fetch_directory(). - def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False): - in_local_cache = os.path.exists(self.objpath(digest)) - - if in_local_cache: - # Skip download, already in local cache. - pass - else: - batch.add(digest) - - if recursive: - if in_local_cache: - # Add directory to processing queue. - fetch_queue.append(digest) - else: - # Directory will be available after completing pending batch. - # Add directory to deferred processing queue. - fetch_next_queue.append(digest) - - return batch - # _fetch_directory(): # # Fetches remote directory and adds it to content addressable store. @@ -548,31 +517,24 @@ class CASCache: # dir_digest (Digest): Digest object for the directory to fetch. # def _fetch_directory(self, remote, dir_digest): - # TODO Use GetTree() if the server supports it - - fetch_queue = [dir_digest] - fetch_next_queue = [] - batch = _CASBatchRead(remote) - - while len(fetch_queue) + len(fetch_next_queue) > 0: - if not fetch_queue: - batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) - - dir_digest = fetch_queue.pop(0) - - objpath = self._ensure_blob(remote, dir_digest) - - directory = remote_execution_pb2.Directory() - with open(objpath, "rb") as f: - directory.ParseFromString(f.read()) + local_cas = self.get_local_cas() - for dirnode in directory.directories: - batch = self._fetch_directory_node( - remote, dirnode.digest, batch, fetch_queue, fetch_next_queue, recursive=True - ) + request = local_cas_pb2.FetchTreeRequest() + request.instance_name = remote.local_cas_instance_name + request.root_digest.CopyFrom(dir_digest) + request.fetch_file_blobs = False - # Fetch final batch - self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue) + try: + local_cas.FetchTree(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.NOT_FOUND: + raise BlobNotFound( + dir_digest.hash, + "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()), + ) from e + raise CASCacheError( + "Failed to fetch directory tree {}: {}: {}".format(dir_digest.hash, e.code().name, e.details()) + ) from e def _fetch_tree(self, remote, digest): objpath = self._ensure_blob(remote, digest) -- cgit v1.2.1