Diffstat (limited to 'src/buildstream/_elementsourcescache.py')
-rw-r--r--  src/buildstream/_elementsourcescache.py  337
1 file changed, 337 insertions(+), 0 deletions(-)
diff --git a/src/buildstream/_elementsourcescache.py b/src/buildstream/_elementsourcescache.py
new file mode 100644
index 000000000..84e7633e5
--- /dev/null
+++ b/src/buildstream/_elementsourcescache.py
@@ -0,0 +1,337 @@
+#
+# Copyright (C) 2019-2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+import os
+
+import grpc
+
+from ._cas.casremote import BlobNotFound
+from ._assetcache import AssetCache
+from ._exceptions import AssetCacheError, CASError, CASRemoteError, SourceCacheError
+from . import utils
+from ._protos.buildstream.v2 import source_pb2
+
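+# As an illustrative example (the cache key shown here is made up), a source
+# with cache key "3f2a..." would be addressed on a Remote Asset server as
+# "urn:fdc:buildstream.build:2020:source:3f2a...".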
+REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"
+
+
+# Class that keeps the configuration of remote source caches and deals with
+# caching of element sources.
+#
+# Args:
+#     context (Context): The BuildStream context
+#
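+# A minimal usage sketch (illustrative only; `context`, `sources` and
+# `element` are assumed to be a Context, an ElementSources instance and the
+# owning Element used for status messages):
+#
+#     cache = ElementSourcesCache(context)
+#     proto = cache.load_proto(sources)
+#     if proto is None and not cache.pull(sources, element):
+#         ...  # fetch/stage the sources, then store_proto() and push()
+#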
+class ElementSourcesCache(AssetCache):
+
+ spec_name = "source_cache_specs"
+ config_node_name = "source-caches"
+
+ def __init__(self, context):
+ super().__init__(context)
+
+ self._basedir = os.path.join(context.cachedir, "elementsources")
+ os.makedirs(self._basedir, exist_ok=True)
+
+ # load_proto():
+ #
+ # Load source proto from local cache.
+ #
+ # Args:
+ # sources (ElementSources): The sources whose proto we want to load
+ #
+ def load_proto(self, sources):
+ ref = sources.get_cache_key()
+ path = self._source_path(ref)
+
+ if not os.path.exists(path):
+ return None
+
+ source_proto = source_pb2.Source()
+ with open(path, "r+b") as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+
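+    # store_proto():
+    #
+    # Store the source proto in the local cache.
+    #
+    # Args:
+    #     sources (ElementSources): The sources whose proto we want to store
+    #     proto (Source): The source proto to store
+    #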
+ def store_proto(self, sources, proto):
+ ref = sources.get_cache_key()
+ path = self._source_path(ref)
+
+ with utils.save_file_atomic(path, "w+b") as f:
+ f.write(proto.SerializeToString())
+
+ # pull():
+ #
+ # Attempts to pull sources from configured remote source caches.
+ #
+ # Args:
+    #     sources (ElementSources): The sources we want to fetch
+    #     plugin (Plugin): The plugin used to emit status messages
+ #
+ # Returns:
+ # (bool): True if pull successful, False if not
+ #
+ def pull(self, sources, plugin):
+ project = sources.get_project()
+
+ ref = sources.get_cache_key()
+ display_key = sources.get_brief_display_key()
+
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)
+
+ source_digest = None
+ errors = []
+ # Start by pulling our source proto, so that we know which
+ # blobs to pull
+ for remote in self._index_remotes[project]:
+ remote.init()
+ try:
+ plugin.status("Pulling source {} <- {}".format(display_key, remote))
+ response = remote.fetch_blob([uri])
+ if response:
+ source_digest = response.blob_digest
+ break
+
+ plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key))
+ except AssetCacheError as e:
+ plugin.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors and not source_digest:
+ raise SourceCacheError(
+ "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors)
+ )
+
+ # If we don't have a source proto, we can't pull source files
+ if not source_digest:
+ return False
+
+ errors = []
+ for remote in self._storage_remotes[project]:
+ remote.init()
+ try:
+ plugin.status("Pulling data for source {} <- {}".format(display_key, remote))
+
+ if self._pull_source_storage(ref, source_digest, remote):
+ plugin.info("Pulled source {} <- {}".format(display_key, remote))
+ return True
+
+ plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key))
+ except BlobNotFound as e:
+ # Not all blobs are available on this remote
+                plugin.info("Remote CAS ({}) does not have blob {} cached".format(remote, e.blob))
+ continue
+ except CASError as e:
+ plugin.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors:
+ raise SourceCacheError(
+ "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors)
+ )
+
+ return False
+
+ # push():
+ #
+    # Push sources to the configured remote source caches.
+ #
+ # Args:
+    #     sources (ElementSources): The sources to be pushed
+    #     plugin (Plugin): The plugin used to emit status messages
+ #
+ # Returns:
+ # (bool): True if any remote was updated, False if no pushes were required
+ #
+ # Raises:
+ # (SourceCacheError): if there was an error
+ #
+ def push(self, sources, plugin):
+ project = sources.get_project()
+
+ ref = sources.get_cache_key()
+ display_key = sources.get_brief_display_key()
+
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)
+
+ index_remotes = [r for r in self._index_remotes[project] if r.push]
+ storage_remotes = [r for r in self._storage_remotes[project] if r.push]
+
+ source_proto = self.load_proto(sources)
+ source_digest = self.cas.add_object(buffer=source_proto.SerializeToString())
+
+ pushed = False
+
+ # First push our files to all storage remotes, so that they
+ # can perform file checks on their end
+ for remote in storage_remotes:
+ remote.init()
+ plugin.status("Pushing data from source {} -> {}".format(display_key, remote))
+
+ if self._push_source_blobs(source_proto, source_digest, remote):
+ plugin.info("Pushed data from source {} -> {}".format(display_key, remote))
+ else:
+                plugin.info("Remote ({}) already has all data of source {} cached".format(remote, display_key))
+
+ for remote in index_remotes:
+ remote.init()
+ plugin.status("Pushing source {} -> {}".format(display_key, remote))
+
+ if self._push_source_proto(uri, source_proto, source_digest, remote):
+ plugin.info("Pushed source {} -> {}".format(display_key, remote))
+ pushed = True
+ else:
+ plugin.info("Remote ({}) already has source {} cached".format(remote, display_key))
+
+ return pushed
+
+ def _get_source(self, ref):
+ path = self._source_path(ref)
+ source_proto = source_pb2.Source()
+ try:
+ with open(path, "r+b") as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+ except FileNotFoundError as e:
+ raise SourceCacheError("Attempted to access unavailable source: {}".format(e)) from e
+
+ def _source_path(self, ref):
+ return os.path.join(self._basedir, ref)
+
+ # _push_source_blobs()
+ #
+    # Push the blobs that make up a source to the remote server.
+ #
+ # Args:
+ # source_proto: The source proto whose blobs to push.
+ # source_digest: The digest of the source proto.
+ # remote (CASRemote): The remote to push the blobs to.
+ #
+ # Returns:
+    #     (bool): True if we uploaded anything, False otherwise.
+ #
+ # Raises:
+ # SourceCacheError: If we fail to push blobs (*unless* they're
+ # already there or we run out of space on the server).
+ #
+ def _push_source_blobs(self, source_proto, source_digest, remote):
+ try:
+ # Push source files
+ self.cas._send_directory(remote, source_proto.files)
+ # Push source proto
+ self.cas.send_blobs(remote, [source_digest])
+
+ except CASRemoteError as cas_error:
+ if cas_error.reason != "cache-too-full":
+ raise SourceCacheError("Failed to push source blobs: {}".format(cas_error))
+ return False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError(
+ "Failed to push source blobs with status {}: {}".format(e.code().name, e.details())
+ )
+ return False
+
+ return True
+
+ # _push_source_proto()
+ #
+    # Pushes the source proto to the given remote.
+ #
+ # Args:
+    #     uri (str): The URN identifying the source on the remote.
+    #     source_proto: The source proto.
+ # source_digest: The digest of the source proto.
+ # remote (AssetRemote): Remote to push to
+ #
+ # Returns:
+ # (bool): Whether we pushed the source.
+ #
+ # Raises:
+ # SourceCacheError: If the push fails for any reason except the
+ # source already existing.
+ #
+ def _push_source_proto(self, uri, source_proto, source_digest, remote):
+ try:
+ response = remote.fetch_blob([uri])
+ # Skip push if source is already on the server
+ if response and response.blob_digest == source_digest:
+ return False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError(
+ "Error checking source cache with status {}: {}".format(e.code().name, e.details())
+ )
+
+ referenced_directories = [source_proto.files]
+
+ try:
+ remote.push_blob(
+ [uri], source_digest, references_directories=referenced_directories,
+ )
+ except grpc.RpcError as e:
+ raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
+
+ return True
+
+ # _pull_source_storage():
+ #
+ # Pull source blobs from the given remote.
+ #
+ # Args:
+    #     key (str): The specific key for the source to pull
+    #     source_digest (Digest): The digest of the source proto to fetch
+    #     remote (CASRemote): The remote to pull from
+ #
+ # Returns:
+ # (bool): True if we pulled any blobs.
+ #
+ # Raises:
+ # SourceCacheError: If the pull failed for any reason except the
+ # blobs not existing on the server.
+ #
+ def _pull_source_storage(self, key, source_digest, remote):
+ def __pull_digest(digest):
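+            # Fetch the directory tree for this digest, then pull any
+            # referenced blobs that are still missing from the local CAS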
+ self.cas._fetch_directory(remote, digest)
+ required_blobs = self.cas.required_blobs_for_directory(digest)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ if missing_blobs:
+ self.cas.fetch_blobs(remote, missing_blobs)
+
+ try:
+ # Fetch and parse source proto
+ self.cas.fetch_blobs(remote, [source_digest])
+ source = source_pb2.Source()
+ with open(self.cas.objpath(source_digest), "rb") as f:
+ source.ParseFromString(f.read())
+
+ # Write the source proto to cache
+ source_path = os.path.join(self._basedir, key)
+ with utils.save_file_atomic(source_path, mode="wb") as f:
+ f.write(source.SerializeToString())
+
+ __pull_digest(source.files)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details()))
+ return False
+
+ return True
+
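+    # _push_source():
+    #
+    # Push the files directory of a locally cached source proto to the
+    # given remote.
+    #
+    # Args:
+    #     source_ref (str): The cache key of the source to push
+    #     remote: The remote to push to
+    #
+    # Returns:
+    #     (bool): True if the directory was pushed, False if the remote
+    #             is out of space
+    #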
+ def _push_source(self, source_ref, remote):
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
+
+ try:
+ remote.init()
+ source_proto = self._get_source(source_ref)
+ remote.push_directory([uri], source_proto.files)
+ return True
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
+ return False