author | Chandan Singh <csingh43@bloomberg.net> | 2019-04-24 22:53:19 +0100
committer | Chandan Singh <csingh43@bloomberg.net> | 2019-05-21 12:41:18 +0100
commit | 070d053e5cc47e572e9f9e647315082bd7a15c63 (patch)
tree | 7fb0fdff52f9b5f8a18ec8fe9c75b661f9e0839e /src/buildstream/_artifactcache.py
parent | 6c59e7901a52be961c2a1b671cf2b30f90bc4d0a (diff)
download | buildstream-070d053e5cc47e572e9f9e647315082bd7a15c63.tar.gz
Move source from 'buildstream' to 'src/buildstream'
This was discussed in #1008.
Fixes #1009.
Diffstat (limited to 'src/buildstream/_artifactcache.py')
-rw-r--r-- | src/buildstream/_artifactcache.py | 617
1 file changed, 617 insertions, 0 deletions
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
new file mode 100644
index 000000000..091b44dda
--- /dev/null
+++ b/src/buildstream/_artifactcache.py
@@ -0,0 +1,617 @@
+#
+#  Copyright (C) 2017-2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Tristan Maat <tristan.maat@codethink.co.uk>
+
+import os
+import grpc
+
+from ._basecache import BaseCache
+from .types import _KeyStrength
+from ._exceptions import ArtifactError, CASError, CASCacheError
+from ._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc
+
+from ._cas import CASRemoteSpec
+from .storage._casbaseddirectory import CasBasedDirectory
+from ._artifact import Artifact
+from . import utils
+
+
+# An ArtifactCacheSpec holds the user configuration for a single remote
+# artifact cache.
+#
+# Args:
+#     url (str): Location of the remote artifact cache
+#     push (bool): Whether we should attempt to push artifacts to this cache,
+#                  in addition to pulling from it.
+#
+class ArtifactCacheSpec(CASRemoteSpec):
+    pass
+
+
+# An ArtifactCache manages artifacts.
+#
+# Args:
+#     context (Context): The BuildStream context
+#
+class ArtifactCache(BaseCache):
+
+    spec_class = ArtifactCacheSpec
+    spec_name = "artifact_cache_specs"
+    spec_error = ArtifactError
+    config_node_name = "artifacts"
+
+    def __init__(self, context):
+        super().__init__(context)
+
+        self._required_elements = set()       # The elements required for this session
+
+        # create artifact directory
+        self.artifactdir = context.artifactdir
+        os.makedirs(self.artifactdir, exist_ok=True)
+
+        self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove)
+        self.casquota.add_list_refs_callback(self.list_artifacts)
+
+        self.cas.add_reachable_directories_callback(self._reachable_directories)
+        self.cas.add_reachable_digests_callback(self._reachable_digests)
+
+    # mark_required_elements():
+    #
+    # Mark elements whose artifacts are required for the current run.
+    #
+    # Artifacts whose elements are in this list will be locked by the artifact
+    # cache and not touched for the duration of the current pipeline.
+    #
+    # Args:
+    #     elements (iterable): A set of elements to mark as required
+    #
+    def mark_required_elements(self, elements):
+
+        # We risk calling this function with a generator, so we
+        # better consume it first.
+        #
+        elements = list(elements)
+
+        # Mark the elements as required. We may not know all of their
+        # cache keys yet, so we only check those later, when deleting.
+        #
+        self._required_elements.update(elements)
+
+        # For the cache keys which have been resolved so far, we bump
+        # their mtime.
+        #
+        # This is just in case we have concurrent instances of
+        # BuildStream running with the same artifact cache; it reduces
+        # the likelihood of one instance deleting artifacts
+        # which are required by the other.
+        for element in elements:
+            strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
+            weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
+            for key in (strong_key, weak_key):
+                if key:
+                    ref = element.get_artifact_name(key)
+
+                    try:
+                        self.update_mtime(ref)
+                    except ArtifactError:
+                        pass
+
+    def update_mtime(self, ref):
+        try:
+            os.utime(os.path.join(self.artifactdir, ref))
+        except FileNotFoundError as e:
+            raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e
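The mtime bump in mark_required_elements() is what the LRU expiry machinery keys off: each ref is an ordinary file under artifactdir, so touching it moves the ref to the back of the oldest-first queue that cleanup walks. A minimal sketch of the same idea, assuming a hypothetical flat ref_dir of plain ref files rather than BuildStream's real layout:

    import os

    def touch_ref(ref_dir, ref):
        # Bump the ref's mtime so LRU-ordered expiry treats it as recently used.
        # A missing ref is tolerated, mirroring the ArtifactError pass above.
        try:
            os.utime(os.path.join(ref_dir, ref))
        except FileNotFoundError:
            pass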
+
+    # unrequired_artifacts()
+    #
+    # Returns an iterator over artifacts that are not required in the build plan
+    #
+    # Returns:
+    #     (iter): Iterator over tuples of (float, str) where float is the time
+    #             and str is the artifact ref
+    #
+    def unrequired_artifacts(self):
+        required_artifacts = set(map(lambda x: x.get_artifact_name(),
+                                     self._required_elements))
+        for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir):
+            if artifact not in required_artifacts:
+                yield (mtime, artifact)
+
+    def required_artifacts(self):
+        # Build a set of the cache keys which are required
+        # based on the required elements at cleanup time
+        #
+        # We lock both strong and weak keys - deleting one but not the
+        # other won't save space, but would be a user inconvenience.
+        for element in self._required_elements:
+            yield element._get_cache_key(strength=_KeyStrength.STRONG)
+            yield element._get_cache_key(strength=_KeyStrength.WEAK)
+
+    def full(self):
+        return self.casquota.full()
+
+    # add_artifact_size()
+    #
+    # Adds the reported size of a newly cached artifact to the
+    # overall estimated size.
+    #
+    # Args:
+    #     artifact_size (int): The size to add.
+    #
+    def add_artifact_size(self, artifact_size):
+        cache_size = self.casquota.get_cache_size()
+        cache_size += artifact_size
+
+        self.casquota.set_cache_size(cache_size)
+
+    # preflight():
+    #
+    # Preflight check.
+    #
+    def preflight(self):
+        self.cas.preflight()
+
+    # contains():
+    #
+    # Check whether the artifact for the specified Element is already available
+    # in the local artifact cache.
+    #
+    # Args:
+    #     element (Element): The Element to check
+    #     key (str): The cache key to use
+    #
+    # Returns: True if the artifact is in the cache, False otherwise
+    #
+    def contains(self, element, key):
+        ref = element.get_artifact_name(key)
+
+        return os.path.exists(os.path.join(self.artifactdir, ref))
+
+    # list_artifacts():
+    #
+    # List artifacts in this cache in LRU order.
+    #
+    # Args:
+    #     glob (str): An optional glob expression to be used to list artifacts satisfying the glob
+    #
+    # Returns:
+    #     ([str]) - A list of artifact names as generated in LRU order
+    #
+    def list_artifacts(self, *, glob=None):
+        return [ref for _, ref in sorted(list(self._list_refs_mtimes(self.artifactdir, glob_expr=glob)))]
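unrequired_artifacts() and list_artifacts() both lean on the inherited _list_refs_mtimes() helper, which pairs every ref with its mtime; sorting those pairs gives oldest-first (LRU) order. A rough self-contained sketch of that pattern, with a hypothetical directory walk standing in for the real helper:

    import os

    def list_refs_mtimes(base_dir):
        # Yield (mtime, ref) pairs for every ref file below base_dir.
        for root, _, files in os.walk(base_dir):
            for name in files:
                path = os.path.join(root, name)
                yield (os.path.getmtime(path), os.path.relpath(path, base_dir))

    def lru_refs(base_dir, required_refs=frozenset()):
        # Oldest-first refs, skipping any the current session marked required.
        return [ref for _, ref in sorted(list_refs_mtimes(base_dir))
                if ref not in required_refs]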
+
+    # remove():
+    #
+    # Removes the artifact for the specified ref from the local
+    # artifact cache.
+    #
+    # Args:
+    #     ref (artifact_name): The name of the artifact to remove (as
+    #                          generated by `Element.get_artifact_name`)
+    #     defer_prune (bool): Whether to defer pruning of unreachable objects,
+    #                         rather than pruning immediately after the ref
+    #                         is removed.
+    #
+    # Returns:
+    #    (int): The amount of space recovered in the cache, in bytes
+    #
+    def remove(self, ref, *, defer_prune=False):
+        try:
+            return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune)
+        except CASCacheError as e:
+            raise ArtifactError("{}".format(e)) from e
+
+    # prune():
+    #
+    # Prune the artifact cache of unreachable refs
+    #
+    def prune(self):
+        return self.cas.prune()
+
+    # diff():
+    #
+    # Return a list of files that have been added or modified between
+    # the artifacts described by key_a and key_b. This expects the
+    # provided keys to be strong cache keys
+    #
+    # Args:
+    #     element (Element): The element whose artifacts to compare
+    #     key_a (str): The first artifact strong key
+    #     key_b (str): The second artifact strong key
+    #
+    def diff(self, element, key_a, key_b):
+        context = self.context
+        artifact_a = Artifact(element, context, strong_key=key_a)
+        artifact_b = Artifact(element, context, strong_key=key_b)
+        digest_a = artifact_a._get_proto().files
+        digest_b = artifact_b._get_proto().files
+
+        added = []
+        removed = []
+        modified = []
+
+        self.cas.diff_trees(digest_a, digest_b, added=added, removed=removed, modified=modified)
+
+        return modified, removed, added
+
+    # push():
+    #
+    # Push committed artifact to remote repository.
+    #
+    # Args:
+    #     element (Element): The Element whose artifact is to be pushed
+    #     artifact (Artifact): The artifact being pushed
+    #
+    # Returns:
+    #   (bool): True if any remote was updated, False if no pushes were required
+    #
+    # Raises:
+    #   (ArtifactError): if there was an error
+    #
+    def push(self, element, artifact):
+        project = element._get_project()
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        pushed = False
+
+        for remote in push_remotes:
+            remote.init()
+            display_key = element._get_brief_display_key()
+            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+
+            if self._push_artifact(element, artifact, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
+                element.info("Remote ({}) already has artifact {} cached".format(
+                    remote.spec.url, element._get_brief_display_key()
+                ))
+
+        return pushed
+
+    # pull():
+    #
+    # Pull artifact from one of the configured remote repositories.
+    #
+    # Args:
+    #     element (Element): The Element whose artifact is to be fetched
+    #     key (str): The cache key to use
+    #     pull_buildtrees (bool): Whether to pull buildtrees or not
+    #
+    # Returns:
+    #   (bool): True if pull was successful, False if artifact was not available
+    #
+    def pull(self, element, key, *, pull_buildtrees=False):
+        display_key = key[:self.context.log_key_length]
+        project = element._get_project()
+
+        for remote in self._remotes[project]:
+            remote.init()
+            try:
+                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
+
+                if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
+                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
+                    # no need to pull from additional remotes
+                    return True
+                else:
+                    element.info("Remote ({}) does not have artifact {} cached".format(
+                        remote.spec.url, display_key
+                    ))
+
+            except CASError as e:
+                raise ArtifactError("Failed to pull artifact {}: {}".format(
+                    display_key, e)) from e
+
+        return False
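push() and pull() are deliberately asymmetric: a push fans out to every push-enabled remote, while a pull returns as soon as one remote satisfies it. A toy sketch of that control flow, using a hypothetical FakeRemote in place of BuildStream's CASRemote:

    class FakeRemote:
        # Hypothetical stand-in for a configured remote (illustration only).
        def __init__(self, refs, push_enabled=True):
            self.refs = set(refs)
            self.push_enabled = push_enabled

        def push_one(self, ref):
            # False mirrors "remote already has artifact cached" above.
            if ref in self.refs:
                return False
            self.refs.add(ref)
            return True

        def pull_one(self, ref):
            # True mirrors a successful pull into the local cache.
            return ref in self.refs

    def push_all(remotes, ref):
        # Fan out to every push-enabled remote, as push() does.
        pushed = False
        for remote in (r for r in remotes if r.push_enabled):
            if remote.push_one(ref):
                pushed = True
        return pushed

    def pull_first(remotes, ref):
        # Stop at the first remote that has the artifact, as pull() does;
        # any() short-circuits just like the early return above.
        return any(remote.pull_one(ref) for remote in remotes)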
+
+    # pull_tree():
+    #
+    # Pull a single Tree rather than an artifact.
+    # Does not update local refs.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     digest (Digest): The digest of the tree
+    #
+    def pull_tree(self, project, digest):
+        for remote in self._remotes[project]:
+            digest = self.cas.pull_tree(remote, digest)
+
+            if digest:
+                # no need to pull from additional remotes
+                return digest
+
+        return None
+
+    # push_message():
+    #
+    # Push the given protobuf message to all remotes.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     message (Message): A protobuf message to push.
+    #
+    # Raises:
+    #     (ArtifactError): if there was an error
+    #
+    def push_message(self, project, message):
+
+        if self._has_push_remotes:
+            push_remotes = [r for r in self._remotes[project] if r.spec.push]
+        else:
+            push_remotes = []
+
+        if not push_remotes:
+            raise ArtifactError("push_message was called, but no remote artifact " +
+                                "servers are configured as push remotes.")
+
+        for remote in push_remotes:
+            message_digest = remote.push_message(message)
+
+        return message_digest
+
+    # link_key():
+    #
+    # Add a key for an existing artifact.
+    #
+    # Args:
+    #     element (Element): The Element whose artifact is to be linked
+    #     oldkey (str): An existing cache key for the artifact
+    #     newkey (str): A new cache key for the artifact
+    #
+    def link_key(self, element, oldkey, newkey):
+        oldref = element.get_artifact_name(oldkey)
+        newref = element.get_artifact_name(newkey)
+
+        if not os.path.exists(os.path.join(self.artifactdir, newref)):
+            os.link(os.path.join(self.artifactdir, oldref),
+                    os.path.join(self.artifactdir, newref))
+
+    # get_artifact_logs():
+    #
+    # Get the logs of an existing artifact
+    #
+    # Args:
+    #     ref (str): The ref of the artifact
+    #
+    # Returns:
+    #     logsdir (CasBasedDirectory): A CasBasedDirectory containing the artifact's logs
+    #
+    def get_artifact_logs(self, ref):
+        cache_id = self.cas.resolve_ref(ref, update_mtime=True)
+        vdir = CasBasedDirectory(self.cas, digest=cache_id).descend('logs')
+        return vdir
+
+    # fetch_missing_blobs():
+    #
+    # Fetch missing blobs from configured remote repositories.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     missing_blobs (list): The Digests of the blobs to fetch
+    #
+    def fetch_missing_blobs(self, project, missing_blobs):
+        for remote in self._remotes[project]:
+            if not missing_blobs:
+                break
+
+            remote.init()
+
+            # fetch_blobs() will return the blobs that are still missing
+            missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+        if missing_blobs:
+            raise ArtifactError("Blobs not found on configured artifact servers")
+
+    # find_missing_blobs():
+    #
+    # Find missing blobs from configured push remote repositories.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     missing_blobs (list): The Digests of the blobs to check
+    #
+    # Returns:
+    #     (list): The Digests of the blobs missing on at least one push remote
+    #
+    def find_missing_blobs(self, project, missing_blobs):
+        if not missing_blobs:
+            return []
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        remote_missing_blobs_set = set()
+
+        for remote in push_remotes:
+            remote.init()
+
+            remote_missing_blobs = self.cas.remote_missing_blobs(remote, missing_blobs)
+            remote_missing_blobs_set.update(remote_missing_blobs)
+
+        return list(remote_missing_blobs_set)
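fetch_missing_blobs() narrows its worklist as it goes: each remote is asked only for the blobs that every earlier remote failed to provide. A small sketch of that narrowing loop, where fetch(remote, blobs) is a hypothetical callable returning whichever blobs the remote could not supply, like cas.fetch_blobs() above:

    def fetch_from_remotes(remotes, missing_blobs, fetch):
        # Walk the remotes in order, shrinking the missing set each time.
        for remote in remotes:
            if not missing_blobs:
                break
            missing_blobs = fetch(remote, missing_blobs)
        # Anything still missing after the last remote is a hard error.
        if missing_blobs:
            raise RuntimeError("Blobs not found on configured artifact servers")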
+
+    ################################################
+    #             Local Private Methods            #
+    ################################################
+
+    # _reachable_directories()
+    #
+    # Returns:
+    #     (iter): Iterator over directory digests available from artifacts.
+    #
+    def _reachable_directories(self):
+        for root, _, files in os.walk(self.artifactdir):
+            for artifact_file in files:
+                artifact = artifact_pb2.Artifact()
+                with open(os.path.join(root, artifact_file), 'r+b') as f:
+                    artifact.ParseFromString(f.read())
+
+                if str(artifact.files):
+                    yield artifact.files
+
+                if str(artifact.buildtree):
+                    yield artifact.buildtree
+
+    # _reachable_digests()
+    #
+    # Returns:
+    #     (iter): Iterator over single file digests in artifacts
+    #
+    def _reachable_digests(self):
+        for root, _, files in os.walk(self.artifactdir):
+            for artifact_file in files:
+                artifact = artifact_pb2.Artifact()
+                with open(os.path.join(root, artifact_file), 'r+b') as f:
+                    artifact.ParseFromString(f.read())
+
+                if str(artifact.public_data):
+                    yield artifact.public_data
+
+                for log_file in artifact.logs:
+                    yield log_file.digest
+
+    # _push_artifact()
+    #
+    # Pushes relevant directories and then artifact proto to remote.
+    #
+    # Args:
+    #     element (Element): The element
+    #     artifact (Artifact): The related artifact being pushed
+    #     remote (CASRemote): Remote to push to
+    #
+    # Returns:
+    #     (bool): whether the push was successful
+    #
+    def _push_artifact(self, element, artifact, remote):
+
+        artifact_proto = artifact._get_proto()
+
+        keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+        # Check whether the artifact is on the server
+        present = False
+        for key in keys:
+            get_artifact = artifact_pb2.GetArtifactRequest()
+            get_artifact.cache_key = element.get_artifact_name(key)
+            try:
+                artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+                artifact_service.GetArtifact(get_artifact)
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.NOT_FOUND:
+                    raise ArtifactError("Error checking artifact cache: {}"
+                                        .format(e.details()))
+            else:
+                present = True
+        if present:
+            return False
+
+        try:
+            self.cas._send_directory(remote, artifact_proto.files)
+
+            if str(artifact_proto.buildtree):
+                try:
+                    self.cas._send_directory(remote, artifact_proto.buildtree)
+                except FileNotFoundError:
+                    pass
+
+            digests = []
+            if str(artifact_proto.public_data):
+                digests.append(artifact_proto.public_data)
+
+            for log_file in artifact_proto.logs:
+                digests.append(log_file.digest)
+
+            self.cas.send_blobs(remote, digests)
+
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
+            return False
+
+        # finally need to send the artifact proto
+        for key in keys:
+            update_artifact = artifact_pb2.UpdateArtifactRequest()
+            update_artifact.cache_key = element.get_artifact_name(key)
+            update_artifact.artifact.CopyFrom(artifact_proto)
+
+            try:
+                artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+                artifact_service.UpdateArtifact(update_artifact)
+            except grpc.RpcError as e:
+                raise ArtifactError("Failed to push artifact: {}".format(e.details()))
+
+        return True
+
+    # _pull_artifact()
+    #
+    # Args:
+    #     element (Element): element to pull
+    #     key (str): specific key of element to pull
+    #     remote (CASRemote): remote to pull from
+    #     pull_buildtrees (bool): whether to pull buildtrees or not
+    #
+    # Returns:
+    #     (bool): whether the pull was successful
+    #
+    def _pull_artifact(self, element, key, remote, pull_buildtrees=False):
+
+        def __pull_digest(digest):
+            self.cas._fetch_directory(remote, digest)
+            required_blobs = self.cas.required_blobs_for_directory(digest)
+            missing_blobs = self.cas.local_missing_blobs(required_blobs)
+            if missing_blobs:
+                self.cas.fetch_blobs(remote, missing_blobs)
+
+        request = artifact_pb2.GetArtifactRequest()
+        request.cache_key = element.get_artifact_name(key=key)
+        try:
+            artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+            artifact = artifact_service.GetArtifact(request)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+            return False
+
+        try:
+            if str(artifact.files):
+                __pull_digest(artifact.files)
+
+            if pull_buildtrees and str(artifact.buildtree):
+                __pull_digest(artifact.buildtree)
+
+            digests = []
+            if str(artifact.public_data):
+                digests.append(artifact.public_data)
+
+            for log_digest in artifact.logs:
+                digests.append(log_digest.digest)
+
+            self.cas.fetch_blobs(remote, digests)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+            return False
+
+        # Write the artifact proto to cache
+        artifact_path = os.path.join(self.artifactdir, request.cache_key)
+        os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+        with open(artifact_path, 'w+b') as f:
+            f.write(artifact.SerializeToString())
+
+        return True
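The _reachable_*() walkers and the final write in _pull_artifact() treat a ref as nothing more than a file under artifactdir holding a serialized Artifact message. A hedged round-trip sketch of that convention; the absolute import path is assumed from the relative imports above, and both helpers are hypothetical:

    import os

    from buildstream._protos.buildstream.v2 import artifact_pb2

    def write_artifact_proto(artifactdir, cache_key, artifact):
        # Persist the Artifact message at the path named by the ref,
        # as the last step of _pull_artifact() does.
        path = os.path.join(artifactdir, cache_key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            f.write(artifact.SerializeToString())

    def read_artifact_proto(artifactdir, cache_key):
        # Parse the ref file back into an Artifact message, as the
        # _reachable_*() walkers do before yielding its digests.
        artifact = artifact_pb2.Artifact()
        with open(os.path.join(artifactdir, cache_key), 'rb') as f:
            artifact.ParseFromString(f.read())
        return artifact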