Diffstat (limited to 'buildstream/_artifactcache.py')
-rw-r--r-- | buildstream/_artifactcache.py | 336
1 file changed, 244 insertions, 92 deletions
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index 5b0ccacc4..091b44dda 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -17,13 +17,18 @@
 #  Authors:
 #        Tristan Maat <tristan.maat@codethink.co.uk>
 
+import os
+import grpc
+
 from ._basecache import BaseCache
 from .types import _KeyStrength
-from ._exceptions import ArtifactError, CASCacheError, CASError
+from ._exceptions import ArtifactError, CASError, CASCacheError
+from ._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc
 from ._cas import CASRemoteSpec
 from .storage._casbaseddirectory import CasBasedDirectory
-from .storage.directory import VirtualDirectoryError
+from ._artifact import Artifact
+from . import utils
 
 
 # An ArtifactCacheSpec holds the user configuration for a single remote
@@ -55,8 +60,15 @@ class ArtifactCache(BaseCache):
 
         self._required_elements = set()       # The elements required for this session
 
-        self.casquota.add_ref_callbacks(self.required_artifacts)
-        self.casquota.add_remove_callbacks((lambda x: not x.startswith('@'), self.remove))
+        # create artifact directory
+        self.artifactdir = context.artifactdir
+        os.makedirs(self.artifactdir, exist_ok=True)
+
+        self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove)
+        self.casquota.add_list_refs_callback(self.list_artifacts)
+
+        self.cas.add_reachable_directories_callback(self._reachable_directories)
+        self.cas.add_reachable_digests_callback(self._reachable_digests)
 
     # mark_required_elements():
     #
@@ -92,13 +104,34 @@ class ArtifactCache(BaseCache):
             weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
             for key in (strong_key, weak_key):
                 if key:
-                    try:
-                        ref = element.get_artifact_name(key)
+                    ref = element.get_artifact_name(key)
 
-                        self.cas.update_mtime(ref)
-                    except CASError:
+                    try:
+                        self.update_mtime(ref)
+                    except ArtifactError:
                         pass
 
+    def update_mtime(self, ref):
+        try:
+            os.utime(os.path.join(self.artifactdir, ref))
+        except FileNotFoundError as e:
+            raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e
+
+    # unrequired_artifacts()
+    #
+    # Returns iterator over artifacts that are not required in the build plan
+    #
+    # Returns:
+    #     (iter): Iterator over tuples of (float, str) where float is the time
+    #             and str is the artifact ref
+    #
+    def unrequired_artifacts(self):
+        required_artifacts = set(map(lambda x: x.get_artifact_name(),
+                                     self._required_elements))
+        for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir):
+            if artifact not in required_artifacts:
+                yield (mtime, artifact)
+
     def required_artifacts(self):
         # Build a set of the cache keys which are required
         # based on the required elements at cleanup time
@@ -147,24 +180,7 @@ class ArtifactCache(BaseCache):
     def contains(self, element, key):
         ref = element.get_artifact_name(key)
 
-        return self.cas.contains(ref)
-
-    # contains_subdir_artifact():
-    #
-    # Check whether an artifact element contains a digest for a subdir
-    # which is populated in the cache, i.e non dangling.
-    #
-    # Args:
-    #     element (Element): The Element to check
-    #     key (str): The cache key to use
-    #     subdir (str): The subdir to check
-    #     with_files (bool): Whether to check files as well
-    #
-    # Returns: True if the subdir exists & is populated in the cache, False otherwise
-    #
-    def contains_subdir_artifact(self, element, key, subdir, *, with_files=True):
-        ref = element.get_artifact_name(key)
-        return self.cas.contains_subdir_artifact(ref, subdir, with_files=with_files)
+        return os.path.exists(os.path.join(self.artifactdir, ref))
 
     # list_artifacts():
     #
@@ -177,9 +193,7 @@ class ArtifactCache(BaseCache):
     #     ([str]) - A list of artifact names as generated in LRU order
     #
     def list_artifacts(self, *, glob=None):
-        return list(filter(
-            lambda x: not x.startswith('@'),
-            self.cas.list_refs(glob=glob)))
+        return [ref for _, ref in sorted(list(self._list_refs_mtimes(self.artifactdir, glob_expr=glob)))]
 
     # remove():
     #
@@ -196,7 +210,10 @@
     #     (int): The amount of space recovered in the cache, in bytes
     #
     def remove(self, ref, *, defer_prune=False):
-        return self.cas.remove(ref, defer_prune=defer_prune)
+        try:
+            return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune)
+        except CASCacheError as e:
+            raise ArtifactError("{}".format(e)) from e
 
     # prune():
     #
@@ -205,63 +222,31 @@
     def prune(self):
         return self.cas.prune()
 
-    # get_artifact_directory():
-    #
-    # Get virtual directory for cached artifact of the specified Element.
-    #
-    # Assumes artifact has previously been fetched or committed.
-    #
-    # Args:
-    #     element (Element): The Element to extract
-    #     key (str): The cache key to use
-    #
-    # Raises:
-    #     ArtifactError: In cases there was an OSError, or if the artifact
-    #                    did not exist.
-    #
-    # Returns: virtual directory object
-    #
-    def get_artifact_directory(self, element, key):
-        ref = element.get_artifact_name(key)
-        try:
-            digest = self.cas.resolve_ref(ref, update_mtime=True)
-            return CasBasedDirectory(self.cas, digest=digest)
-        except (CASCacheError, VirtualDirectoryError) as e:
-            raise ArtifactError('Directory not in local cache: {}'.format(e)) from e
-
-    # commit():
-    #
-    # Commit built artifact to cache.
-    #
-    # Args:
-    #     element (Element): The Element commit an artifact for
-    #     content (Directory): The element's content directory
-    #     keys (list): The cache keys to use
-    #
-    def commit(self, element, content, keys):
-        refs = [element.get_artifact_name(key) for key in keys]
-
-        tree = content._get_digest()
-
-        for ref in refs:
-            self.cas.set_ref(ref, tree)
-
     # diff():
     #
     # Return a list of files that have been added or modified between
-    # the artifacts described by key_a and key_b.
+    # the artifacts described by key_a and key_b. This expects the
+    # provided keys to be strong cache keys
     #
     # Args:
     #     element (Element): The element whose artifacts to compare
-    #     key_a (str): The first artifact key
-    #     key_b (str): The second artifact key
-    #     subdir (str): A subdirectory to limit the comparison to
+    #     key_a (str): The first artifact strong key
+    #     key_b (str): The second artifact strong key
     #
-    def diff(self, element, key_a, key_b, *, subdir=None):
-        ref_a = element.get_artifact_name(key_a)
-        ref_b = element.get_artifact_name(key_b)
+    def diff(self, element, key_a, key_b):
+        context = self.context
+        artifact_a = Artifact(element, context, strong_key=key_a)
+        artifact_b = Artifact(element, context, strong_key=key_b)
+        digest_a = artifact_a._get_proto().files
+        digest_b = artifact_b._get_proto().files
 
-        return self.cas.diff(ref_a, ref_b, subdir=subdir)
+        added = []
+        removed = []
+        modified = []
+
+        self.cas.diff_trees(digest_a, digest_b, added=added, removed=removed, modified=modified)
+
+        return modified, removed, added
 
     # push():
     #
@@ -269,7 +254,7 @@ class ArtifactCache(BaseCache):
     #
     # Args:
     #     element (Element): The Element whose artifact is to be pushed
-    #     keys (list): The cache keys to use
+    #     artifact (Artifact): The artifact being pushed
     #
     # Returns:
     #   (bool): True if any remote was updated, False if no pushes were required
@@ -277,9 +262,7 @@ class ArtifactCache(BaseCache):
     # Raises:
     #   (ArtifactError): if there was an error
     #
-    def push(self, element, keys):
-        refs = [element.get_artifact_name(key) for key in list(keys)]
-
+    def push(self, element, artifact):
         project = element._get_project()
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
@@ -291,7 +274,7 @@ class ArtifactCache(BaseCache):
             display_key = element._get_brief_display_key()
             element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
 
-            if self.cas.push(refs, remote):
+            if self._push_artifact(element, artifact, remote):
                 element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
                 pushed = True
             else:
@@ -308,24 +291,21 @@ class ArtifactCache(BaseCache):
    # Args:
    #     element (Element): The Element whose artifact is to be fetched
    #     key (str): The cache key to use
-   #     progress (callable): The progress callback, if any
-   #     subdir (str): The optional specific subdir to pull
-   #     excluded_subdirs (list): The optional list of subdirs to not pull
+   #     pull_buildtrees (bool): Whether to pull buildtrees or not
    #
    # Returns:
    #   (bool): True if pull was successful, False if artifact was not available
    #
-    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
-        ref = element.get_artifact_name(key)
+    def pull(self, element, key, *, pull_buildtrees=False):
         display_key = key[:self.context.log_key_length]
-
         project = element._get_project()
 
         for remote in self._remotes[project]:
+            remote.init()
            try:
                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
 
-                if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
+                if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
                    # no need to pull from additional remotes
                    return True
@@ -399,7 +379,9 @@ class ArtifactCache(BaseCache):
         oldref = element.get_artifact_name(oldkey)
         newref = element.get_artifact_name(newkey)
 
-        self.cas.link_ref(oldref, newref)
+        if not os.path.exists(os.path.join(self.artifactdir, newref)):
+            os.link(os.path.join(self.artifactdir, oldref),
+                    os.path.join(self.artifactdir, newref))
 
     # get_artifact_logs():
     #
@@ -463,3 +445,173 @@ class ArtifactCache(BaseCache):
             remote_missing_blobs_set.update(remote_missing_blobs)
 
         return list(remote_missing_blobs_set)
+
+    ################################################
+    #             Local Private Methods            #
+    ################################################
+
+    # _reachable_directories()
+    #
+    # Returns:
+    #     (iter): Iterator over directories digests available from artifacts.
+    #
+    def _reachable_directories(self):
+        for root, _, files in os.walk(self.artifactdir):
+            for artifact_file in files:
+                artifact = artifact_pb2.Artifact()
+                with open(os.path.join(root, artifact_file), 'r+b') as f:
+                    artifact.ParseFromString(f.read())
+
+                if str(artifact.files):
+                    yield artifact.files
+
+                if str(artifact.buildtree):
+                    yield artifact.buildtree
+
+    # _reachable_digests()
+    #
+    # Returns:
+    #     (iter): Iterator over single file digests in artifacts
+    #
+    def _reachable_digests(self):
+        for root, _, files in os.walk(self.artifactdir):
+            for artifact_file in files:
+                artifact = artifact_pb2.Artifact()
+                with open(os.path.join(root, artifact_file), 'r+b') as f:
+                    artifact.ParseFromString(f.read())
+
+                if str(artifact.public_data):
+                    yield artifact.public_data
+
+                for log_file in artifact.logs:
+                    yield log_file.digest
+
+    # _push_artifact()
+    #
+    # Pushes relevant directories and then artifact proto to remote.
+    #
+    # Args:
+    #    element (Element): The element
+    #    artifact (Artifact): The related artifact being pushed
+    #    remote (CASRemote): Remote to push to
+    #
+    # Returns:
+    #    (bool): whether the push was successful
+    #
+    def _push_artifact(self, element, artifact, remote):
+
+        artifact_proto = artifact._get_proto()
+
+        keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+        # Check whether the artifact is on the server
+        present = False
+        for key in keys:
+            get_artifact = artifact_pb2.GetArtifactRequest()
+            get_artifact.cache_key = element.get_artifact_name(key)
+            try:
+                artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+                artifact_service.GetArtifact(get_artifact)
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.NOT_FOUND:
+                    raise ArtifactError("Error checking artifact cache: {}"
+                                        .format(e.details()))
+            else:
+                present = True
+        if present:
+            return False
+
+        try:
+            self.cas._send_directory(remote, artifact_proto.files)
+
+            if str(artifact_proto.buildtree):
+                try:
+                    self.cas._send_directory(remote, artifact_proto.buildtree)
+                except FileNotFoundError:
+                    pass
+
+            digests = []
+            if str(artifact_proto.public_data):
+                digests.append(artifact_proto.public_data)
+
+            for log_file in artifact_proto.logs:
+                digests.append(log_file.digest)
+
+            self.cas.send_blobs(remote, digests)
+
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
+            return False
+
+        # finally need to send the artifact proto
+        for key in keys:
+            update_artifact = artifact_pb2.UpdateArtifactRequest()
+            update_artifact.cache_key = element.get_artifact_name(key)
+            update_artifact.artifact.CopyFrom(artifact_proto)
+
+            try:
+                artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+                artifact_service.UpdateArtifact(update_artifact)
+            except grpc.RpcError as e:
+                raise ArtifactError("Failed to push artifact: {}".format(e.details()))
+
+        return True
+
+    # _pull_artifact()
+    #
+    # Args:
+    #     element (Element): element to pull
+    #     key (str): specific key of element to pull
+    #     remote (CASRemote): remote to pull from
+    #     pull_buildtree (bool): whether to pull buildtrees or not
+    #
+    # Returns:
+    #     (bool): whether the pull was successful
+    #
+    def _pull_artifact(self, element, key, remote, pull_buildtrees=False):

+        def __pull_digest(digest):
+            self.cas._fetch_directory(remote, digest)
+            required_blobs = self.cas.required_blobs_for_directory(digest)
+            missing_blobs = self.cas.local_missing_blobs(required_blobs)
+            if missing_blobs:
+                self.cas.fetch_blobs(remote, missing_blobs)
+
+        request = artifact_pb2.GetArtifactRequest()
+        request.cache_key = element.get_artifact_name(key=key)
+        try:
+            artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+            artifact = artifact_service.GetArtifact(request)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+            return False
+
+        try:
+            if str(artifact.files):
+                __pull_digest(artifact.files)
+
+            if pull_buildtrees and str(artifact.buildtree):
+                __pull_digest(artifact.buildtree)
+
+            digests = []
+            if str(artifact.public_data):
+                digests.append(artifact.public_data)
+
+            for log_digest in artifact.logs:
+                digests.append(log_digest.digest)
+
+            self.cas.fetch_blobs(remote, digests)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+            return False
+
+        # Write the artifact proto to cache
+        artifact_path = os.path.join(self.artifactdir, request.cache_key)
+        os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+        with open(artifact_path, 'w+b') as f:
+            f.write(artifact.SerializeToString())
+
+        return True
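
For context, the new layout stores one serialized Artifact proto per ref as a plain file under the artifact directory. The following is a minimal, hypothetical sketch of how such a ref file can be inspected outside of BuildStream, using the same ParseFromString() pattern as _reachable_directories() above; the artifactdir path is only an illustrative assumption (at runtime it comes from context.artifactdir), and the .hash field access assumes the standard remote-execution Digest message.

    import os

    from buildstream._protos.buildstream.v2 import artifact_pb2

    # Hypothetical cache location; in a real session this comes from
    # context.artifactdir as set up in ArtifactCache.__init__() above.
    artifactdir = os.path.expanduser("~/.cache/buildstream/artifacts/refs")

    # Every ref is a file containing a serialized Artifact proto; parse each
    # one and report which CAS directory digests it points at, mirroring
    # _reachable_directories() in the diff.
    for root, _, files in os.walk(artifactdir):
        for name in files:
            artifact = artifact_pb2.Artifact()
            with open(os.path.join(root, name), 'rb') as f:
                artifact.ParseFromString(f.read())

            if str(artifact.files):
                print("{}: files digest {}".format(name, artifact.files.hash))
            if str(artifact.buildtree):
                print("{}: buildtree digest {}".format(name, artifact.buildtree.hash))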