author    Jürg Billeter <j@bitron.ch>    2020-07-30 10:37:54 +0200
committer Jürg Billeter <j@bitron.ch>    2020-09-03 14:12:02 +0200
commit    7175dbb76aab99935a4e3f5884bac9451bfb655e (patch)
tree      6881fb1690664b621bf62cead6075c5e44afd942
parent    8d006f8f17ad35f61f69101fe1531564d8d8a688 (diff)
download  buildstream-7175dbb76aab99935a4e3f5884bac9451bfb655e.tar.gz
Add ElementSourcesCache
Sources have been cached in CAS individually, except for sources that transform other sources, which have been cached combined with all previous sources of the element.

This caching structure may be confusing as sources are specified in the element as a list, and this is not a good fit for #1274 where we want to support caching individual sources in a Remote Asset server with a BuildStream-independent URI (especially the `directory` configuration would be problematic).

This replaces the combined caching of 'previous' sources with an element-level source cache, which caches all sources of an element staged together. Sources that don't depend on previous sources are still cached individually.

This also makes it possible to add a list of all element sources to the source proto used by the element-level source cache.
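As a rough illustration of the keying model this implies — a minimal sketch with hypothetical source data and a simplified digest helper, not the real BuildStream API:

import hashlib
import json

def generate_key(value):
    # Simplified stand-in for BuildStream's _cachekey.generate_key()
    return hashlib.sha256(json.dumps(value, sort_keys=True).encode()).hexdigest()

source_unique_keys = [
    {"kind": "local", "path": "files/base"},        # hypothetical source
    {"kind": "patch", "path": "patches/fix.diff"},  # hypothetical source
]

# Each source key is now derived from that source's unique key alone
individual_keys = [generate_key(k) for k in source_unique_keys]

# The element-level key covers the ordered list of all sources, so the
# 'previous sources' relationship is still reflected in a cache key
element_sources_key = generate_key(source_unique_keys)

print(individual_keys)
print(element_sources_key)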
-rw-r--r--  src/buildstream/_context.py             |  12
-rw-r--r--  src/buildstream/_elementsources.py      | 259
-rw-r--r--  src/buildstream/_elementsourcescache.py | 337
-rw-r--r--  src/buildstream/_sourcecache.py         |  12
-rw-r--r--  src/buildstream/_stream.py              |   3
-rw-r--r--  src/buildstream/element.py              |  33
-rw-r--r--  src/buildstream/source.py               |  10
-rw-r--r--  tests/sourcecache/cache.py              |  37
-rw-r--r--  tests/sourcecache/fetch.py              |  14
-rw-r--r--  tests/sourcecache/push.py               |   5
-rw-r--r--  tests/sourcecache/staging.py            |   8
11 files changed, 601 insertions, 129 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 8b559153e..0c2d1a150 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -28,6 +28,7 @@ from ._messenger import Messenger
from ._profile import Topics, PROFILER
from ._platform import Platform
from ._artifactcache import ArtifactCache
+from ._elementsourcescache import ElementSourcesCache
from ._sourcecache import SourceCache
from ._cas import CASCache, CASLogLevel
from .types import _CacheBuildTrees, _PipelineSelection, _SchedulerErrorAction
@@ -171,6 +172,7 @@ class Context:
# Private variables
self._platform = None
self._artifactcache = None
+ self._elementsourcescache = None
self._sourcecache = None
self._projects = []
self._project_overrides = Node.from_dict({})
@@ -193,6 +195,9 @@ class Context:
if self._artifactcache:
self._artifactcache.release_resources()
+ if self._elementsourcescache:
+ self._elementsourcescache.release_resources()
+
if self._sourcecache:
self._sourcecache.release_resources()
@@ -421,6 +426,13 @@ class Context:
return self._artifactcache
@property
+ def elementsourcescache(self):
+ if not self._elementsourcescache:
+ self._elementsourcescache = ElementSourcesCache(self)
+
+ return self._elementsourcescache
+
+ @property
def sourcecache(self):
if not self._sourcecache:
self._sourcecache = SourceCache(self)
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index c1ffc0919..5fc412f6b 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -17,30 +17,48 @@
from typing import TYPE_CHECKING, Iterator
+from . import _cachekey, utils
from ._context import Context
+from ._protos.buildstream.v2 import source_pb2
+from .plugin import Plugin
from .storage._casbaseddirectory import CasBasedDirectory
if TYPE_CHECKING:
from typing import List
+ # pylint: disable=cyclic-import
from .source import Source
+ from ._project import Project
+
+ # pylint: enable=cyclic-import
# An ElementSources object represents the combined sources of an element.
class ElementSources:
- def __init__(self, context: Context):
+ def __init__(self, context: Context, project: "Project", plugin: Plugin):
self._context = context
+ self._project = project
+ self._plugin = plugin
self._sources = [] # type: List[Source]
- self.vdir = None # Directory with staged sources
self._sourcecache = context.sourcecache # Source cache
+ self._elementsourcescache = context.elementsourcescache # Cache of staged element sources
self._is_resolved = False # Whether the source is fully resolved or not
self._cached = None # If the sources are known to be successfully cached in CAS
+ self._cache_key = None # Our cached cache key
+ self._proto = None # The cached Source proto
# the index of the last source in this element that requires previous
# sources for staging
self._last_source_requires_previous_idx = None
+ # get_project():
+ #
+ # Return the project associated with this object
+ #
+ def get_project(self):
+ return self._project
+
# add_source():
#
# Append source to this list of element sources.
@@ -89,28 +107,34 @@ class ElementSources:
return refs
- # stage():
+ # stage_and_cache():
#
- # Stage the element sources to a directory
+ # Stage the element sources to a directory in CAS
+ #
+ def stage_and_cache(self):
+ vdir = self._stage()
+
+ source_proto = source_pb2.Source()
+ source_proto.files.CopyFrom(vdir._get_digest())
+
+ self._elementsourcescache.store_proto(self, source_proto)
+
+ self._proto = source_proto
+ self._cached = True
+
+ # get_files():
+ #
+ # Get a virtual directory for the staged source files
#
# Returns:
- # (:class:`.storage.Directory`): A virtual directory object to stage sources into.
+ # (Directory): The virtual directory object
#
- def stage(self):
+ def get_files(self):
# Assert sources are cached
assert self.cached()
- self.vdir = CasBasedDirectory(self._context.get_cascache())
-
- if self._sources:
- # find last required source
- last_required_previous_idx = self._last_source_requires_previous()
-
- for source in self._sources[last_required_previous_idx:]:
- source_dir = self._sourcecache.export(source)
- self.vdir.import_files(source_dir)
-
- return self.vdir
+ cas = self._context.get_cascache()
+ return CasBasedDirectory(cas, digest=self._proto.files)
# fetch_done()
#
@@ -120,6 +144,8 @@ class ElementSources:
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
#
def fetch_done(self, fetched_original):
+ self._proto = self._elementsourcescache.load_proto(self)
+ assert self._proto
self._cached = True
for source in self._sources:
@@ -137,9 +163,15 @@ class ElementSources:
pushed = False
for source in self.sources():
- if self._sourcecache.push(source):
+ if source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH or source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
+ continue
+
+ if self._sourcecache.contains(source) and self._sourcecache.push(source):
pushed = True
+ if self._elementsourcescache.push(self, self._plugin):
+ pushed = True
+
return pushed
# init_workspace():
@@ -155,35 +187,50 @@ class ElementSources:
# fetch():
#
- # Fetch the element sources.
+ # Fetch the combined or individual element sources.
#
# Raises:
# SourceError: If one of the element sources has an error
#
- def fetch(self, fetch_original=False):
- previous_sources = []
- fetch_needed = False
-
- if self._sources and not fetch_original:
- for source in self._sources:
- if self._sourcecache.contains(source):
- continue
+ def fetch(self):
+ if self.cached():
+ return
- # try and fetch from source cache
- if not source._is_cached() and self._sourcecache.has_fetch_remotes():
- if self._sourcecache.pull(source):
- continue
+ # Try to fetch staged sources from remote source cache
+ if self._elementsourcescache.has_fetch_remotes() and self._elementsourcescache.pull(self, self._plugin):
+ self.fetch_done(False)
+ return
- fetch_needed = True
+ # Otherwise, fetch individual sources
+ self.fetch_sources()
- # We need to fetch original sources
- if fetch_needed or fetch_original:
- for source in self.sources():
+ # fetch_sources():
+ #
+ # Fetch the individual element sources.
+ #
+ # Args:
+ # fetch_original (bool): Always fetch original source
+ #
+ # Raises:
+ # SourceError: If one of the element sources has an error
+ #
+ def fetch_sources(self, *, fetch_original=False):
+ previous_sources = []
+ for source in self._sources:
+ if (
+ # Source depends on previous sources, it cannot be stored in
+ # CAS-based source cache on its own. Fetch original source
+ # if it's not in the plugin-specific cache yet.
if not source._is_cached():
source._fetch(previous_sources)
- previous_sources.append(source)
+ else:
+ self._fetch_source(source)
- self._cache_sources()
+ previous_sources.append(source)
# get_unique_key():
#
@@ -194,6 +241,8 @@ class ElementSources:
# (str, list, dict): A string, list or dictionary as unique identifier
#
def get_unique_key(self):
+ assert self.is_resolved()
+
result = []
for source in self._sources:
@@ -201,6 +250,27 @@ class ElementSources:
return result
+ # get_cache_key():
+ #
+ # Return cache key for the combined element sources
+ #
+ def get_cache_key(self):
+ return self._cache_key
+
+ # get_brief_display_key()
+ #
+ # Returns an abbreviated cache key for display purposes
+ #
+ # Returns:
+ # (str): An abbreviated hex digest cache key for these element sources
+ #
+ def get_brief_display_key(self):
+ context = self._context
+ key = self._cache_key
+
+ length = min(len(key), context.log_key_length)
+ return key[:length]
+
# cached():
#
# Check if the element sources are cached in CAS, generating the source
@@ -213,21 +283,19 @@ class ElementSources:
if self._cached is not None:
return self._cached
- sourcecache = self._sourcecache
+ cas = self._context.get_cascache()
+ elementsourcescache = self._elementsourcescache
- # Go through sources we'll cache generating keys
- for ix, source in enumerate(self._sources):
- if not source._key:
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- source._generate_key(self._sources[:ix])
- else:
- source._generate_key([])
+ source_proto = elementsourcescache.load_proto(self)
+ if not source_proto:
+ self._cached = False
+ return False
- # Check all sources are in source cache
- for source in self._sources:
- if not sourcecache.contains(source):
- return False
+ if not cas.contains_directory(source_proto.files, with_files=True):
+ self._cached = False
+ return False
+ self._proto = source_proto
self._cached = True
return True
@@ -261,11 +329,22 @@ class ElementSources:
# from the workspace, is a component of the element's cache keys.
#
def update_resolved_state(self):
+ if self._is_resolved:
+ # Already resolved
+ return
+
for source in self._sources:
if not source.is_resolved():
- break
- else:
- self._is_resolved = True
+ return
+
+ # Source is resolved, generate its cache key
+ source._generate_key()
+
+ self._is_resolved = True
+
+ # Also generate the cache key for the combined element sources
+ unique_key = self.get_unique_key()
+ self._cache_key = _cachekey.generate_key(unique_key)
# preflight():
#
@@ -286,34 +365,58 @@ class ElementSources:
for source in self.sources():
source._preflight()
- # _cache_sources():
+ # _fetch_source():
#
- # Caches the sources into the local CAS
+ # Fetch a single source into the local CAS-based source cache
#
- def _cache_sources(self):
- if self._sources and not self.cached():
- last_requires_previous = 0
- # commit all other sources by themselves
- for idx, source in enumerate(self._sources):
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- self._sourcecache.commit(source, self._sources[last_requires_previous:idx])
- last_requires_previous = idx
- else:
- self._sourcecache.commit(source, [])
+ # Args:
+ # source (Source): The source to fetch
+ #
+ def _fetch_source(self, source):
+ # Cannot store a source in the CAS-based source cache on its own
+ # if the source depends on previous sources.
+ assert not source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH and not source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE
+
+ if self._sourcecache.contains(source):
+ # Already cached
+ return
+
+ cached_original = source._is_cached()
+ if not cached_original:
+ if self._sourcecache.has_fetch_remotes() and self._sourcecache.pull(source):
+ # Successfully fetched individual source from remote source cache
+ return
- # _last_source_requires_previous
+ # Unable to fetch source from remote source cache, fall back to
+ # fetching the original source.
+ source._fetch([])
+
+ # Stage original source into the local CAS-based source cache
+ self._sourcecache.commit(source)
+
+ # _stage():
#
- # This is the last source that requires previous sources to be cached.
- # Sources listed after this will be cached separately.
+ # Stage the element sources
#
- # Returns:
- # (int): index of last source that requires previous sources
- #
- def _last_source_requires_previous(self):
- if self._last_source_requires_previous_idx is None:
- last_requires_previous = 0
- for idx, source in enumerate(self._sources):
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- last_requires_previous = idx
- self._last_source_requires_previous_idx = last_requires_previous
- return self._last_source_requires_previous_idx
+ def _stage(self):
+ vdir = CasBasedDirectory(self._context.get_cascache())
+
+ for source in self._sources:
+ if source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH or source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
+ if source.BST_STAGE_VIRTUAL_DIRECTORY:
+ source._stage(vdir)
+ else:
+ with utils._tempdir(dir=self._context.tmpdir, prefix="staging-temp") as tmpdir:
+ # Stage previous sources
+ vdir.export_files(tmpdir)
+
+ source._stage(tmpdir)
+
+ # Capture modified tree
+ vdir._clear()
+ vdir.import_files(tmpdir)
+ else:
+ source_dir = self._sourcecache.export(source)
+ vdir.import_files(source_dir)
+
+ return vdir
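In rough terms, the new _stage() walks the source list once, importing independently cached sources from the local source cache and letting transforming sources (patch-like plugins) operate on the already staged result. A plain-Python sketch with stand-in classes, not the real Source or virtual directory APIs:

import os
import tempfile

class CachedSource:
    # Stand-in for a source exported from the CAS-based source cache
    requires_previous = False

    def export_to(self, directory):
        with open(os.path.join(directory, "base.txt"), "w") as f:
            f.write("original content\n")

class PatchLikeSource:
    # Stand-in for a source that transforms previously staged sources
    requires_previous = True

    def stage(self, directory):
        with open(os.path.join(directory, "base.txt"), "a") as f:
            f.write("patched\n")

def stage_all(sources, staging_dir):
    for source in sources:
        if source.requires_previous:
            # The real code exports the virtual directory to a tempdir,
            # stages the source there, then re-imports the modified tree
            source.stage(staging_dir)
        else:
            source.export_to(staging_dir)

with tempfile.TemporaryDirectory() as staging:
    stage_all([CachedSource(), PatchLikeSource()], staging)
    with open(os.path.join(staging, "base.txt")) as f:
        print(f.read())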
diff --git a/src/buildstream/_elementsourcescache.py b/src/buildstream/_elementsourcescache.py
new file mode 100644
index 000000000..84e7633e5
--- /dev/null
+++ b/src/buildstream/_elementsourcescache.py
@@ -0,0 +1,337 @@
+#
+# Copyright (C) 2019-2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+import os
+import grpc
+
+from ._cas.casremote import BlobNotFound
+from ._assetcache import AssetCache
+from ._exceptions import AssetCacheError, CASError, CASRemoteError, SourceCacheError
+from . import utils
+from ._protos.buildstream.v2 import source_pb2
+
+REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"
+
+
+# Class that keeps config of remotes and deals with caching of staged element sources.
+#
+# Args:
+# context (Context): The Buildstream context
+#
+class ElementSourcesCache(AssetCache):
+
+ spec_name = "source_cache_specs"
+ config_node_name = "source-caches"
+
+ def __init__(self, context):
+ super().__init__(context)
+
+ self._basedir = os.path.join(context.cachedir, "elementsources")
+ os.makedirs(self._basedir, exist_ok=True)
+
+ # load_proto():
+ #
+ # Load source proto from local cache.
+ #
+ # Args:
+ # sources (ElementSources): The sources whose proto we want to load
+ #
+ def load_proto(self, sources):
+ ref = sources.get_cache_key()
+ path = self._source_path(ref)
+
+ if not os.path.exists(path):
+ return None
+
+ source_proto = source_pb2.Source()
+ with open(path, "r+b") as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+
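+ # store_proto():
+ #
+ # Store source proto in local cache.
+ #
+ # Args:
+ # sources (ElementSources): The sources whose proto we want to store
+ # proto (source_pb2.Source): The source proto to store
+ #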
+ def store_proto(self, sources, proto):
+ ref = sources.get_cache_key()
+ path = self._source_path(ref)
+
+ with utils.save_file_atomic(path, "w+b") as f:
+ f.write(proto.SerializeToString())
+
+ # pull():
+ #
+ # Attempts to pull sources from configured remote source caches.
+ #
+ # Args:
+ # sources (ElementSources): The sources we want to fetch
+ # plugin (Plugin): The plugin to use for status and warning messages
+ #
+ # Returns:
+ # (bool): True if pull successful, False if not
+ #
+ def pull(self, sources, plugin):
+ project = sources.get_project()
+
+ ref = sources.get_cache_key()
+ display_key = sources.get_brief_display_key()
+
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)
+
+ source_digest = None
+ errors = []
+ # Start by pulling our source proto, so that we know which
+ # blobs to pull
+ for remote in self._index_remotes[project]:
+ remote.init()
+ try:
+ plugin.status("Pulling source {} <- {}".format(display_key, remote))
+ response = remote.fetch_blob([uri])
+ if response:
+ source_digest = response.blob_digest
+ break
+
+ plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key))
+ except AssetCacheError as e:
+ plugin.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors and not source_digest:
+ raise SourceCacheError(
+ "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors)
+ )
+
+ # If we don't have a source proto, we can't pull source files
+ if not source_digest:
+ return False
+
+ errors = []
+ for remote in self._storage_remotes[project]:
+ remote.init()
+ try:
+ plugin.status("Pulling data for source {} <- {}".format(display_key, remote))
+
+ if self._pull_source_storage(ref, source_digest, remote):
+ plugin.info("Pulled source {} <- {}".format(display_key, remote))
+ return True
+
+ plugin.info("Remote ({}) does not have source {} cached".format(remote, display_key))
+ except BlobNotFound as e:
+ # Not all blobs are available on this remote
+ plugin.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
+ continue
+ except CASError as e:
+ plugin.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors:
+ raise SourceCacheError(
+ "Failed to pull source {}".format(display_key), detail="\n".join(str(e) for e in errors)
+ )
+
+ return False
+
+ # push():
+ #
+ # Push sources to remote repository.
+ #
+ # Args:
+ # sources (ElementSources): The sources to be pushed
+ # plugin (Plugin): The plugin to use for status and warning messages
+ #
+ # Returns:
+ # (bool): True if any remote was updated, False if no pushes were required
+ #
+ # Raises:
+ # (SourceCacheError): if there was an error
+ #
+ def push(self, sources, plugin):
+ project = sources.get_project()
+
+ ref = sources.get_cache_key()
+ display_key = sources.get_brief_display_key()
+
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)
+
+ index_remotes = [r for r in self._index_remotes[project] if r.push]
+ storage_remotes = [r for r in self._storage_remotes[project] if r.push]
+
+ source_proto = self.load_proto(sources)
+ source_digest = self.cas.add_object(buffer=source_proto.SerializeToString())
+
+ pushed = False
+
+ # First push our files to all storage remotes, so that they
+ # can perform file checks on their end
+ for remote in storage_remotes:
+ remote.init()
+ plugin.status("Pushing data from source {} -> {}".format(display_key, remote))
+
+ if self._push_source_blobs(source_proto, source_digest, remote):
+ plugin.info("Pushed data from source {} -> {}".format(display_key, remote))
+ else:
+ plugin.info("Remote ({}) already has all data of source {} cached".format(remote, display_key()))
+
+ for remote in index_remotes:
+ remote.init()
+ plugin.status("Pushing source {} -> {}".format(display_key, remote))
+
+ if self._push_source_proto(uri, source_proto, source_digest, remote):
+ plugin.info("Pushed source {} -> {}".format(display_key, remote))
+ pushed = True
+ else:
+ plugin.info("Remote ({}) already has source {} cached".format(remote, display_key))
+
+ return pushed
+
+ def _get_source(self, ref):
+ path = self._source_path(ref)
+ source_proto = source_pb2.Source()
+ try:
+ with open(path, "r+b") as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+ except FileNotFoundError as e:
+ raise SourceCacheError("Attempted to access unavailable source: {}".format(e)) from e
+
+ def _source_path(self, ref):
+ return os.path.join(self._basedir, ref)
+
+ # _push_source_blobs()
+ #
+ # Push the blobs that make up a source to the remote server.
+ #
+ # Args:
+ # source_proto: The source proto whose blobs to push.
+ # source_digest: The digest of the source proto.
+ # remote (CASRemote): The remote to push the blobs to.
+ #
+ # Returns:
+ # (bool) - True if we uploaded anything, False otherwise.
+ #
+ # Raises:
+ # SourceCacheError: If we fail to push blobs (*unless* they're
+ # already there or we run out of space on the server).
+ #
+ def _push_source_blobs(self, source_proto, source_digest, remote):
+ try:
+ # Push source files
+ self.cas._send_directory(remote, source_proto.files)
+ # Push source proto
+ self.cas.send_blobs(remote, [source_digest])
+
+ except CASRemoteError as cas_error:
+ if cas_error.reason != "cache-too-full":
+ raise SourceCacheError("Failed to push source blobs: {}".format(cas_error))
+ return False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError(
+ "Failed to push source blobs with status {}: {}".format(e.code().name, e.details())
+ )
+ return False
+
+ return True
+
+ # _push_source_proto()
+ #
+ # Pushes the source proto to remote.
+ #
+ # Args:
+ # source_proto: The source proto.
+ # source_digest: The digest of the source proto.
+ # remote (AssetRemote): Remote to push to
+ #
+ # Returns:
+ # (bool): Whether we pushed the source.
+ #
+ # Raises:
+ # SourceCacheError: If the push fails for any reason except the
+ # source already existing.
+ #
+ def _push_source_proto(self, uri, source_proto, source_digest, remote):
+ try:
+ response = remote.fetch_blob([uri])
+ # Skip push if source is already on the server
+ if response and response.blob_digest == source_digest:
+ return False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError(
+ "Error checking source cache with status {}: {}".format(e.code().name, e.details())
+ )
+
+ referenced_directories = [source_proto.files]
+
+ try:
+ remote.push_blob(
+ [uri], source_digest, references_directories=referenced_directories,
+ )
+ except grpc.RpcError as e:
+ raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
+
+ return True
+
+ # _pull_source_storage():
+ #
+ # Pull source blobs from the given remote.
+ #
+ # Args:
+ # key (str): The specific key for the source to pull
+ # source_digest (Digest): The digest of the source proto
+ # remote (CASRemote): remote to pull from
+ #
+ # Returns:
+ # (bool): True if we pulled any blobs.
+ #
+ # Raises:
+ # SourceCacheError: If the pull failed for any reason except the
+ # blobs not existing on the server.
+ #
+ def _pull_source_storage(self, key, source_digest, remote):
+ def __pull_digest(digest):
+ self.cas._fetch_directory(remote, digest)
+ required_blobs = self.cas.required_blobs_for_directory(digest)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ if missing_blobs:
+ self.cas.fetch_blobs(remote, missing_blobs)
+
+ try:
+ # Fetch and parse source proto
+ self.cas.fetch_blobs(remote, [source_digest])
+ source = source_pb2.Source()
+ with open(self.cas.objpath(source_digest), "rb") as f:
+ source.ParseFromString(f.read())
+
+ # Write the source proto to cache
+ source_path = os.path.join(self._basedir, key)
+ with utils.save_file_atomic(source_path, mode="wb") as f:
+ f.write(source.SerializeToString())
+
+ __pull_digest(source.files)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError("Failed to pull source with status {}: {}".format(e.code().name, e.details()))
+ return False
+
+ return True
+
+ def _push_source(self, source_ref, remote):
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
+
+ try:
+ remote.init()
+ source_proto = self._get_source(source_ref)
+ remote.push_directory([uri], source_proto.files)
+ return True
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
+ return False
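Each element-sources entry is keyed on a Remote Asset server by a URN derived from the element-level cache key; a small sketch with a hypothetical key:

REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"

# Hypothetical element-level source cache key
ref = "4e1f0c2b9d8a7e6f5a4b3c2d1e0f9a8b"
uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(ref)
print(uri)  # urn:fdc:buildstream.build:2020:source:4e1f0c2b9d8a7e6f5a4b3c2d1e0f9a8b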
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index fd75be34d..76c22efbd 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -70,25 +70,17 @@ class SourceCache(AssetCache):
# commit()
#
- # Given a source along with previous sources, it stages and commits these
- # to the local CAS. This is done due to some types of sources being
- # dependent on previous sources, such as the patch source.
+ # Given a source, it stages and commits it to the local CAS.
#
# Args:
# source: last source
- # previous_sources: rest of the sources.
- def commit(self, source, previous_sources):
+ def commit(self, source):
ref = source._get_source_name()
- # Use tmpdir for now
vdir = CasBasedDirectory(self.cas)
- for previous_source in previous_sources:
- vdir.import_files(self.export(previous_source))
if not source.BST_STAGE_VIRTUAL_DIRECTORY:
with utils._tempdir(dir=self.context.tmpdir, prefix="staging-temp") as tmpdir:
- if not vdir.is_empty():
- vdir.export_files(tmpdir)
source._stage(tmpdir)
vdir.import_files(tmpdir, can_link=True)
else:
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index bd203b0a0..fe641d03c 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -83,6 +83,7 @@ class Stream:
#
self._context = context
self._artifacts = None
+ self._elementsourcescache = None
self._sourcecache = None
self._project = None
self._pipeline = None
@@ -104,6 +105,7 @@ class Stream:
#
def init(self):
self._artifacts = self._context.artifactcache
+ self._elementsourcescache = self._context.elementsourcescache
self._sourcecache = self._context.sourcecache
# cleanup()
@@ -1176,6 +1178,7 @@ class Stream:
# Connect to remote caches, this needs to be done before resolving element state
self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_url)
+ self._elementsourcescache.setup_remotes(use_config=use_source_config, remote_url=source_url)
self._sourcecache.setup_remotes(use_config=use_source_config, remote_url=source_url)
# _load_tracking()
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 4b53aa3dd..cfc21f42b 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -240,7 +240,7 @@ class Element(Plugin):
self.__ready_for_runtime = False # Whether the element and its runtime dependencies have cache keys
self.__ready_for_runtime_and_cached = False # Whether all runtime deps are cached, as well as the element
self.__cached_remotely = None # Whether the element is cached remotely
- self.__sources = ElementSources(context) # The element sources
+ self.__sources = ElementSources(context, project, self) # The element sources
self.__weak_cache_key = None # Our cached weak cache key
self.__strict_cache_key = None # Our cached cache key for strict builds
self.__artifacts = context.artifactcache # Artifact cache
@@ -1317,12 +1317,7 @@ class Element(Plugin):
# No cached buildtree, stage source from source cache
else:
- try:
- staged_sources = self.__sources.stage()
- except (SourceCacheError, VirtualDirectoryError) as e:
- raise ElementError(
- "Error trying to stage sources for {}: {}".format(self.name, e), reason="stage-sources-fail"
- )
+ staged_sources = self.__sources.get_files()
# incremental builds should merge the source into the last artifact before staging
last_build_artifact = self.__get_last_build_artifact()
@@ -1628,7 +1623,7 @@ class Element(Plugin):
# if the directory could not be found.
pass
- sourcesvdir = self.__sources.vdir
+ sourcesvdir = self.__sources.get_files()
if collect is not None:
try:
@@ -1747,7 +1742,7 @@ class Element(Plugin):
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
- return not (self.__sourcecache.has_push_remotes(plugin=self) and self._has_all_sources_in_source_cache())
+ return not (self.__sourcecache.has_push_remotes(plugin=self) and self._cached_sources())
def _source_push(self):
return self.__sources.push()
@@ -1989,7 +1984,19 @@ class Element(Plugin):
# SourceError: If one of the element sources has an error
#
def _fetch(self, fetch_original=False):
- self.__sources.fetch(fetch_original=fetch_original)
+ if fetch_original:
+ self.__sources.fetch_sources(fetch_original=True)
+
+ self.__sources.fetch()
+
+ if not self.__sources.cached():
+ try:
+ # Stage all element sources into CAS
+ self.__sources.stage_and_cache()
+ except (SourceCacheError, VirtualDirectoryError) as e:
+ raise ElementError(
+ "Error trying to stage sources for {}: {}".format(self.name, e), reason="stage-sources-fail"
+ )
# _calculate_cache_key():
#
@@ -2032,14 +2039,14 @@ class Element(Plugin):
return _cachekey.generate_key(cache_key_dict)
- # _has_all_sources_in_source_cache()
+ # _cached_sources()
#
- # Get whether all sources of the element are cached in CAS
+ # Get whether the staged element sources are cached in CAS
#
# Returns:
# (bool): True if the element sources are in CAS
#
- def _has_all_sources_in_source_cache(self):
+ def _cached_sources(self):
return self.__sources.cached()
# _has_all_sources_resolved()
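The element-side fetch flow after this change condenses to the sketch below; method names mirror the diff, and the sources parameter stands in for an ElementSources instance, so this is a summary rather than the actual implementation:

def element_fetch(sources, fetch_original=False):
    if fetch_original:
        # Explicitly fetch the original upstream sources first
        sources.fetch_sources(fetch_original=True)

    # Prefer pulling the combined staged sources from a remote cache;
    # fall back to fetching individual sources on a miss
    sources.fetch()

    if not sources.cached():
        # Stage all sources together and record the resulting proto
        sources.stage_and_cache()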
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index ea77a6537..d7e6021bc 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -1150,14 +1150,8 @@ class Source(Plugin):
else:
return None
- def _generate_key(self, previous_sources):
- keys = [self._get_unique_key()]
-
- if self.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- for previous_source in previous_sources:
- keys.append(previous_source._get_unique_key())
-
- self.__key = generate_key(keys)
+ def _generate_key(self):
+ self.__key = generate_key(self._get_unique_key())
@property
def _key(self):
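One practical consequence of the simplified _generate_key(): editing an earlier source no longer changes a later source's individual key, only the element-level key. A sketch with simplified hashing and hypothetical unique keys:

import hashlib
import json

def key(value):
    # Simplified digest, standing in for buildstream.source.generate_key()
    return hashlib.sha256(json.dumps(value, sort_keys=True).encode()).hexdigest()

patch = {"kind": "patch", "path": "fix.diff"}          # hypothetical
local_v1 = {"kind": "local", "path": "files", "v": 1}  # hypothetical
local_v2 = {"kind": "local", "path": "files", "v": 2}  # edited version

# Old scheme: the patch key folded in previous sources, so editing the
# local source changed the patch source's key as well
assert key([local_v1, patch]) != key([local_v2, patch])

# New scheme: the patch source's own key is unaffected by the edit;
# only the combined element-level key changes
assert key(patch) == key(patch)
assert key([key(local_v1), key(patch)]) != key([key(local_v2), key(patch)])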
diff --git a/tests/sourcecache/cache.py b/tests/sourcecache/cache.py
index bbc3d8329..34009fc65 100644
--- a/tests/sourcecache/cache.py
+++ b/tests/sourcecache/cache.py
@@ -37,12 +37,15 @@ def test_patch_sources_cached_1(cli, datafiles):
res = cli.run(project=project_dir, args=["build", "source-with-patches-1.bst"])
res.assert_success()
- # as we have a local, patch, local config, the first local and patch should
- # be cached together, and the last local on it's own
source_protos = os.path.join(project_dir, "cache", "source_protos")
+ elementsources_protos = os.path.join(project_dir, "cache", "elementsources")
- assert len(os.listdir(os.path.join(source_protos, "patch"))) == 1
+ # The two local sources can be cached individually,
+ # the patch source cannot be cached on its own
assert len(os.listdir(os.path.join(source_protos, "local"))) == 2
+ assert not os.path.exists(os.path.join(source_protos, "patch"))
+
+ assert len(os.listdir(elementsources_protos)) == 1
@pytest.mark.datafiles(DATA_DIR)
@@ -52,10 +55,15 @@ def test_patch_sources_cached_2(cli, datafiles):
res = cli.run(project=project_dir, args=["build", "source-with-patches-2.bst"])
res.assert_success()
- # As everything is before the patch it should all be cached together
source_protos = os.path.join(project_dir, "cache", "source_protos")
+ elementsources_protos = os.path.join(project_dir, "cache", "elementsources")
+
+ # The three local sources can be cached individually,
+ # the patch source cannot be cached on its own
+ assert len(os.listdir(os.path.join(source_protos, "local"))) == 3
+ assert not os.path.exists(os.path.join(source_protos, "patch"))
- assert len(os.listdir(os.path.join(source_protos, "patch"))) == 1
+ assert len(os.listdir(elementsources_protos)) == 1
@pytest.mark.datafiles(DATA_DIR)
@@ -67,9 +75,12 @@ def test_sources_without_patch(cli, datafiles):
# No patches so everything should be cached separately
source_protos = os.path.join(project_dir, "cache", "source_protos")
+ elementsources_protos = os.path.join(project_dir, "cache", "elementsources")
assert len(os.listdir(os.path.join(source_protos, "local"))) == 3
+ assert len(os.listdir(elementsources_protos)) == 1
+
@pytest.mark.datafiles(DATA_DIR)
def test_source_cache_key(cli, datafiles):
@@ -103,9 +114,17 @@ def test_source_cache_key(cli, datafiles):
res = cli.run(project=project_dir, args=["build", element_name])
res.assert_success()
- # Should have one source ref
+ # Should have source refs for the two remote sources
+ remote_protos = os.path.join(project_dir, "cache", "source_protos", "remote")
+ assert len(os.listdir(remote_protos)) == 2
+ # Should not have any source refs for the patch source
+ # as that is a transformation of the previous sources,
+ # not cacheable on its own
patch_protos = os.path.join(project_dir, "cache", "source_protos", "patch")
- assert len(os.listdir(patch_protos)) == 1
+ assert not os.path.exists(patch_protos)
+ # Should have one element sources ref
+ elementsources_protos = os.path.join(project_dir, "cache", "elementsources")
+ assert len(os.listdir(elementsources_protos)) == 1
# modify hello-patch file and check tracking updates refs
with open(os.path.join(file_path, "dev-files", "usr", "include", "pony.h"), "a") as f:
@@ -118,5 +137,5 @@ def test_source_cache_key(cli, datafiles):
res = cli.run(project=project_dir, args=["source", "fetch", element_name])
res.assert_success()
- # We should have a new source ref
- assert len(os.listdir(patch_protos)) == 2
+ # We should have a new element sources ref
+ assert len(os.listdir(elementsources_protos)) == 2
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index ac8c6258f..76f5508f9 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,7 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
source = list(element.sources())[0]
assert not share.get_source_proto(source._get_source_name())
@@ -117,7 +117,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element._initialize_state()
# check that we have the source in the cas now and it's not fetched
- assert element._has_all_sources_in_source_cache()
+ assert element._cached_sources()
assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
@@ -135,7 +135,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
source = list(element.sources())[0]
assert not share.get_source_proto(source._get_source_name())
@@ -151,7 +151,9 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
assert ("SUCCESS Fetching from {}".format(repo.source_config(ref=ref)["url"])) in res.stderr
# Check that the source is both in the source dir and the local CAS
- assert element._has_all_sources_in_source_cache()
+ element = project.load_elements([element_name])[0]
+ element._initialize_state()
+ assert element._cached_sources()
@pytest.mark.datafiles(DATA_DIR)
@@ -167,7 +169,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
source = list(element.sources())[0]
# remove files and check that it doesn't build
@@ -199,7 +201,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
source = list(element.sources())[0]
assert not share.get_artifact_proto(source._get_source_name())
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index b1aa8a375..25a4309b8 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -85,7 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
source = list(element.sources())[0]
# check we don't have it in the current cache
@@ -135,7 +135,7 @@ def test_source_push(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
source = list(element.sources())[0]
# check we don't have it in the current cache
@@ -268,6 +268,7 @@ def test_push_missing_source_after_build(cli, tmpdir, datafiles):
res.assert_success()
# Delete source but keep artifact in cache
+ shutil.rmtree(os.path.join(cache_dir, "elementsources"))
shutil.rmtree(os.path.join(cache_dir, "source_protos"))
with create_artifact_share(os.path.join(str(tmpdir), "sourceshare")) as share:
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index bfde1b436..0f2f05891 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -66,7 +66,7 @@ def test_source_staged(tmpdir, cli, datafiles):
element = project.load_elements(["import-bin.bst"])[0]
element._initialize_state()
source = list(element.sources())[0]
- assert element._has_all_sources_in_source_cache()
+ assert element._cached_sources()
assert sourcecache.contains(source)
# Extract the file and check it's the same as the one we imported
@@ -101,7 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
element = project.load_elements(["import-dev.bst"])[0]
element._initialize_state()
source = list(element.sources())[0]
- assert element._has_all_sources_in_source_cache()
+ assert element._cached_sources()
# check that the directory structures are identical
digest = sourcecache.export(source)._get_digest()
@@ -120,6 +120,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
cachedir = os.path.join(str(tmpdir), "cache")
element_path = "elements"
source_protos = os.path.join(str(tmpdir), "cache", "source_protos")
+ elementsources = os.path.join(str(tmpdir), "cache", "elementsources")
source_dir = os.path.join(str(tmpdir), "cache", "sources")
cli.configure({"cachedir": cachedir})
@@ -135,7 +136,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
element._initialize_state()
# check consistency of the source
- assert not element._has_all_sources_in_source_cache()
+ assert not element._cached_sources()
res = cli.run(project=project_dir, args=["build", "target.bst"])
res.assert_success()
@@ -164,6 +165,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
# Now remove the source refs and check the state
shutil.rmtree(source_protos)
+ shutil.rmtree(elementsources)
cli.remove_artifact_from_cache(project_dir, "target.bst")
states = cli.get_element_states(project_dir, ["target.bst"])
assert states["target.bst"] == "fetch needed"