author     Tristan Maat <tristan.maat@codethink.co.uk>  2019-08-22 17:48:34 +0100
committer  Tristan Maat <tristan.maat@codethink.co.uk>  2019-09-06 15:55:10 +0100
commit     47a3f93d9795be6af849c112d4180f0ad50ca23b (patch)
tree       2d65dd2c24d9d6bd6795f0680811cf95ae3803e4 /src/buildstream/_artifactcache.py
parent     e71621510de7c55aae4855f8bbb64eb2755346a8 (diff)
download   buildstream-47a3f93d9795be6af849c112d4180f0ad50ca23b.tar.gz
Allow splitting artifact caches
The artifact cache is now split into storage and index remotes: the
former is expected to be a CASRemote, while the latter is a
BuildStream-specific remote with the extensions required to store
BuildStream artifact protos.
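In outline, push() now sends an artifact's blobs to every storage remote before
its proto is published to any index remote, and pull() works in the opposite
direction: fetch the proto from an index remote first, then the blobs it
references from a storage remote. The sketch below illustrates that ordering
with runnable stand-ins; StorageRemote, IndexRemote and the dict-based proto
are hypothetical simplifications, not the BuildStream classes in the diff.

    # Illustrative sketch of the storage/index split; not BuildStream API.

    class StorageRemote:
        """Stand-in for a CAS remote: holds blobs keyed by digest."""
        def __init__(self):
            self.blobs = {}

        def send_blobs(self, blobs):
            self.blobs.update(blobs)

        def has_blobs(self, digests):
            return all(d in self.blobs for d in digests)

    class IndexRemote:
        """Stand-in for an ArtifactRemote: holds artifact protos by name."""
        def __init__(self):
            self.protos = {}

        def update_artifact(self, name, proto):
            self.protos[name] = proto

        def get_artifact(self, name):
            return self.protos.get(name)

    def push(storage_remotes, index_remotes, name, proto, blobs):
        # Blobs first: an index entry is only published once the
        # storage remotes hold the data it references.
        for remote in storage_remotes:
            remote.send_blobs(blobs)
        for remote in index_remotes:
            remote.update_artifact(name, proto)

    def pull(storage_remotes, index_remotes, name):
        # Proto first: it tells us which blobs we need to fetch.
        proto = None
        for remote in index_remotes:
            proto = remote.get_artifact(name)
            if proto:
                break
        if not proto:
            return None
        for remote in storage_remotes:
            if remote.has_blobs(proto["digests"]):
                return proto
        return None

    if __name__ == "__main__":
        storage, index = [StorageRemote()], [IndexRemote()]
        proto = {"digests": ["d1", "d2"]}
        push(storage, index, "proj/elem/0123abcd", proto, {"d1": b"a", "d2": b"b"})
        assert pull(storage, index, "proj/elem/0123abcd") == proto

Pushing blobs first means a published index entry should only ever reference
data the storage remotes have already seen; pulling the proto first tells the
client exactly which blobs to request.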
Diffstat (limited to 'src/buildstream/_artifactcache.py')
-rw-r--r--  src/buildstream/_artifactcache.py  373
1 file changed, 266 insertions, 107 deletions
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index 73047d376..0e2eb1091 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -25,48 +25,89 @@
 from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError
 from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
     artifact_pb2, artifact_pb2_grpc
-from ._cas import CASRemote
+from ._remote import BaseRemote
 from .storage._casbaseddirectory import CasBasedDirectory
 from ._artifact import Artifact
 from . import utils
 
 
+# ArtifactRemote():
+#
+# Facilitates communication with the BuildStream-specific part of
+# artifact remotes.
+#
+class ArtifactRemote(BaseRemote):
+    # _configure_protocols():
+    #
+    # Configure the protocols used by this remote as part of the
+    # remote initialization; note that this should only be used in
+    # Remote.init(), and is expected to fail when called by itself.
+    #
+    def _configure_protocols(self):
+        # Add artifact stub
+        capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+        # Check whether the server supports newer proto based artifact.
+        try:
+            request = buildstream_pb2.GetCapabilitiesRequest()
+            if self.instance_name:
+                request.instance_name = self.instance_name
+            response = capabilities_service.GetCapabilities(request)
+        except grpc.RpcError as e:
+            # Check if this remote has the artifact service
+            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+                raise ArtifactError(
+                    "Configured remote does not have the BuildStream "
+                    "capabilities service. Please check remote configuration.")
+            # Else raise exception with details
+            raise ArtifactError(
+                "Remote initialisation failed: {}".format(e.details()))
 
-# ArtifactRemote extends CASRemote to check during initialisation that there is
-# an artifact service
-class ArtifactRemote(CASRemote):
-    def __init__(self, *args):
-        super().__init__(*args)
-        self.capabilities_service = None
+        if not response.artifact_capabilities:
+            raise ArtifactError(
+                "Configured remote does not support artifact service")
 
-    def init(self):
-        if not self._initialized:
-            # do default initialisation
-            super().init()
+    # get_artifact():
+    #
+    # Get an artifact proto for a given cache key from the remote.
+    #
+    # Args:
+    #    cache_key (str): The artifact cache key. NOTE: This "key"
+    #                     is actually the ref/name and its name in
+    #                     the protocol is inaccurate. You have been warned.
+    #
+    # Returns:
+    #    (Artifact): The artifact proto
+    #
+    # Raises:
+    #    grpc.RpcError: If something goes wrong during the request.
+    #
+    def get_artifact(self, cache_key):
+        artifact_request = artifact_pb2.GetArtifactRequest()
+        artifact_request.cache_key = cache_key
 
-            # Add artifact stub
-            self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+        artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
+        return artifact_service.GetArtifact(artifact_request)
 
-            # Check whether the server supports newer proto based artifact.
-            try:
-                request = buildstream_pb2.GetCapabilitiesRequest()
-                if self.instance_name:
-                    request.instance_name = self.instance_name
-                response = self.capabilities_service.GetCapabilities(request)
-            except grpc.RpcError as e:
-                # Check if this remote has the artifact service
-                if e.code() == grpc.StatusCode.UNIMPLEMENTED:
-                    raise ArtifactError(
-                        "Configured remote does not have the BuildStream "
-                        "capabilities service. Please check remote configuration.")
-                # Else raise exception with details
-                raise ArtifactError(
-                    "Remote initialisation failed: {}".format(e.details()))
+    # update_artifact():
+    #
+    # Update an artifact with the given cache key on the remote with
+    # the given proto.
+    #
+    # Args:
+    #    cache_key (str): The artifact cache key of the artifact to update.
+    #    artifact (ArtifactProto): The artifact proto to send.
+    #
+    # Raises:
+    #    grpc.RpcError: If something goes wrong during the request.
+    #
+    def update_artifact(self, cache_key, artifact):
+        update_request = artifact_pb2.UpdateArtifactRequest()
+        update_request.cache_key = cache_key
+        update_request.artifact.CopyFrom(artifact)
 
-            if not response.artifact_capabilities:
-                raise ArtifactError(
-                    "Configured remote does not support artifact service")
+        artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
+        artifact_service.UpdateArtifact(update_request)
 
 
 # An ArtifactCache manages artifacts.
@@ -79,7 +120,7 @@ class ArtifactCache(BaseCache):
     spec_name = "artifact_cache_specs"
     spec_error = ArtifactError
     config_node_name = "artifacts"
-    remote_class = ArtifactRemote
+    index_remote_class = ArtifactRemote
 
     def __init__(self, context):
         super().__init__(context)
@@ -187,22 +228,35 @@ class ArtifactCache(BaseCache):
     #
     def push(self, element, artifact):
         project = element._get_project()
+        display_key = element._get_brief_display_key()
 
-        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+        index_remotes = [r for r in self._index_remotes[project] if r.push]
+        storage_remotes = [r for r in self._storage_remotes[project] if r.push]
 
         pushed = False
+        # First push our files to all storage remotes, so that they
+        # can perform file checks on their end
+        for remote in storage_remotes:
+            remote.init()
+            element.status("Pushing data from artifact {} -> {}".format(display_key, remote))
 
-        for remote in push_remotes:
+            if self._push_artifact_blobs(artifact, remote):
+                element.info("Pushed data from artifact {} -> {}".format(display_key, remote))
+            else:
+                element.info("Remote ({}) already has all data of artifact {} cached".format(
+                    remote, element._get_brief_display_key()
+                ))
+
+        for remote in index_remotes:
             remote.init()
-            display_key = element._get_brief_display_key()
-            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+            element.status("Pushing artifact {} -> {}".format(display_key, remote))
 
-            if self._push_artifact(element, artifact, remote):
-                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+            if self._push_artifact_proto(element, artifact, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote))
                 pushed = True
             else:
                 element.info("Remote ({}) already has artifact {} cached".format(
-                    remote.spec.url, element._get_brief_display_key()
+                    remote, element._get_brief_display_key()
                 ))
 
         return pushed
@@ -220,26 +274,59 @@ class ArtifactCache(BaseCache):
     #     (bool): True if pull was successful, False if artifact was not available
     #
     def pull(self, element, key, *, pull_buildtrees=False):
+        artifact = None
         display_key = key[:self.context.log_key_length]
         project = element._get_project()
 
-        for remote in self._remotes[project]:
+        errors = []
+        # Start by pulling our artifact proto, so that we know which
+        # blobs to pull
+        for remote in self._index_remotes[project]:
             remote.init()
             try:
-                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
-
-                if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
-                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
-                    # no need to pull from additional remotes
-                    return True
+                element.status("Pulling artifact {} <- {}".format(display_key, remote))
+                artifact = self._pull_artifact_proto(element, key, remote)
+                if artifact:
+                    element.info("Pulled artifact {} <- {}".format(display_key, remote))
+                    break
                 else:
                     element.info("Remote ({}) does not have artifact {} cached".format(
-                        remote.spec.url, display_key
+                        remote, display_key
                     ))
+            except CASError as e:
+                element.warn("Could not pull from remote {}: {}".format(remote, e))
+                errors.append(e)
+
+        if errors and not artifact:
+            raise ArtifactError("Failed to pull artifact {}".format(display_key),
+                                detail="\n".join(str(e) for e in errors))
+
+        # If we don't have an artifact, we can't exactly pull our
+        # artifact
+        if not artifact:
+            return False
+
+        errors = []
+        # If we do, we can pull it!
+        for remote in self._storage_remotes[project]:
+            remote.init()
+            try:
+                element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
+                if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
+                    element.info("Pulled data for artifact {} <- {}".format(display_key, remote))
+                    return True
 
+                element.info("Remote ({}) does not have artifact {} cached".format(
+                    remote, display_key
+                ))
             except CASError as e:
-                raise ArtifactError("Failed to pull artifact {}: {}".format(
-                    display_key, e)) from e
+                element.warn("Could not pull from remote {}: {}".format(remote, e))
+                errors.append(e)
+
+        if errors:
+            raise ArtifactError("Failed to pull artifact {}".format(display_key),
+                                detail="\n".join(str(e) for e in errors))
 
         return False
@@ -253,7 +340,7 @@ class ArtifactCache(BaseCache):
     #     digest (Digest): The digest of the tree
     #
     def pull_tree(self, project, digest):
-        for remote in self._remotes[project]:
+        for remote in self._storage_remotes[project]:
             digest = self.cas.pull_tree(remote, digest)
 
             if digest:
@@ -276,7 +363,7 @@ class ArtifactCache(BaseCache):
     def push_message(self, project, message):
 
         if self._has_push_remotes:
-            push_remotes = [r for r in self._remotes[project] if r.spec.push]
+            push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
         else:
             push_remotes = []
@@ -330,7 +417,7 @@ class ArtifactCache(BaseCache):
     #     missing_blobs (list): The Digests of the blobs to fetch
     #
     def fetch_missing_blobs(self, project, missing_blobs):
-        for remote in self._remotes[project]:
+        for remote in self._index_remotes[project]:
             if not missing_blobs:
                 break
@@ -357,7 +444,7 @@ class ArtifactCache(BaseCache):
         if not missing_blobs:
             return []
 
-        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+        push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
 
         remote_missing_blobs_list = []
@@ -384,12 +471,12 @@ class ArtifactCache(BaseCache):
     #
     def check_remotes_for_element(self, element):
         # If there are no remotes
-        if not self._remotes:
+        if not self._index_remotes:
            return False
 
         project = element._get_project()
         ref = element.get_artifact_name()
-        for remote in self._remotes[project]:
+        for remote in self._index_remotes[project]:
             remote.init()
 
             if self._query_remote(ref, remote):
@@ -401,40 +488,59 @@ class ArtifactCache(BaseCache):
     #  Local Private Methods
     ################################################
 
-    # _push_artifact()
+    # _reachable_directories()
     #
-    # Pushes relevant directories and then artifact proto to remote.
+    # Returns:
+    #     (iter): Iterator over directory digests available from artifacts.
     #
-    # Args:
-    #    element (Element): The element
-    #    artifact (Artifact): The related artifact being pushed
-    #    remote (CASRemote): Remote to push to
+    def _reachable_directories(self):
+        for root, _, files in os.walk(self.artifactdir):
+            for artifact_file in files:
+                artifact = artifact_pb2.Artifact()
+                with open(os.path.join(root, artifact_file), 'r+b') as f:
+                    artifact.ParseFromString(f.read())
+
+                if str(artifact.files):
+                    yield artifact.files
+
+                if str(artifact.buildtree):
+                    yield artifact.buildtree
+
+    # _reachable_digests()
     #
     # Returns:
-    #   (bool): whether the push was successful
+    #     (iter): Iterator over single file digests in artifacts
     #
-    def _push_artifact(self, element, artifact, remote):
+    def _reachable_digests(self):
+        for root, _, files in os.walk(self.artifactdir):
+            for artifact_file in files:
+                artifact = artifact_pb2.Artifact()
+                with open(os.path.join(root, artifact_file), 'r+b') as f:
+                    artifact.ParseFromString(f.read())
 
-        artifact_proto = artifact._get_proto()
+                if str(artifact.public_data):
+                    yield artifact.public_data
 
-        keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+                for log_file in artifact.logs:
+                    yield log_file.digest
 
-        # Check whether the artifact is on the server
-        present = False
-        for key in keys:
-            get_artifact = artifact_pb2.GetArtifactRequest()
-            get_artifact.cache_key = element.get_artifact_name(key)
-            try:
-                artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
-                artifact_service.GetArtifact(get_artifact)
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.NOT_FOUND:
-                    raise ArtifactError("Error checking artifact cache: {}"
-                                        .format(e.details()))
-            else:
-                present = True
-        if present:
-            return False
+    # _push_artifact_blobs()
+    #
+    # Push the blobs that make up an artifact to the remote server.
+    #
+    # Args:
+    #    artifact (Artifact): The artifact whose blobs to push.
+    #    remote (CASRemote): The remote to push the blobs to.
+    #
+    # Returns:
+    #    (bool) - True if we uploaded anything, False otherwise.
+    #
+    # Raises:
+    #    ArtifactError: If we fail to push blobs (*unless* they're
+    #    already there or we run out of space on the server).
+    #
+    def _push_artifact_blobs(self, artifact, remote):
+        artifact_proto = artifact._get_proto()
 
         try:
             self.cas._send_directory(remote, artifact_proto.files)
@@ -463,33 +569,68 @@ class ArtifactCache(BaseCache):
                 raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
             return False
 
-        # finally need to send the artifact proto
+        return True
+
+    # _push_artifact_proto()
+    #
+    # Pushes the artifact proto to remote.
+    #
+    # Args:
+    #    element (Element): The element
+    #    artifact (Artifact): The related artifact being pushed
+    #    remote (ArtifactRemote): Remote to push to
+    #
+    # Returns:
+    #    (bool): Whether we pushed the artifact.
+    #
+    # Raises:
+    #    ArtifactError: If the push fails for any reason except the
+    #    artifact already existing.
+    #
+    def _push_artifact_proto(self, element, artifact, remote):
+
+        artifact_proto = artifact._get_proto()
+
+        keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+        # Check whether the artifact is on the server
         for key in keys:
-            update_artifact = artifact_pb2.UpdateArtifactRequest()
-            update_artifact.cache_key = element.get_artifact_name(key)
-            update_artifact.artifact.CopyFrom(artifact_proto)
+            try:
+                remote.get_artifact(element.get_artifact_name(key=key))
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.NOT_FOUND:
+                    raise ArtifactError("Error checking artifact cache: {}"
+                                        .format(e.details()))
+            else:
+                return False
 
+        # If not, we send the artifact proto
+        for key in keys:
             try:
-                artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
-                artifact_service.UpdateArtifact(update_artifact)
+                remote.update_artifact(element.get_artifact_name(key=key), artifact_proto)
             except grpc.RpcError as e:
                 raise ArtifactError("Failed to push artifact: {}".format(e.details()))
 
         return True
 
-    # _pull_artifact()
+    # _pull_artifact_storage():
+    #
+    # Pull artifact blobs from the given remote.
     #
     # Args:
-    #     element (Element): element to pull
-    #     key (str): specific key of element to pull
-    #     remote (CASRemote): remote to pull from
-    #     pull_buildtree (bool): whether to pull buildtrees or not
+    #    element (Element): element to pull
+    #    artifact (Artifact): The artifact proto whose blobs to pull
+    #    remote (CASRemote): remote to pull from
+    #    pull_buildtree (bool): whether to pull buildtrees or not
     #
     # Returns:
-    #   (bool): whether the pull was successful
+    #    (bool): True if we pulled any blobs.
     #
-    def _pull_artifact(self, element, key, remote, pull_buildtrees=False):
-
+    # Raises:
+    #    ArtifactError: If the pull failed for any reason except the
+    #    blobs not existing on the server.
+    #
+    def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False):
         def __pull_digest(digest):
             self.cas._fetch_directory(remote, digest)
             required_blobs = self.cas.required_blobs_for_directory(digest)
@@ -497,16 +638,6 @@ class ArtifactCache(BaseCache):
             if missing_blobs:
                 self.cas.fetch_blobs(remote, missing_blobs)
 
-        request = artifact_pb2.GetArtifactRequest()
-        request.cache_key = element.get_artifact_name(key=key)
-        try:
-            artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
-            artifact = artifact_service.GetArtifact(request)
-        except grpc.RpcError as e:
-            if e.code() != grpc.StatusCode.NOT_FOUND:
-                raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
-            return False
-
         try:
             if str(artifact.files):
                 __pull_digest(artifact.files)
@@ -527,13 +658,41 @@ class ArtifactCache(BaseCache):
                 raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
             return False
 
+        return True
+
+    # _pull_artifact_proto():
+    #
+    # Pull an artifact proto from a remote server.
+    #
+    # Args:
+    #    element (Element): The element whose artifact to pull.
+    #    key (str): The specific key for the artifact to pull.
+    #    remote (ArtifactRemote): The remote to pull from.
+    #
+    # Returns:
+    #    (Artifact|None): The artifact proto, or None if the server
+    #                     doesn't have it.
+    #
+    # Raises:
+    #    ArtifactError: If the pull fails.
+    #
+    def _pull_artifact_proto(self, element, key, remote):
+        artifact_name = element.get_artifact_name(key=key)
+
+        try:
+            artifact = remote.get_artifact(artifact_name)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+            return None
+
         # Write the artifact proto to cache
-        artifact_path = os.path.join(self.artifactdir, request.cache_key)
+        artifact_path = os.path.join(self.artifactdir, artifact_name)
         os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
         with utils.save_file_atomic(artifact_path, mode='wb') as f:
             f.write(artifact.SerializeToString())
 
-        return True
+        return artifact
 
     # _query_remote()
     #
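For context, the get_artifact()/update_artifact() helpers added above wrap
plain gRPC round-trips against the ArtifactService. A sketch of equivalent
standalone client code follows; it assumes the generated stubs are importable
as shown, and the server address and cache key are illustrative placeholders:

    import grpc

    from buildstream._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc

    # Illustrative address; any running BuildStream artifact server would do.
    channel = grpc.insecure_channel("cache.example.com:11001")
    service = artifact_pb2_grpc.ArtifactServiceStub(channel)

    # Despite the field name, cache_key carries the artifact ref/name
    # (see the NOTE in get_artifact() above).
    request = artifact_pb2.GetArtifactRequest()
    request.cache_key = "myproject/myelement/0123abcd"  # illustrative ref

    try:
        artifact = service.GetArtifact(request)
    except grpc.RpcError as e:
        if e.code() != grpc.StatusCode.NOT_FOUND:
            raise
        artifact = None  # server does not have this artifact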