diff options
Diffstat (limited to 'src/buildstream/_elementsourcescache.py')
-rw-r--r-- | src/buildstream/_elementsourcescache.py | 337 |
1 file changed, 337 insertions, 0 deletions
#
# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
import os
import grpc

from ._cas.casremote import BlobNotFound
from ._assetcache import AssetCache
from ._exceptions import AssetCacheError, CASError, CASRemoteError, SourceCacheError
from . import utils
from ._protos.buildstream.v2 import source_pb2

REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"


# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
#     context (Context): The Buildstream context
#
class ElementSourcesCache(AssetCache):

    spec_name = "source_cache_specs"
    config_node_name = "source-caches"

    def __init__(self, context):
        super().__init__(context)

        self._basedir = os.path.join(context.cachedir, "elementsources")
        os.makedirs(self._basedir, exist_ok=True)

    # load_proto():
    #
    # Load source proto from local cache.
    #
    # Args:
    #    sources (ElementSources): The sources whose proto we want to load
    #
    # Returns:
    #    (source_pb2.Source): The cached source proto, or None if it is
    #                         not available in the local cache
    #
    def load_proto(self, sources):
        ref = sources.get_cache_key()
        path = self._source_path(ref)

        if not os.path.exists(path):
            return None

        source_proto = source_pb2.Source()
        # Read-only access is sufficient here; the original "r+b" mode
        # needlessly required write permission on the cache file.
        with open(path, "rb") as f:
            source_proto.ParseFromString(f.read())
        return source_proto

    # store_proto():
    #
    # Store source proto in the local cache, keyed by the sources'
    # cache key.
    #
    # Args:
    #    sources (ElementSources): The sources whose proto we want to store
    #    proto (source_pb2.Source): The source proto to serialize to disk
    #
    def store_proto(self, sources, proto):
        ref = sources.get_cache_key()
        path = self._source_path(ref)

        # Atomic write so a concurrent reader never sees a partial proto
        with utils.save_file_atomic(path, "w+b") as f:
            f.write(proto.SerializeToString())

    # pull():
    #
    # Attempts to pull sources from configured remote source caches.
    #
    # Args:
    #    sources (ElementSources): The sources we want to fetch
    #    plugin (Plugin): Plugin used for status/warning messaging
    #
    # Returns:
    #    (bool): True if pull successful, False if not
    #
    # Raises:
    #    (SourceCacheError): If all candidate remotes errored out
    #
    def pull(self, sources, plugin):
        project = sources.get_project()

        ref = sources.get_cache_key()
        display_key = sources.get_brief_display_key()

        uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)

        source_digest = None
        errors = []
        # Start by pulling our source proto, so that we know which
        # blobs to pull
        for remote in self._index_remotes[project]:
            remote.init()
            try:
                plugin.status("Pulling source {} <- {}".format(display_key, remote))
                response = remote.fetch_blob([uri])
                if response:
                    source_digest = response.blob_digest
                    break

                plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key))
            except AssetCacheError as e:
                plugin.warn("Could not pull from remote {}: {}".format(remote, e))
                errors.append(e)

        # Only raise if every index remote errored; a plain cache miss
        # (no errors) simply returns False below.
        if errors and not source_digest:
            raise SourceCacheError(
                "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors)
            )

        # If we don't have a source proto, we can't pull source files
        if not source_digest:
            return False

        errors = []
        for remote in self._storage_remotes[project]:
            remote.init()
            try:
                plugin.status("Pulling data for source {} <- {}".format(display_key, remote))

                if self._pull_source_storage(ref, source_digest, remote):
                    plugin.info("Pulled source {} <- {}".format(display_key, remote))
                    return True

                plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key))
            except BlobNotFound as e:
                # Not all blobs are available on this remote
                plugin.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
                continue
            except CASError as e:
                plugin.warn("Could not pull from remote {}: {}".format(remote, e))
                errors.append(e)

        if errors:
            raise SourceCacheError(
                "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors)
            )

        return False

    # push():
    #
    # Push sources to remote repository.
    #
    # Args:
    #    sources (ElementSources): The sources to be pushed
    #    plugin (Plugin): Plugin used for status/warning messaging
    #
    # Returns:
    #    (bool): True if any remote was updated, False if no pushes were required
    #
    # Raises:
    #    (SourceCacheError): if there was an error
    #
    def push(self, sources, plugin):
        project = sources.get_project()

        ref = sources.get_cache_key()
        display_key = sources.get_brief_display_key()

        uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)

        index_remotes = [r for r in self._index_remotes[project] if r.push]
        storage_remotes = [r for r in self._storage_remotes[project] if r.push]

        source_proto = self.load_proto(sources)
        source_digest = self.cas.add_object(buffer=source_proto.SerializeToString())

        pushed = False

        # First push our files to all storage remotes, so that they
        # can perform file checks on their end
        for remote in storage_remotes:
            remote.init()
            plugin.status("Pushing data from source {} -> {}".format(display_key, remote))

            if self._push_source_blobs(source_proto, source_digest, remote):
                plugin.info("Pushed data from source {} -> {}".format(display_key, remote))
            else:
                # Fix: display_key is a plain string; the original called
                # display_key() here, raising TypeError on this branch.
                plugin.info("Remote ({}) already has all data of source {} cached".format(remote, display_key))

        for remote in index_remotes:
            remote.init()
            plugin.status("Pushing source {} -> {}".format(display_key, remote))

            if self._push_source_proto(uri, source_proto, source_digest, remote):
                plugin.info("Pushed source {} -> {}".format(display_key, remote))
                pushed = True
            else:
                plugin.info("Remote ({}) already has source {} cached".format(remote, display_key))

        return pushed

    # _get_source():
    #
    # Load a source proto from the local cache by raw ref, raising if
    # it is unavailable (unlike load_proto(), which returns None).
    #
    # Args:
    #    ref (str): The cache key of the source proto
    #
    # Returns:
    #    (source_pb2.Source): The source proto
    #
    # Raises:
    #    (SourceCacheError): If the source is not in the local cache
    #
    def _get_source(self, ref):
        path = self._source_path(ref)
        source_proto = source_pb2.Source()
        try:
            # Read-only access is sufficient; "r+b" in the original
            # needlessly required write permission.
            with open(path, "rb") as f:
                source_proto.ParseFromString(f.read())
                return source_proto
        except FileNotFoundError as e:
            raise SourceCacheError("Attempted to access unavailable source: {}".format(e)) from e

    # _source_path():
    #
    # Return the local filesystem path for a cached source proto.
    #
    def _source_path(self, ref):
        return os.path.join(self._basedir, ref)

    # _push_source_blobs()
    #
    # Push the blobs that make up an source to the remote server.
    #
    # Args:
    #    source_proto: The source proto whose blobs to push.
    #    source_digest: The digest of the source proto.
    #    remote (CASRemote): The remote to push the blobs to.
    #
    # Returns:
    #   (bool) - True if we uploaded anything, False otherwise.
    #
    # Raises:
    #   SourceCacheError: If we fail to push blobs (*unless* they're
    #   already there or we run out of space on the server).
    #
    def _push_source_blobs(self, source_proto, source_digest, remote):
        try:
            # Push source files
            self.cas._send_directory(remote, source_proto.files)
            # Push source proto
            self.cas.send_blobs(remote, [source_digest])

        except CASRemoteError as cas_error:
            if cas_error.reason != "cache-too-full":
                raise SourceCacheError("Failed to push source blobs: {}".format(cas_error))
            # Server out of space is best-effort: report "nothing pushed"
            return False
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
                raise SourceCacheError(
                    "Failed to push source blobs with status {}: {}".format(e.code().name, e.details())
                )
            return False

        return True

    # _push_source_proto()
    #
    # Pushes the source proto to remote.
    #
    # Args:
    #    uri (str): The remote-asset URN for this source.
    #    source_proto: The source proto.
    #    source_digest: The digest of the source proto.
    #    remote (AssetRemote): Remote to push to
    #
    # Returns:
    #    (bool): Whether we pushed the source.
    #
    # Raises:
    #    SourceCacheError: If the push fails for any reason except the
    #    source already existing.
    #
    def _push_source_proto(self, uri, source_proto, source_digest, remote):
        try:
            response = remote.fetch_blob([uri])
            # Skip push if source is already on the server
            if response and response.blob_digest == source_digest:
                return False
        except grpc.RpcError as e:
            # NOT_FOUND just means we need to push; anything else is an error
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise SourceCacheError(
                    "Error checking source cache with status {}: {}".format(e.code().name, e.details())
                )

        referenced_directories = [source_proto.files]

        try:
            remote.push_blob(
                [uri], source_digest, references_directories=referenced_directories,
            )
        except grpc.RpcError as e:
            raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))

        return True

    # _pull_source_storage():
    #
    # Pull source blobs from the given remote.
    #
    # Args:
    #    key (str): The specific key for the source to pull
    #    source_digest: The digest of the source proto on the remote.
    #    remote (CASRemote): remote to pull from
    #
    # Returns:
    #    (bool): True if we pulled any blobs.
    #
    # Raises:
    #    SourceCacheError: If the pull failed for any reason except the
    #    blobs not existing on the server.
    #
    def _pull_source_storage(self, key, source_digest, remote):
        def __pull_digest(digest):
            self.cas._fetch_directory(remote, digest)
            required_blobs = self.cas.required_blobs_for_directory(digest)
            missing_blobs = self.cas.local_missing_blobs(required_blobs)
            if missing_blobs:
                self.cas.fetch_blobs(remote, missing_blobs)

        try:
            # Fetch and parse source proto
            self.cas.fetch_blobs(remote, [source_digest])
            source = source_pb2.Source()
            with open(self.cas.objpath(source_digest), "rb") as f:
                source.ParseFromString(f.read())

            # Write the source proto to cache
            source_path = os.path.join(self._basedir, key)
            with utils.save_file_atomic(source_path, mode="wb") as f:
                f.write(source.SerializeToString())

            __pull_digest(source.files)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details()))
            return False

        return True

    # _push_source():
    #
    # Push a locally cached source's file directory to a remote.
    #
    # Args:
    #    source_ref (str): The cache key of the source to push
    #    remote (AssetRemote): The remote to push to
    #
    # Returns:
    #    (bool): True if the push succeeded, False if the server is full.
    #
    # Raises:
    #    SourceCacheError: On any gRPC failure other than server-full.
    #
    def _push_source(self, source_ref, remote):
        uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)

        try:
            remote.init()
            source_proto = self._get_source(source_ref)
            remote.push_directory([uri], source_proto.files)
            return True

        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
                raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
            return False