diff options
author | Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> | 2019-01-03 12:21:58 +0000 |
---|---|---|
committer | Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> | 2019-01-15 11:57:02 +0000 |
commit | 7329ef562a3df0d81cfeea20fbfb5cbba366b91a (patch) | |
tree | 3ae40f346f1244049250d52b9cd1523e19b25d38 | |
parent | 15db919f6fafd0c2b0abbddde78a084d5b5379a5 (diff) | |
download | buildstream-raoul/802-refactor-artifactcache.tar.gz |
artifactcache: implement new push methods (branch: raoul/802-refactor-artifactcache)
Similar to the pull methods, this implements a yield_directory_digests method
that iterates over blobs in the local CAS, with upload_blob sending blobs
to a remote and batching them where appropriate.
Part of #802
-rw-r--r-- | buildstream/_artifactcache.py | 41 | ||||
-rw-r--r-- | buildstream/_cas/cascache.py | 149 | ||||
-rw-r--r-- | buildstream/_cas/casremote.py | 102 | ||||
-rw-r--r-- | buildstream/_cas/transfer.py | 11 | ||||
-rw-r--r-- | buildstream/sandbox/_sandboxremote.py | 4 |
5 files changed, 166 insertions, 141 deletions
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index 21db707f1..46280412c 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -29,7 +29,7 @@ from . import utils from . import _yaml from ._cas import BlobNotFound, CASRemote, CASRemoteSpec -from ._cas.transfer import cas_directory_download, cas_tree_download +from ._cas.transfer import cas_directory_upload, cas_directory_download, cas_tree_download CACHE_SIZE_FILE = "cache_size" @@ -608,16 +608,41 @@ class ArtifactCache(): for remote in push_remotes: remote.init() + skipped_remote = True display_key = element._get_brief_display_key() element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - if self.cas.push(refs, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + try: + for ref in refs: + # Check whether ref is already on the server in which case + # there is no need to push the ref + root_digest = self.cas.resolve_ref(ref) + response = remote.get_reference(ref) + if (response is not None and + response.hash == root_digest.hash and + response.size_bytes == root_digest.size_bytes): + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key())) + continue + + # upload blobs + cas_directory_upload(self.cas, remote, root_digest) + remote.update_reference(ref, root_digest) + + skipped_remote = False + + except CASError as e: + if str(e.reason) == "StatusCode.RESOURCE_EXHAUSTED": + element.warn("Failed to push element to {}: Resource exhuasted" + .format(remote.spec.url)) + continue + else: + raise ArtifactError("Failed to push refs {}: {}".format(refs, e), + temporary=True) from e + + if skipped_remote is False: pushed = True - else: - element.info("Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key() - )) + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) return pushed @@ -722,7 
+747,7 @@ class ArtifactCache(): return for remote in push_remotes: - self.cas.push_directory(remote, directory) + cas_directory_upload(self.cas, remote, directory.ref) # push_message(): # diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py index e3b0332be..7ea46a090 100644 --- a/buildstream/_cas/cascache.py +++ b/buildstream/_cas/cascache.py @@ -18,23 +18,16 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> import hashlib -import itertools import os import stat import tempfile -import uuid import contextlib -import grpc - from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 -from .._protos.buildstream.v2 import buildstream_pb2 from .. import utils from .._exceptions import CASCacheError -from .casremote import _CASBatchUpdate - # A CASCache manages a CAS repository as specified in the Remote Execution API. # @@ -196,73 +189,6 @@ class CASCache(): self.set_ref(newref, tree) - # push(): - # - # Push committed refs to remote repository. - # - # Args: - # refs (list): The refs to push - # remote (CASRemote): The remote to push to - # - # Returns: - # (bool): True if any remote was updated, False if no pushes were required - # - # Raises: - # (CASCacheError): if there was an error - # - def push(self, refs, remote): - skipped_remote = True - try: - for ref in refs: - tree = self.resolve_ref(ref) - - # Check whether ref is already on the server in which case - # there is no need to push the ref - try: - request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name) - request.key = ref - response = remote.ref_storage.GetReference(request) - - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: - # ref is already on the server with the same tree - continue - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - # Intentionally re-raise RpcError for outer except block. 
- raise - - self._send_directory(remote, tree) - - request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name) - request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes - remote.ref_storage.UpdateReference(request) - - skipped_remote = False - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e - - return not skipped_remote - - # push_directory(): - # - # Push the given virtual directory to a remote. - # - # Args: - # remote (CASRemote): The remote to push to - # directory (Directory): A virtual directory object to push. - # - # Raises: - # (CASCacheError): if there was an error - # - def push_directory(self, remote, directory): - remote.init() - - self._send_directory(remote, directory.ref) - # objpath(): # # Return the path of an object based on its digest. @@ -534,6 +460,27 @@ class CASCache(): else: return None + def yield_directory_digests(self, directory_digest): + # parse directory, and recursively add blobs + d = remote_execution_pb2.Digest() + d.hash = directory_digest.hash + d.size_bytes = directory_digest.size_bytes + yield d + + directory = remote_execution_pb2.Directory() + + with open(self.objpath(directory_digest), 'rb') as f: + directory.ParseFromString(f.read()) + + for filenode in directory.files: + d = remote_execution_pb2.Digest() + d.hash = filenode.digest.hash + d.size_bytes = filenode.digest.size_bytes + yield d + + for dirnode in directory.directories: + yield from self.yield_directory_digests(dirnode.digest) + ################################################ # Local Private Methods # ################################################ @@ -722,57 +669,3 @@ class CASCache(): for dirnode in directory.directories: yield from self._required_blobs(dirnode.digest) - - def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): - required_blobs = 
self._required_blobs(digest) - - missing_blobs = dict() - # Limit size of FindMissingBlobs request - for required_blobs_group in _grouper(required_blobs, 512): - request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name) - - for required_digest in required_blobs_group: - d = request.blob_digests.add() - d.hash = required_digest.hash - d.size_bytes = required_digest.size_bytes - - response = remote.cas.FindMissingBlobs(request) - for missing_digest in response.missing_blob_digests: - d = remote_execution_pb2.Digest() - d.hash = missing_digest.hash - d.size_bytes = missing_digest.size_bytes - missing_blobs[d.hash] = d - - # Upload any blobs missing on the server - self._send_blobs(remote, missing_blobs.values(), u_uid) - - def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()): - batch = _CASBatchUpdate(remote) - - for digest in digests: - with open(self.objpath(digest), 'rb') as f: - assert os.fstat(f.fileno()).st_size == digest.size_bytes - - if (digest.size_bytes >= remote.max_batch_total_size_bytes or - not remote.batch_update_supported): - # Too large for batch request, upload in independent request. - remote._send_blob(digest, f, u_uid=u_uid) - else: - if not batch.add(digest, f): - # Not enough space left in batch request. - # Complete pending batch first. 
- batch.send() - batch = _CASBatchUpdate(remote) - batch.add(digest, f) - - # Send final batch - batch.send() - - -def _grouper(iterable, n): - while True: - try: - current = next(iterable) - except StopIteration: - return - yield itertools.chain([current], itertools.islice(iterable, n - 1)) diff --git a/buildstream/_cas/casremote.py b/buildstream/_cas/casremote.py index 0e75b0936..8435230fc 100644 --- a/buildstream/_cas/casremote.py +++ b/buildstream/_cas/casremote.py @@ -1,5 +1,6 @@ from collections import namedtuple import io +import itertools import os import multiprocessing import signal @@ -288,6 +289,18 @@ class CASRemote(): else: return None + # update_reference(): + # + # Args: + # ref (str): Reference to update + # digest (Digest): New digest to update ref with + def update_reference(self, ref, digest): + request = buildstream_pb2.UpdateReferenceRequest() + request.keys.append(ref) + request.digest.hash = digest.hash + request.digest.size_bytes = digest.size_bytes + self.ref_storage.UpdateReference(request) + def get_tree_blob(self, tree_digest): self.init() f = tempfile.NamedTemporaryFile(dir=self.tmpdir) @@ -397,6 +410,68 @@ class CASRemote(): while self.__tmp_downloads: yield self.__tmp_downloads.pop() + # upload_blob(): + # + # Push blobs given an iterator over blob files + # + # Args: + # digest (Digest): digest we want to upload + # blob_file (str): Name of file location + # u_uid (str): Used to identify to the bytestream service + # + def upload_blob(self, digest, blob_file, u_uid=uuid.uuid4()): + with open(blob_file, 'rb') as f: + assert os.fstat(f.fileno()).st_size == digest.size_bytes + + if (digest.size_bytes >= self.max_batch_total_size_bytes or + not self.batch_update_supported): + # Too large for batch request, upload in independent request. 
+ self._send_blob(digest, f, u_uid=u_uid) + else: + if self.__batch_update.add(digest, f) is False: + self.__batch_update.send() + self.__batch_update = _CASBatchUpdate(self) + self.__batch_update.add(digest, f) + + # send_update_batch(): + # + # Sends anything left in the update batch + # + def send_update_batch(self): + # make sure everything is sent + self.__batch_update.send() + self.__batch_update = _CASBatchUpdate(self) + + # find_missing_blobs() + # + # Does FindMissingBlobs request to remote + # + # Args: + # required_blobs ([Digest]): list of blobs required + # + # Returns: + # (Dict(Digest)): missing blobs + def find_missing_blobs(self, required_blobs): + self.init() + missing_blobs = dict() + # Limit size of FindMissingBlobs request + for required_blobs_group in _grouper(required_blobs, 512): + request = remote_execution_pb2.FindMissingBlobsRequest() + + for required_digest in required_blobs_group: + d = request.blob_digests.add() + d.hash = required_digest.hash + d.size_bytes = required_digest.size_bytes + + response = self.cas.FindMissingBlobs(request) + for missing_digest in response.missing_blob_digests: + d = remote_execution_pb2.Digest() + d.hash = missing_digest.hash + d.size_bytes = missing_digest.size_bytes + missing_blobs[d.hash] = d + + return missing_blobs + ################################################ # Local Private Methods # ################################################ @@ -435,7 +510,10 @@ class CASRemote(): offset += chunk_size finished = request.finish_write - response = self.bytestream.Write(request_stream(resource_name, stream)) + try: + response = self.bytestream.Write(request_stream(resource_name, stream)) + except grpc.RpcError as e: + raise CASRemoteError("Failed to upload blob: {}".format(e), reason=e.code()) assert response.committed_size == digest.size_bytes @@ -449,6 +527,15 @@ class CASRemote(): self.__batch_read = _CASBatchRead(self) +def _grouper(iterable, n): + while True: + try: + current = next(iterable) + except 
StopIteration: + return + yield itertools.chain([current], itertools.islice(iterable, n - 1)) + + # Represents a batch of blobs queued for fetching. # class _CASBatchRead(): @@ -480,7 +567,11 @@ class _CASBatchRead(): if not self._request.digests: return - batch_response = self._remote.cas.BatchReadBlobs(self._request) + try: + batch_response = self._remote.cas.BatchReadBlobs(self._request) + except grpc.RpcError as e: + raise CASRemoteError("Failed to read blob batch: {}".format(e), + reason=e.code()) from e for response in batch_response.responses: if response.status.code == code_pb2.NOT_FOUND: @@ -528,7 +619,12 @@ class _CASBatchUpdate(): if not self._request.requests: return - batch_response = self._remote.cas.BatchUpdateBlobs(self._request) + # Want to raise a CASRemoteError if + try: + batch_response = self._remote.cas.BatchUpdateBlobs(self._request) + except grpc.RpcError as e: + raise CASRemoteError("Failed to upload blob batch: {}".format(e), + reason=e.code()) from e for response in batch_response.responses: if response.status.code != code_pb2.OK: diff --git a/buildstream/_cas/transfer.py b/buildstream/_cas/transfer.py index 5eaaf0920..c1293c022 100644 --- a/buildstream/_cas/transfer.py +++ b/buildstream/_cas/transfer.py @@ -49,3 +49,14 @@ def cas_tree_download(caslocal, casremote, tree_digest): # get root digest from tree and return that return _message_digest(tree.root.SerializeToString()) + + +def cas_directory_upload(caslocal, casremote, root_digest): + required_blobs = caslocal.yield_directory_digests(root_digest) + missing_blobs = casremote.find_missing_blobs(required_blobs) + for blob in missing_blobs.values(): + blob_file = caslocal.objpath(blob) + casremote.upload_blob(blob, blob_file) + + # send remaining blobs + casremote.send_update_batch() diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index bea175435..baa5aaaaf 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ 
b/buildstream/sandbox/_sandboxremote.py @@ -39,7 +39,7 @@ from .._exceptions import SandboxError from .. import _yaml from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc from .._cas import CASRemote, CASRemoteSpec -from .._cas.transfer import cas_tree_download +from .._cas.transfer import cas_tree_download, cas_directory_upload class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')): @@ -345,7 +345,7 @@ class SandboxRemote(Sandbox): # Now, push that key (without necessarily needing a ref) to the remote. try: - cascache.push_directory(casremote, upload_vdir) + cas_directory_upload(cascache, casremote, upload_vdir.ref) except grpc.RpcError as e: raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e |