From 34e81ae1835cb8732798e7d4eab4d470e96fad13 Mon Sep 17 00:00:00 2001
From: Tiago Gomes
Date: Fri, 10 Aug 2018 13:48:39 +0100
Subject: artifactcache: fix oversight

We want to check if some file is already cached here, not the parent
directory.
---
 buildstream/_artifactcache/cascache.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index b6b1d436d..a7b92d6e2 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -803,7 +803,7 @@ class CASCache(ArtifactCache):
                 directory.ParseFromString(f.read())
 
             for filenode in directory.files:
-                fileobjpath = self.objpath(tree)
+                fileobjpath = self.objpath(filenode.digest)
                 if os.path.exists(fileobjpath):
                     # already in local cache
                     continue
-- 
cgit v1.2.1

From b842658cf4b0d3e8d57552fdd5811a969e878392 Mon Sep 17 00:00:00 2001
From: Tiago Gomes
Date: Thu, 13 Sep 2018 15:14:27 +0100
Subject: artifactcache: improve _create_tree()
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Rename it to _commit_directory() because… it is what it does; and also
  for symmetry with _fetch_directory().

* Rename digest to dir_digest to make it clear this is a digest for a
  directory. A following commit will also reuse the same variable name

* Document method.
---
 buildstream/_artifactcache/cascache.py | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index a7b92d6e2..d48c8aa01 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -115,7 +115,7 @@ class CASCache(ArtifactCache):
     def commit(self, element, content, keys):
         refs = [self.get_artifact_fullname(element, key) for key in keys]
 
-        tree = self._create_tree(content)
+        tree = self._commit_directory(content)
 
         for ref in refs:
             self.set_ref(ref, tree)
@@ -623,7 +623,21 @@ class CASCache(ArtifactCache):
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
-    def _create_tree(self, path, *, digest=None):
+    # _commit_directory():
+    #
+    # Adds local directory to content addressable store.
+    #
+    # Adds files, symbolic links and recursively other directories in
+    # a local directory to the content addressable store.
+    #
+    # Args:
+    #     path (str): Path to the directory to add.
+    #     dir_digest (Digest): An optional Digest object to use.
+    #
+    # Returns:
+    #     (Digest): Digest object for the directory added.
+    #
+    def _commit_directory(self, path, *, dir_digest=None):
         directory = remote_execution_pb2.Directory()
 
         for name in sorted(os.listdir(path)):
@@ -632,7 +646,7 @@ class CASCache(ArtifactCache):
             if stat.S_ISDIR(mode):
                 dirnode = directory.directories.add()
                 dirnode.name = name
-                self._create_tree(full_path, digest=dirnode.digest)
+                self._commit_directory(full_path, dir_digest=dirnode.digest)
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
@@ -645,7 +659,8 @@ class CASCache(ArtifactCache):
         else:
             raise ArtifactError("Unsupported file type for {}".format(full_path))
 
-        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+        return self.add_object(digest=dir_digest,
+                               buffer=directory.SerializeToString())
 
     def _get_subdir(self, tree, subdir):
         head, name = os.path.split(subdir)
-- 
cgit v1.2.1

From e3ff069ec85df66cac20efb6b378b230ee0a930f Mon Sep 17 00:00:00 2001
From: Tiago Gomes
Date: Thu, 13 Sep 2018 15:25:56 +0100
Subject: artifactcache: improve _fetch_directory()

* Rename tree to dir_digest to make it clear this is a Digest object,
  and not a Tree object.

* Add documentation
---
 buildstream/_artifactcache/cascache.py | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index d48c8aa01..8a0eb5195 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -803,14 +803,26 @@ class CASCache(ArtifactCache):
                 out.flush()
                 assert digest.size_bytes == os.fstat(out.fileno()).st_size
 
-    def _fetch_directory(self, remote, tree):
-        objpath = self.objpath(tree)
+    # _fetch_directory():
+    #
+    # Fetches remote directory and adds it to content addressable store.
+    #
+    # Fetches files, symbolic links and recursively other directories in
+    # the remote directory and adds them to the content addressable
+    # store.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     dir_digest (Digest): Digest object for the directory to fetch.
+    #
+    def _fetch_directory(self, remote, dir_digest):
+        objpath = self.objpath(dir_digest)
         if os.path.exists(objpath):
             # already in local cache
             return
 
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, tree, out)
+            self._fetch_blob(remote, dir_digest, out)
 
             directory = remote_execution_pb2.Directory()
 
@@ -832,10 +844,11 @@ class CASCache(ArtifactCache):
             for dirnode in directory.directories:
                 self._fetch_directory(remote, dirnode.digest)
 
-            # place directory blob only in final location when we've downloaded
-            # all referenced blobs to avoid dangling references in the repository
+            # Place directory blob only in final location when we've
+            # downloaded all referenced blobs to avoid dangling
+            # references in the repository.
             digest = self.add_object(path=out.name)
-            assert digest.hash == tree.hash
+            assert digest.hash == dir_digest.hash
 
 
 # Represents a single remote CAS cache.
-- 
cgit v1.2.1

From 9bca9183aba97f82c6158a0df2e3a8fc433ca75f Mon Sep 17 00:00:00 2001
From: Tiago Gomes
Date: Wed, 12 Sep 2018 13:24:31 +0100
Subject: artifactcache: rename get_quota_exceeded()

---
 buildstream/_artifactcache/artifactcache.py | 6 +++---
 buildstream/_scheduler/queues/buildqueue.py | 2 +-
 buildstream/_scheduler/scheduler.py         | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index b0cb4f353..9b5df1d26 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -277,7 +277,7 @@ class ArtifactCache():
                           "Please increase the cache-quota in {}."
                           .format(self.context.config_origin or default_conf))
 
-            if self.get_quota_exceeded():
+            if self.has_quota_exceeded():
                 raise ArtifactError("Cache too full. Aborting.",
                                     detail=detail,
                                     reason="cache-too-full")
@@ -364,14 +364,14 @@ class ArtifactCache():
         self._cache_size = cache_size
         self._write_cache_size(self._cache_size)
 
-    # get_quota_exceeded()
+    # has_quota_exceeded()
     #
     # Checks if the current artifact cache size exceeds the quota.
     #
     # Returns:
     #     (bool): True of the quota is exceeded
     #
-    def get_quota_exceeded(self):
+    def has_quota_exceeded(self):
         return self.get_cache_size() > self._cache_quota
 
     ################################################
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 90e3ad792..e63475f05 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -65,7 +65,7 @@ class BuildQueue(Queue):
         # If the estimated size outgrows the quota, ask the scheduler
         # to queue a job to actually check the real cache size.
         #
-        if artifacts.get_quota_exceeded():
+        if artifacts.has_quota_exceeded():
             self._scheduler.check_cache_size()
 
     def done(self, job, element, result, success):
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 8ab79c5f5..09af63de5 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -349,7 +349,7 @@ class Scheduler():
         platform = Platform.get_platform()
         artifacts = platform.artifactcache
 
-        if not artifacts.get_quota_exceeded():
+        if not artifacts.has_quota_exceeded():
             return
 
         job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
-- 
cgit v1.2.1

From 2d025076cf20060c6ad6007a1a2c3603b72d3fef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Wed, 19 Sep 2018 11:09:39 +0200
Subject: _artifactcache/cascache.py: Add _ensure_blob helper

This adds directory objects to the local repository before downloading
files in the directory. However, artifact references are still stored
only after downloading the complete directory and thus, there won't be
dangling references.

This will anyway be required for partial download support.
---
 buildstream/_artifactcache/cascache.py | 57 ++++++++++++++++++++--------------
 1 file changed, 33 insertions(+), 24 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 8a0eb5195..619613a41 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -803,6 +803,31 @@ class CASCache(ArtifactCache):
                 out.flush()
                 assert digest.size_bytes == os.fstat(out.fileno()).st_size
 
+    # _ensure_blob():
+    #
+    # Fetch and add blob if it's not already local.
+    #
+    # Args:
+    #     remote (Remote): The remote to use.
+    #     digest (Digest): Digest object for the blob to fetch.
+    #
+    # Returns:
+    #     (str): The path of the object
+    #
+    def _ensure_blob(self, remote, digest):
+        objpath = self.objpath(digest)
+        if os.path.exists(objpath):
+            # already in local repository
+            return objpath
+
+        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+            self._fetch_blob(remote, digest, f)
+
+            added_digest = self.add_object(path=f.name)
+            assert added_digest.hash == digest.hash
+
+        return objpath
+
     # _fetch_directory():
     #
     # Fetches remote directory and adds it to content addressable store.
@@ -821,34 +846,18 @@ class CASCache(ArtifactCache):
             # already in local cache
             return
 
-        with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
-            self._fetch_blob(remote, dir_digest, out)
-
-            directory = remote_execution_pb2.Directory()
-
-            with open(out.name, 'rb') as f:
-                directory.ParseFromString(f.read())
-
-            for filenode in directory.files:
-                fileobjpath = self.objpath(filenode.digest)
-                if os.path.exists(fileobjpath):
-                    # already in local cache
-                    continue
+        objpath = self._ensure_blob(remote, dir_digest)
 
-                with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
-                    self._fetch_blob(remote, filenode.digest, f)
+        directory = remote_execution_pb2.Directory()
 
-                    digest = self.add_object(path=f.name)
-                    assert digest.hash == filenode.digest.hash
+        with open(objpath, 'rb') as f:
+            directory.ParseFromString(f.read())
 
-            for dirnode in directory.directories:
-                self._fetch_directory(remote, dirnode.digest)
+        for filenode in directory.files:
+            self._ensure_blob(remote, filenode.digest)
 
-            # Place directory blob only in final location when we've
-            # downloaded all referenced blobs to avoid dangling
-            # references in the repository.
-            digest = self.add_object(path=out.name)
-            assert digest.hash == dir_digest.hash
+        for dirnode in directory.directories:
+            self._fetch_directory(remote, dirnode.digest)
 
 
 # Represents a single remote CAS cache.
-- 
cgit v1.2.1

From 01831afe4cf367c56fdee5c6c59fa426e2829a78 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Mon, 24 Sep 2018 10:03:36 +0100
Subject: _artifactcache/cascache.py: Increase payload size limit for uploads

gRPC can handle 1 MiB payloads. Increase size limit from 64 KiB to speed
up uploads.
---
 buildstream/_artifactcache/cascache.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 619613a41..7360d548e 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
 from . import ArtifactCache
 
 
+# The default limit for gRPC messages is 4 MiB.
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
+_MAX_PAYLOAD_BYTES = 1024 * 1024
+
+
 # A CASCache manages artifacts in a CAS repository as specified in the
 # Remote Execution API.
 #
@@ -330,12 +335,12 @@ class CASCache(ArtifactCache):
             finished = False
             remaining = digest.size_bytes
             while not finished:
-                chunk_size = min(remaining, 64 * 1024)
+                chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                 remaining -= chunk_size
 
                 request = bytestream_pb2.WriteRequest()
                 request.write_offset = offset
-                # max. 64 kB chunks
+                # max. _MAX_PAYLOAD_BYTES chunks
                 request.data = f.read(chunk_size)
                 request.resource_name = resname
                 request.finish_write = remaining <= 0
-- 
cgit v1.2.1

From 1b7245da9b9d3a39554f2b465a671f6d19dc2029 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Mon, 24 Sep 2018 10:07:30 +0100
Subject: _artifactcache/casserver.py: Harmonize payload size limit

Use 1 MiB as payload size limit on the server side for both individual
downloads and batch uploads.
---
 buildstream/_artifactcache/casserver.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 8c3ece27d..d833878d5 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -38,8 +38,9 @@ from .._context import Context
 from .cascache import CASCache
 
 
-# The default limit for gRPC messages is 4 MiB
-_MAX_BATCH_TOTAL_SIZE_BYTES = 4 * 1024 * 1024
+# The default limit for gRPC messages is 4 MiB.
+# Limit payload to 1 MiB to leave sufficient headroom for metadata.
+_MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
 # Trying to push an artifact that is too large
@@ -158,7 +159,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
 
                 remaining = client_digest.size_bytes - request.read_offset
                 while remaining > 0:
-                    chunk_size = min(remaining, 64 * 1024)
+                    chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
                     remaining -= chunk_size
 
                     response = bytestream_pb2.ReadResponse()
@@ -242,7 +243,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
 
         for digest in request.digests:
             batch_size += digest.size_bytes
-            if batch_size > _MAX_BATCH_TOTAL_SIZE_BYTES:
+            if batch_size > _MAX_PAYLOAD_BYTES:
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                 return response
 
@@ -269,7 +270,7 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
         cache_capabilities = response.cache_capabilities
         cache_capabilities.digest_function.append(remote_execution_pb2.SHA256)
         cache_capabilities.action_cache_update_capabilities.update_enabled = False
-        cache_capabilities.max_batch_total_size_bytes = _MAX_BATCH_TOTAL_SIZE_BYTES
+        cache_capabilities.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
         cache_capabilities.symlink_absolute_path_strategy = remote_execution_pb2.CacheCapabilities.ALLOWED
 
         response.deprecated_api_version.major = 2
-- 
cgit v1.2.1

From fd46a9f9779781664fa3eef697bc489e5d1cf4cd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=BCrg=20Billeter?=
Date: Thu, 13 Sep 2018 10:07:37 +0200
Subject: _artifactcache/cascache.py: Use BatchReadBlobs

This uses BatchReadBlobs instead of individual blob download to speed up
artifact pulling, if the server supports it.

Fixes #554.
---
 buildstream/_artifactcache/cascache.py | 149 ++++++++++++++++++++++++++++++---
 1 file changed, 137 insertions(+), 12 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 7360d548e..e2c0d44b5 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -833,6 +833,55 @@ class CASCache(ArtifactCache):
 
         return objpath
 
+    def _batch_download_complete(self, batch):
+        for digest, data in batch.send():
+            with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f:
+                f.write(data)
+                f.flush()
+
+                added_digest = self.add_object(path=f.name)
+                assert added_digest.hash == digest.hash
+
+    # Helper function for _fetch_directory().
+    def _fetch_directory_batch(self, remote, batch, fetch_queue, fetch_next_queue):
+        self._batch_download_complete(batch)
+
+        # All previously scheduled directories are now locally available,
+        # move them to the processing queue.
+        fetch_queue.extend(fetch_next_queue)
+        fetch_next_queue.clear()
+        return _CASBatchRead(remote)
+
+    # Helper function for _fetch_directory().
+    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
+        in_local_cache = os.path.exists(self.objpath(digest))
+
+        if in_local_cache:
+            # Skip download, already in local cache.
+            pass
+        elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                not remote.batch_read_supported):
+            # Too large for batch request, download in independent request.
+            self._ensure_blob(remote, digest)
+            in_local_cache = True
+        else:
+            if not batch.add(digest):
+                # Not enough space left in batch request.
+                # Complete pending batch first.
+                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
+                batch.add(digest)
+
+        if recursive:
+            if in_local_cache:
+                # Add directory to processing queue.
+                fetch_queue.append(digest)
+            else:
+                # Directory will be available after completing pending batch.
+                # Add directory to deferred processing queue.
+                fetch_next_queue.append(digest)
+
+        return batch
+
     # _fetch_directory():
     #
     # Fetches remote directory and adds it to content addressable store.
@@ -846,23 +895,32 @@ class CASCache(ArtifactCache):
     #     dir_digest (Digest): Digest object for the directory to fetch.
     #
     def _fetch_directory(self, remote, dir_digest):
-        objpath = self.objpath(dir_digest)
-        if os.path.exists(objpath):
-            # already in local cache
-            return
+        fetch_queue = [dir_digest]
+        fetch_next_queue = []
+        batch = _CASBatchRead(remote)
 
-        objpath = self._ensure_blob(remote, dir_digest)
+        while len(fetch_queue) + len(fetch_next_queue) > 0:
+            if len(fetch_queue) == 0:
+                batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
-        directory = remote_execution_pb2.Directory()
+            dir_digest = fetch_queue.pop(0)
 
-        with open(objpath, 'rb') as f:
-            directory.ParseFromString(f.read())
+            objpath = self._ensure_blob(remote, dir_digest)
 
-        for filenode in directory.files:
-            self._ensure_blob(remote, filenode.digest)
+            directory = remote_execution_pb2.Directory()
+            with open(objpath, 'rb') as f:
+                directory.ParseFromString(f.read())
 
-        for dirnode in directory.directories:
-            self._fetch_directory(remote, dirnode.digest)
+            for dirnode in directory.directories:
+                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+                                                   fetch_queue, fetch_next_queue, recursive=True)
+
+            for filenode in directory.files:
+                batch = self._fetch_directory_node(remote, filenode.digest, batch,
+                                                   fetch_queue, fetch_next_queue)
+
+        # Fetch final batch
+        self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
 
 # Represents a single remote CAS cache.
@@ -912,11 +970,78 @@ class _CASRemote():
 
             self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
             self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
+            self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
             self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
 
+            self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
+            try:
+                request = remote_execution_pb2.GetCapabilitiesRequest()
+                response = self.capabilities.GetCapabilities(request)
+                server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
+                if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
+                    self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
+            except grpc.RpcError as e:
+                # Simply use the defaults for servers that don't implement GetCapabilities()
+                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+                    raise
+
+            # Check whether the server supports BatchReadBlobs()
+            self.batch_read_supported = False
+            try:
+                request = remote_execution_pb2.BatchReadBlobsRequest()
+                response = self.cas.BatchReadBlobs(request)
+                self.batch_read_supported = True
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.UNIMPLEMENTED:
+                    raise
+
             self._initialized = True
 
 
+# Represents a batch of blobs queued for fetching.
+#
+class _CASBatchRead():
+    def __init__(self, remote):
+        self._remote = remote
+        self._max_total_size_bytes = remote.max_batch_total_size_bytes
+        self._request = remote_execution_pb2.BatchReadBlobsRequest()
+        self._size = 0
+        self._sent = False
+
+    def add(self, digest):
+        assert not self._sent
+
+        new_batch_size = self._size + digest.size_bytes
+        if new_batch_size > self._max_total_size_bytes:
+            # Not enough space left in current batch
+            return False
+
+        request_digest = self._request.digests.add()
+        request_digest.hash = digest.hash
+        request_digest.size_bytes = digest.size_bytes
+        self._size = new_batch_size
+        return True
+
+    def send(self):
+        assert not self._sent
+        self._sent = True
+
+        if len(self._request.digests) == 0:
+            return
+
+        batch_response = self._remote.cas.BatchReadBlobs(self._request)
+
+        for response in batch_response.responses:
+            if response.status.code != grpc.StatusCode.OK.value[0]:
+                raise ArtifactError("Failed to download blob {}: {}".format(
+                    response.digest.hash, response.status.code))
+            if response.digest.size_bytes != len(response.data):
+                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+                    response.digest.hash, response.digest.size_bytes, len(response.data)))
+
+            yield (response.digest, response.data)
+
+
 def _grouper(iterable, n):
     while True:
         try:
-- 
cgit v1.2.1
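
Editor's note: the two payload-limit patches above both rely on the same chunked-write pattern, where a blob is sliced into pieces of at most _MAX_PAYLOAD_BYTES and only the final piece marks the write as finished. The sketch below is illustrative only, not BuildStream code: it models the loop against an in-memory buffer instead of the real gRPC ByteStream API, and the function name chunked_writes is invented for the example.

# Illustrative sketch only (not BuildStream code): the chunked-write loop
# used by the upload patches above, run against an in-memory buffer
# instead of the real gRPC ByteStream API.

import io

_MAX_PAYLOAD_BYTES = 1024 * 1024  # 1 MiB, leaving headroom below gRPC's 4 MiB default


def chunked_writes(payload, chunk_size=_MAX_PAYLOAD_BYTES):
    # Yield (offset, data, finish_write) tuples, mirroring the fields the
    # patch fills in on each WriteRequest.
    f = io.BytesIO(payload)
    offset = 0
    remaining = len(payload)
    finished = False
    while not finished:
        size = min(remaining, chunk_size)
        remaining -= size
        data = f.read(size)
        finished = remaining <= 0
        yield offset, data, finished
        offset += size


if __name__ == '__main__':
    blob = b'x' * (2 * _MAX_PAYLOAD_BYTES + 5)
    chunks = list(chunked_writes(blob))
    assert len(chunks) == 3                      # 1 MiB + 1 MiB + 5 bytes
    assert chunks[-1][2]                         # only the last chunk finishes the write
    assert b''.join(data for _, data, _ in chunks) == blob

Raising the chunk size from 64 KiB to 1 MiB only reduces the number of round trips per blob; the structure of the loop itself is unchanged.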
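
Editor's note: similarly, the BatchReadBlobs patch hinges on a size-budgeted batch accumulator (_CASBatchRead). The following self-contained model is a sketch of that idea, not the real client: the class and function names are invented, fetch_many stands in for one BatchReadBlobs round trip, and it assumes every blob fits within a single batch (the real patch falls back to an individual streamed download for oversized blobs).

# Illustrative sketch only (not BuildStream code): accumulate digests until
# the next one would exceed the batch limit, then flush the whole batch in
# a single round trip.

class BlobBatch:
    def __init__(self, max_total_size_bytes):
        self._max = max_total_size_bytes
        self._items = []          # (digest, size_bytes) pairs
        self._size = 0

    def add(self, digest, size_bytes):
        if self._size + size_bytes > self._max:
            return False          # caller must flush this batch and retry
        self._items.append((digest, size_bytes))
        self._size += size_bytes
        return True

    def send(self, fetch_many):
        # fetch_many() stands in for one BatchReadBlobs request.
        if not self._items:
            return []
        return fetch_many([digest for digest, _ in self._items])


def fetch_all(blobs, max_total_size_bytes, fetch_many):
    # Drain (digest, size) pairs in as few round trips as possible.
    # Assumes every blob fits within one batch.
    results = []
    batch = BlobBatch(max_total_size_bytes)
    for digest, size in blobs:
        if not batch.add(digest, size):
            results.extend(batch.send(fetch_many))
            batch = BlobBatch(max_total_size_bytes)
            batch.add(digest, size)
    results.extend(batch.send(fetch_many))
    return results


if __name__ == '__main__':
    store = {('blob%d' % i): b'a' * (300 * 1024) for i in range(5)}
    calls = []

    def fetch_many(digests):
        calls.append(list(digests))
        return [store[d] for d in digests]

    data = fetch_all([(d, len(b)) for d, b in store.items()], 1024 * 1024, fetch_many)
    assert len(data) == 5
    assert len(calls) == 2        # 3 blobs + 2 blobs within a 1 MiB budget

The same budget also drives the directory walk in the patch: directories whose blobs are still waiting in an open batch are parked on a deferred queue and only walked after that batch has been flushed.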