author     bst-marge-bot <marge-bot@buildstream.build>    2019-03-27 18:49:31 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>    2019-03-27 18:49:31 +0000
commit     49c22f209b32247bc1335f8632518a971c056d9e (patch)
tree       fd3a75fede307b35ac69d1dbc9ee5cbe432a7b73
parent     bcf02294094d23ebacd97d149d7c5cab5605f8ea (diff)
parent     60290223f87f17c15a8bd3562a636fe8b7770cfa (diff)
download   buildstream-49c22f209b32247bc1335f8632518a971c056d9e.tar.gz
Merge branch 'juerg/partial-cas-remote' into 'master'
Initial partial CAS support for remote execution
See merge request BuildStream/buildstream!1232
-rw-r--r--  buildstream/_artifactcache.py           45
-rw-r--r--  buildstream/_cas/cascache.py           177
-rw-r--r--  buildstream/_cas/casremote.py           32
-rw-r--r--  buildstream/sandbox/_sandboxremote.py   35
-rw-r--r--  tests/artifactcache/push.py            108
5 files changed, 177 insertions, 220 deletions
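
In short, this merge replaces the directory-granular push path (push_directory() and verify_digest_on_remote()) with blob-granular helpers, so that a remote execution build only transfers the blobs each side is actually missing. Condensed from the _sandboxremote.py changes below, the new upload flow looks roughly like this (all names are taken from the diff; error handling omitted):

    # Condensed sketch of the new upload flow in SandboxRemote._run().
    # cascache, casremote, artifactcache, project and input_root_digest
    # are objects the sandbox already holds at this point.

    # 1. Ask the remote execution CAS which blobs of the input tree it lacks.
    missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)

    # 2. With partial artifacts, some of those blobs may be absent locally
    #    too; pull them from the configured artifact servers first.
    local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
    if local_missing_blobs:
        artifactcache.fetch_missing_blobs(project, local_missing_blobs)

    # 3. Upload only what the remote is missing.
    cascache.send_blobs(casremote, missing_blobs)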
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index 3ca6c6e60..5fd646137 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -359,30 +359,6 @@ class ArtifactCache(BaseCache):
 
         return None
 
-    # push_directory():
-    #
-    # Push the given virtual directory to all remotes.
-    #
-    # Args:
-    #     project (Project): The current project
-    #     directory (Directory): A virtual directory object to push.
-    #
-    # Raises:
-    #     (ArtifactError): if there was an error
-    #
-    def push_directory(self, project, directory):
-        if self._has_push_remotes:
-            push_remotes = [r for r in self._remotes[project] if r.spec.push]
-        else:
-            push_remotes = []
-
-        if not push_remotes:
-            raise ArtifactError("push_directory was called, but no remote artifact " +
-                                "servers are configured as push remotes.")
-
-        for remote in push_remotes:
-            self.cas.push_directory(remote, directory)
-
     # push_message():
     #
     # Push the given protobuf message to all remotes.
@@ -439,3 +415,24 @@ class ArtifactCache(BaseCache):
         cache_id = self.cas.resolve_ref(ref, update_mtime=True)
         vdir = CasBasedDirectory(self.cas, digest=cache_id).descend('logs')
         return vdir
+
+    # fetch_missing_blobs():
+    #
+    # Fetch missing blobs from configured remote repositories.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     missing_blobs (list): The Digests of the blobs to fetch
+    #
+    def fetch_missing_blobs(self, project, missing_blobs):
+        for remote in self._remotes[project]:
+            if not missing_blobs:
+                break
+
+            remote.init()
+
+            # fetch_blobs() will return the blobs that are still missing
+            missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+        if missing_blobs:
+            raise ArtifactError("Blobs not found on configured artifact servers")
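
The new fetch_missing_blobs() walks the project's remotes in order, narrowing the missing set after each attempt, and only raises ArtifactError if no remote could supply the remainder. A hypothetical call site (required_digests is an assumed iterable of Digest objects; the two method names come from the diff):

    # Hypothetical caller: ensure a set of digests is present in the
    # local CAS, falling back to the configured artifact servers.
    missing = cascache.local_missing_blobs(required_digests)
    if missing:
        # Tries each remote for the project in turn; raises ArtifactError
        # if any blobs remain missing afterwards.
        artifactcache.fetch_missing_blobs(project, missing)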
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index 63871ebe4..19020e234 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -272,8 +272,14 @@ class CASCache():
             tree.hash = response.digest.hash
             tree.size_bytes = response.digest.size_bytes
 
-            # Fetch artifact, excluded_subdirs determined in pullqueue
-            self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
+            # Fetch Directory objects
+            self._fetch_directory(remote, tree)
+
+            # Fetch files, excluded_subdirs determined in pullqueue
+            required_blobs = self._required_blobs(tree, excluded_subdirs=excluded_subdirs)
+            missing_blobs = self.local_missing_blobs(required_blobs)
+            if missing_blobs:
+                self.fetch_blobs(remote, missing_blobs)
 
             self.set_ref(ref, tree)
 
@@ -373,23 +379,6 @@ class CASCache():
 
         return not skipped_remote
 
-    # push_directory():
-    #
-    # Push the given virtual directory to a remote.
-    #
-    # Args:
-    #     remote (CASRemote): The remote to push to
-    #     directory (Directory): A virtual directory object to push.
-    #
-    # Raises:
-    #     (CASCacheError): if there was an error
-    #
-    def push_directory(self, remote, directory):
-        remote.init()
-
-        digest = directory._get_digest()
-        self._send_directory(remote, digest)
-
     # objpath():
     #
     # Return the path of an object based on its digest.
@@ -648,6 +637,54 @@ class CASCache():
         reachable = set()
         self._reachable_refs_dir(reachable, tree, update_mtime=True)
 
+    # remote_missing_blobs_for_directory():
+    #
+    # Determine which blobs of a directory tree are missing on the remote.
+    #
+    # Args:
+    #     digest (Digest): The directory digest
+    #
+    # Returns: List of missing Digest objects
+    #
+    def remote_missing_blobs_for_directory(self, remote, digest):
+        required_blobs = self._required_blobs(digest)
+
+        missing_blobs = dict()
+        # Limit size of FindMissingBlobs request
+        for required_blobs_group in _grouper(required_blobs, 512):
+            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
+
+            for required_digest in required_blobs_group:
+                d = request.blob_digests.add()
+                d.hash = required_digest.hash
+                d.size_bytes = required_digest.size_bytes
+
+            response = remote.cas.FindMissingBlobs(request)
+            for missing_digest in response.missing_blob_digests:
+                d = remote_execution_pb2.Digest()
+                d.hash = missing_digest.hash
+                d.size_bytes = missing_digest.size_bytes
+                missing_blobs[d.hash] = d
+
+        return missing_blobs.values()
+
+    # local_missing_blobs():
+    #
+    # Check local cache for missing blobs.
+    #
+    # Args:
+    #     digests (list): The Digests of blobs to check
+    #
+    # Returns: Missing Digest objects
+    #
+    def local_missing_blobs(self, digests):
+        missing_blobs = []
+        for digest in digests:
+            objpath = self.objpath(digest)
+            if not os.path.exists(objpath):
+                missing_blobs.append(digest)
+        return missing_blobs
+
     ################################################
     #             Local Private Methods            #
     ################################################
@@ -841,7 +878,10 @@ class CASCache():
         for dirnode in directory.directories:
             self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime)
 
-    def _required_blobs(self, directory_digest):
+    def _required_blobs(self, directory_digest, *, excluded_subdirs=None):
+        if not excluded_subdirs:
+            excluded_subdirs = []
+
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
         d.hash = directory_digest.hash
@@ -860,7 +900,8 @@ class CASCache():
             yield d
 
         for dirnode in directory.directories:
-            yield from self._required_blobs(dirnode.digest)
+            if dirnode.name not in excluded_subdirs:
+                yield from self._required_blobs(dirnode.digest)
 
     # _temporary_object():
     #
@@ -900,8 +941,8 @@ class CASCache():
 
         return objpath
 
-    def _batch_download_complete(self, batch):
-        for digest, data in batch.send():
+    def _batch_download_complete(self, batch, *, missing_blobs=None):
+        for digest, data in batch.send(missing_blobs=missing_blobs):
             with self._temporary_object() as f:
                 f.write(data)
                 f.flush()
@@ -953,21 +994,19 @@ class CASCache():
     #
     # Fetches remote directory and adds it to content addressable store.
    #
-    # Fetches files, symbolic links and recursively other directories in
-    # the remote directory and adds them to the content addressable
-    # store.
+    # This recursively fetches directory objects but doesn't fetch any
+    # files.
     #
     # Args:
     #     remote (Remote): The remote to use.
     #     dir_digest (Digest): Digest object for the directory to fetch.
-    #     excluded_subdirs (list): The optional list of subdirs to not fetch
     #
-    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
+    def _fetch_directory(self, remote, dir_digest):
+        # TODO Use GetTree() if the server supports it
+
         fetch_queue = [dir_digest]
         fetch_next_queue = []
         batch = _CASBatchRead(remote)
-        if not excluded_subdirs:
-            excluded_subdirs = []
 
         while len(fetch_queue) + len(fetch_next_queue) > 0:
             if not fetch_queue:
@@ -982,13 +1021,8 @@ class CASCache():
                 directory.ParseFromString(f.read())
 
             for dirnode in directory.directories:
-                if dirnode.name not in excluded_subdirs:
-                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
-                                                       fetch_queue, fetch_next_queue, recursive=True)
-
-            for filenode in directory.files:
-                batch = self._fetch_directory_node(remote, filenode.digest, batch,
-                                                   fetch_queue, fetch_next_queue)
+                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+                                                   fetch_queue, fetch_next_queue, recursive=True)
 
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
@@ -1016,30 +1050,55 @@ class CASCache():
 
         return dirdigest
 
-    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
-        required_blobs = self._required_blobs(digest)
+    # fetch_blobs():
+    #
+    # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
+    #
+    # Args:
+    #     remote (CASRemote): The remote repository to fetch from
+    #     digests (list): The Digests of blobs to fetch
+    #
+    # Returns: The Digests of the blobs that were not available on the remote CAS
+    #
+    def fetch_blobs(self, remote, digests):
+        missing_blobs = []
 
-        missing_blobs = dict()
-        # Limit size of FindMissingBlobs request
-        for required_blobs_group in _grouper(required_blobs, 512):
-            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
+        batch = _CASBatchRead(remote)
 
-            for required_digest in required_blobs_group:
-                d = request.blob_digests.add()
-                d.hash = required_digest.hash
-                d.size_bytes = required_digest.size_bytes
+        for digest in digests:
+            if (digest.size_bytes >= remote.max_batch_total_size_bytes or
+                    not remote.batch_read_supported):
+                # Too large for batch request, download in independent request.
+                try:
+                    self._ensure_blob(remote, digest)
+                except grpc.RpcError as e:
+                    if e.code() == grpc.StatusCode.NOT_FOUND:
+                        missing_blobs.append(digest)
+                    else:
+                        raise CASCacheError("Failed to fetch blob: {}".format(e)) from e
+            else:
+                if not batch.add(digest):
+                    # Not enough space left in batch request.
+                    # Complete pending batch first.
+                    self._batch_download_complete(batch, missing_blobs=missing_blobs)
 
-            response = remote.cas.FindMissingBlobs(request)
-            for missing_digest in response.missing_blob_digests:
-                d = remote_execution_pb2.Digest()
-                d.hash = missing_digest.hash
-                d.size_bytes = missing_digest.size_bytes
-                missing_blobs[d.hash] = d
+                    batch = _CASBatchRead(remote)
+                    batch.add(digest)
 
-        # Upload any blobs missing on the server
-        self._send_blobs(remote, missing_blobs.values(), u_uid)
+        # Complete last pending batch
+        self._batch_download_complete(batch, missing_blobs=missing_blobs)
+
+        return missing_blobs
 
-    def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+    # send_blobs():
+    #
+    # Upload blobs to remote CAS.
+    #
+    # Args:
+    #     remote (CASRemote): The remote repository to upload to
+    #     digests (list): The Digests of Blobs to upload
+    #
+    def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
         batch = _CASBatchUpdate(remote)
 
         for digest in digests:
@@ -1061,6 +1120,12 @@ class CASCache():
         # Send final batch
         batch.send()
 
+    def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+        missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
+
+        # Upload any blobs missing on the server
+        self.send_blobs(remote, missing_blobs, u_uid)
+
 
 class CASQuota:
     def __init__(self, context):
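
remote_missing_blobs_for_directory() caps each FindMissingBlobs request at 512 digests using the pre-existing _grouper() helper, which this diff does not show. A functionally equivalent sketch of such a chunking helper (the real implementation may differ in detail):

    from itertools import islice

    def _grouper(iterable, n):
        # Yield successive groups of at most n items from iterable.
        it = iter(iterable)
        while True:
            group = list(islice(it, n))
            if not group:
                return
            yield group

    # e.g. one FindMissingBlobsRequest per group of up to 512 digests:
    # for group in _grouper(required_blobs, 512): ...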
diff --git a/buildstream/_cas/casremote.py b/buildstream/_cas/casremote.py
index df1dd799c..aac0d2802 100644
--- a/buildstream/_cas/casremote.py
+++ b/buildstream/_cas/casremote.py
@@ -221,28 +221,6 @@ class CASRemote():
 
         return error
 
-    # verify_digest_on_remote():
-    #
-    # Check whether the object is already on the server in which case
-    # there is no need to upload it.
-    #
-    # Args:
-    #     digest (Digest): The object digest.
-    #
-    def verify_digest_on_remote(self, digest):
-        self.init()
-
-        request = remote_execution_pb2.FindMissingBlobsRequest()
-        if self.instance_name:
-            request.instance_name = self.instance_name
-        request.blob_digests.extend([digest])
-
-        response = self.cas.FindMissingBlobs(request)
-        if digest in response.missing_blob_digests:
-            return False
-
-        return True
-
     # push_message():
     #
     # Push the given protobuf message to a remote.
@@ -344,7 +322,7 @@ class _CASBatchRead():
             self._size = new_batch_size
         return True
 
-    def send(self):
+    def send(self, *, missing_blobs=None):
         assert not self._sent
         self._sent = True
 
@@ -355,8 +333,12 @@ class _CASBatchRead():
 
         for response in batch_response.responses:
             if response.status.code == code_pb2.NOT_FOUND:
-                raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
-                    response.digest.hash, response.status.code))
+                if missing_blobs is None:
+                    raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format(
+                        response.digest.hash, response.status.code))
+                else:
+                    missing_blobs.append(response.digest)
+
             if response.status.code != code_pb2.OK:
                 raise CASRemoteError("Failed to download blob {}: {}".format(
                     response.digest.hash, response.status.code))
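
With the new keyword argument, _CASBatchRead.send() treats a NOT_FOUND response as data rather than as a fatal error: when the caller passes a missing_blobs list, absent blobs are collected there instead of raising BlobNotFound. A sketch of the calling pattern (store_blob() is a hypothetical stand-in for writing into the local CAS):

    # Drain a completed batch, collecting blobs the remote lacks
    # instead of aborting on the first NOT_FOUND response.
    missing_blobs = []
    for digest, data in batch.send(missing_blobs=missing_blobs):
        store_blob(digest, data)  # hypothetical local CAS write

    # missing_blobs now holds the Digests the remote could not serve;
    # fetch_blobs() returns these so that another remote can be tried.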
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index be3234796..ada8268c0 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -34,7 +34,7 @@ from ..storage._casbaseddirectory import CasBasedDirectory
 from .. import _signals
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.rpc import code_pb2
-from .._exceptions import SandboxError
+from .._exceptions import BstError, SandboxError
 from .. import _yaml
 from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
 from .._cas import CASRemote, CASRemoteSpec
@@ -293,9 +293,13 @@ class SandboxRemote(Sandbox):
     def _run(self, command, flags, *, cwd, env):
         stdout, stderr = self._get_output()
 
+        context = self._get_context()
+        project = self._get_project()
+        cascache = context.get_cascache()
+        artifactcache = context.artifactcache
+
         # set up virtual dircetory
         upload_vdir = self.get_virtual_directory()
-        cascache = self._get_context().get_cascache()
 
         # Create directories for all marked directories. This emulates
         # some of the behaviour of other sandboxes, which create these
@@ -331,15 +335,32 @@ class SandboxRemote(Sandbox):
 
         if not action_result:
             casremote = CASRemote(self.storage_remote_spec)
+            try:
+                casremote.init()
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
+                                   .format(self.storage_url, e)) from e
 
-            # Now, push that key (without necessarily needing a ref) to the remote.
+            # Determine blobs missing on remote
             try:
-                cascache.push_directory(casremote, upload_vdir)
+                missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
             except grpc.RpcError as e:
-                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+                raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
 
-            if not casremote.verify_digest_on_remote(upload_vdir._get_digest()):
-                raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
+            # Check if any blobs are also missing locally (partial artifact)
+            # and pull them from the artifact cache.
+            try:
+                local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+                if local_missing_blobs:
+                    artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+            except (grpc.RpcError, BstError) as e:
+                raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+
+            # Now, push the missing blobs to the remote.
+            try:
+                cascache.send_blobs(casremote, missing_blobs)
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
 
             # Push command and action
             try:
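
Each remote interaction in _run() is now wrapped individually, so a failure reports which stage broke: contacting the CAS endpoint, determining missing blobs, pulling from the artifact cache, or pushing to the remote. The recurring pattern in isolation (do_step() and step_name are hypothetical; the exception classes match the diff):

    import grpc

    from buildstream._exceptions import SandboxError

    def guarded(step_name, do_step):
        # Attribute any gRPC failure to the stage that raised it.
        try:
            return do_step()
        except grpc.RpcError as e:
            raise SandboxError("Failed to {}: {}".format(step_name, e)) from e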
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 69f3fbfbb..56af50a0d 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -136,114 +136,6 @@ def _test_push(user_config_file, project_dir, element_name, element_key, queue):
 
 
 @pytest.mark.datafiles(DATA_DIR)
-def test_push_directory(cli, tmpdir, datafiles):
-    project_dir = str(datafiles)
-
-    # First build the project without the artifact cache configured
-    result = cli.run(project=project_dir, args=['build', 'target.bst'])
-    result.assert_success()
-
-    # Assert that we are now cached locally
-    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
-
-    # Set up an artifact cache.
-    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
-        # Configure artifact share
-        rootcache_dir = os.path.join(str(tmpdir), 'cache')
-        user_config_file = str(tmpdir.join('buildstream.conf'))
-        user_config = {
-            'scheduler': {
-                'pushers': 1
-            },
-            'artifacts': {
-                'url': share.repo,
-                'push': True,
-            },
-            'cachedir': rootcache_dir
-        }
-
-        # Write down the user configuration file
-        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
-
-        # Fake minimal context
-        context = Context()
-        context.load(config=user_config_file)
-        context.set_message_handler(message_handler)
-
-        # Load the project and CAS cache
-        project = Project(project_dir, context)
-        project.ensure_fully_loaded()
-        artifactcache = context.artifactcache
-        cas = artifactcache.cas
-
-        # Assert that the element's artifact is cached
-        element = project.load_elements(['target.bst'])[0]
-        element_key = cli.get_element_key(project_dir, 'target.bst')
-        assert artifactcache.contains(element, element_key)
-
-        # Manually setup the CAS remote
-        artifactcache.setup_remotes(use_config=True)
-        artifactcache.initialize_remotes()
-        assert artifactcache.has_push_remotes(plugin=element)
-
-        # Recreate the CasBasedDirectory object from the cached artifact
-        artifact_ref = element.get_artifact_name(element_key)
-        artifact_digest = cas.resolve_ref(artifact_ref)
-
-        queue = multiprocessing.Queue()
-        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
-        process = multiprocessing.Process(target=_queue_wrapper,
-                                          args=(_test_push_directory, queue, user_config_file,
-                                                project_dir, artifact_digest))
-
-        try:
-            # Keep SIGINT blocked in the child process
-            with _signals.blocked([signal.SIGINT], ignore=False):
-                process.start()
-
-            directory_hash = queue.get()
-            process.join()
-        except KeyboardInterrupt:
-            utils._kill_process_tree(process.pid)
-            raise
-
-        assert directory_hash
-        assert artifact_digest.hash == directory_hash
-        assert share.has_object(artifact_digest)
-
-
-def _test_push_directory(user_config_file, project_dir, artifact_digest, queue):
-    # Fake minimal context
-    context = Context()
-    context.load(config=user_config_file)
-    context.set_message_handler(message_handler)
-
-    # Load the project manually
-    project = Project(project_dir, context)
-    project.ensure_fully_loaded()
-
-    # Create a local CAS cache handle
-    cas = context.artifactcache
-
-    # Manually setup the CAS remote
-    cas.setup_remotes(use_config=True)
-    cas.initialize_remotes()
-
-    if cas.has_push_remotes():
-        # Create a CasBasedDirectory from local CAS cache content
-        directory = CasBasedDirectory(context.artifactcache.cas, digest=artifact_digest)
-
-        # Push the CasBasedDirectory object
-        cas.push_directory(project, directory)
-
-        digest = directory._get_digest()
-        queue.put(digest.hash)
-    else:
-        queue.put("No remote configured")
-
-
-@pytest.mark.datafiles(DATA_DIR)
 def test_push_message(tmpdir, datafiles):
     project_dir = str(datafiles)
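
For reference, both the removed verify_digest_on_remote() and the new remote_missing_blobs_for_directory() are built on the same REAPI call. A standalone sketch of that round trip (the endpoint, empty instance name and example digest are assumptions; the real code takes these from the CASRemoteSpec):

    import grpc

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    channel = grpc.insecure_channel('localhost:50051')  # assumed endpoint
    cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    request = remote_execution_pb2.FindMissingBlobsRequest(instance_name='')
    d = request.blob_digests.add()
    # SHA-256 of the empty blob, used here only as an example digest.
    d.hash = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
    d.size_bytes = 0

    response = cas.FindMissingBlobs(request)
    # Anything listed here is absent on the server and needs uploading.
    for missing in response.missing_blob_digests:
        print(missing.hash, missing.size_bytes)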