38 files changed, 1195 insertions, 362 deletions
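This commit factors remote-cache setup into a new BaseCache and introduces a SourceCache with a parallel `source-caches` configuration block alongside `artifacts`. A minimal sketch of the new user-facing configuration, written with _yaml.dump() the way the tests at the bottom of this diff do (the URL is an illustrative assumption, not part of the commit):

    # Hypothetical project.conf fragment enabling a source cache remote.
    # It is parsed by SourceCache.specs_from_config_node() via BaseCache.
    from buildstream import _yaml

    project_conf = {
        'name': 'example',
        'element-path': 'elements',
        'source-caches': {
            'url': 'https://cache.example.com:11001',  # assumed URL
            'push': True,
        },
    }
    _yaml.dump(project_conf, 'project.conf')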
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index 7da2d959c..49f07cb50 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -17,17 +17,11 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> -import multiprocessing -import os -from collections.abc import Mapping - +from ._basecache import BaseCache from .types import _KeyStrength from ._exceptions import ArtifactError, CASError -from ._message import Message, MessageType -from . import utils -from . import _yaml -from ._cas import CASRemote, CASRemoteSpec, CASCacheUsage +from ._cas import CASRemoteSpec from .storage._casbaseddirectory import CasBasedDirectory @@ -51,89 +45,20 @@ class ArtifactCacheSpec(CASRemoteSpec): # Args: # context (Context): The BuildStream context # -class ArtifactCache(): - def __init__(self, context): - self.context = context +class ArtifactCache(BaseCache): - self.cas = context.get_cascache() - self.casquota = context.get_casquota() - self.casquota._calculate_cache_quota() + spec_class = ArtifactCacheSpec + spec_name = "artifact_cache_specs" + spec_error = ArtifactError + config_node_name = "artifacts" - self.global_remote_specs = [] - self.project_remote_specs = {} + def __init__(self, context): + super().__init__(context) self._required_elements = set() # The elements required for this session - self._remotes_setup = False # Check to prevent double-setup of remotes - - # Per-project list of _CASRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - # setup_remotes(): - # - # Sets up which remotes to use - # - # Args: - # use_config (bool): Whether to use project configuration - # remote_url (str): Remote artifact cache URL - # - # This requires that all of the projects which are to be processed in the session - # have already been loaded and are observable in the Context. - # - def setup_remotes(self, *, use_config=False, remote_url=None): - - # Ensure we do not double-initialise since this can be expensive - assert not self._remotes_setup - self._remotes_setup = True - - # Initialize remote artifact caches. We allow the commandline to override - # the user config in some cases (for example `bst artifact push --remote=...`). - has_remote_caches = False - if remote_url: - self._set_remotes([ArtifactCacheSpec(remote_url, push=True)]) - has_remote_caches = True - if use_config: - for project in self.context.get_projects(): - artifact_caches = _configured_remote_artifact_cache_specs(self.context, project) - if artifact_caches: # artifact_caches is a list of ArtifactCacheSpec instances - self._set_remotes(artifact_caches, project=project) - has_remote_caches = True - if has_remote_caches: - self._initialize_remotes() - - # specs_from_config_node() - # - # Parses the configuration of remote artifact caches from a config block. - # - # Args: - # config_node (dict): The config block, which may contain the 'artifacts' key - # basedir (str): The base directory for relative paths - # - # Returns: - # A list of ArtifactCacheSpec instances. - # - # Raises: - # LoadError, if the config block contains invalid keys. 
- # - @staticmethod - def specs_from_config_node(config_node, basedir=None): - cache_specs = [] - - artifacts = config_node.get('artifacts', []) - if isinstance(artifacts, Mapping): - cache_specs.append(ArtifactCacheSpec._new_from_config_node(artifacts, basedir)) - elif isinstance(artifacts, list): - for spec_node in artifacts: - cache_specs.append(ArtifactCacheSpec._new_from_config_node(spec_node, basedir)) - else: - provenance = _yaml.node_get_provenance(config_node, key='artifacts') - raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA, - "%s: 'artifacts' must be a single 'url:' mapping, or a list of mappings" % - (str(provenance))) - return cache_specs + self.casquota.add_ref_callbacks(self.required_artifacts()) + self.casquota.add_remove_callbacks((lambda x: not x.startswith('@'), self.remove)) # mark_required_elements(): # @@ -176,117 +101,15 @@ class ArtifactCache(): except CASError: pass - # clean(): - # - # Clean the artifact cache as much as possible. - # - # Args: - # progress (callable): A callback to call when a ref is removed - # - # Returns: - # (int): The size of the cache after having cleaned up - # - def clean(self, progress=None): - artifacts = self.list_artifacts() - context = self.context - - # Some accumulative statistics - removed_ref_count = 0 - space_saved = 0 - - # Start off with an announcement with as much info as possible - volume_size, volume_avail = self.casquota._get_cache_volume_size() - self._message(MessageType.STATUS, "Starting cache cleanup", - detail=("Elements required by the current build plan: {}\n" + - "User specified quota: {} ({})\n" + - "Cache usage: {}\n" + - "Cache volume: {} total, {} available") - .format(len(self._required_elements), - context.config_cache_quota, - utils._pretty_size(self.casquota._cache_quota, dec_places=2), - utils._pretty_size(self.casquota.get_cache_size(), dec_places=2), - utils._pretty_size(volume_size, dec_places=2), - utils._pretty_size(volume_avail, dec_places=2))) - + def required_artifacts(self): # Build a set of the cache keys which are required # based on the required elements at cleanup time # # We lock both strong and weak keys - deleting one but not the # other won't save space, but would be a user inconvenience. - required_artifacts = set() for element in self._required_elements: - required_artifacts.update([ - element._get_cache_key(strength=_KeyStrength.STRONG), - element._get_cache_key(strength=_KeyStrength.WEAK) - ]) - - # Do a real computation of the cache size once, just in case - self.casquota.compute_cache_size() - usage = CASCacheUsage(self.casquota) - self._message(MessageType.STATUS, "Cache usage recomputed: {}".format(usage)) - - while self.casquota.get_cache_size() >= self.casquota._cache_lower_threshold: - try: - to_remove = artifacts.pop(0) - except IndexError: - # If too many artifacts are required, and we therefore - # can't remove them, we have to abort the build. - # - # FIXME: Asking the user what to do may be neater - # - default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], - 'buildstream.conf') - detail = ("Aborted after removing {} refs and saving {} disk space.\n" - "The remaining {} in the cache is required by the {} elements in your build plan\n\n" - "There is not enough space to complete the build.\n" - "Please increase the cache-quota in {} and/or make more disk space." 
- .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.casquota.get_cache_size(), dec_places=2), - len(self._required_elements), - (context.config_origin or default_conf))) - - if self.full(): - raise ArtifactError("Cache too full. Aborting.", - detail=detail, - reason="cache-too-full") - else: - break - - key = to_remove.rpartition('/')[2] - if key not in required_artifacts: - - # Remove the actual artifact, if it's not required. - size = self.remove(to_remove) - - removed_ref_count += 1 - space_saved += size - - self._message(MessageType.STATUS, - "Freed {: <7} {}".format( - utils._pretty_size(size, dec_places=2), - to_remove)) - - # Remove the size from the removed size - self.casquota.set_cache_size(self.casquota._cache_size - size) - - # User callback - # - # Currently this process is fairly slow, but we should - # think about throttling this progress() callback if this - # becomes too intense. - if progress: - progress() - - # Informational message about the side effects of the cleanup - self._message(MessageType.INFO, "Cleanup completed", - detail=("Removed {} refs and saving {} disk space.\n" + - "Cache usage is now: {}") - .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.casquota.get_cache_size(), dec_places=2))) - - return self.casquota.get_cache_size() + yield element._get_cache_key(strength=_KeyStrength.STRONG) + yield element._get_cache_key(strength=_KeyStrength.WEAK) def full(self): return self.casquota.full() @@ -312,56 +135,6 @@ class ArtifactCache(): def preflight(self): self.cas.preflight() - # initialize_remotes(): - # - # This will contact each remote cache. - # - # Args: - # on_failure (callable): Called if we fail to contact one of the caches. - # - def initialize_remotes(self, *, on_failure=None): - remote_specs = list(self.global_remote_specs) - - for project in self.project_remote_specs: - remote_specs += self.project_remote_specs[project] - - remote_specs = list(utils._deduplicate(remote_specs)) - - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - - error = CASRemote.check_remote(remote_spec, q) - - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) - else: - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True - - remotes[remote_spec.url] = CASRemote(remote_spec) - - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) - - project_remotes = [] - - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. 
- if remote_spec.url not in remotes: - continue - - remote = remotes[remote_spec.url] - project_remotes.append(remote) - - self._remotes[project] = project_remotes - # contains(): # # Check whether the artifact for the specified Element is already available @@ -405,7 +178,9 @@ class ArtifactCache(): # ([str]) - A list of artifact names as generated in LRU order # def list_artifacts(self, *, glob=None): - return self.cas.list_refs(glob=glob) + return list(filter( + lambda x: not x.startswith('@'), + self.cas.list_refs(glob=glob))) # remove(): # @@ -704,61 +479,3 @@ class ArtifactCache(): cache_id = self.cas.resolve_ref(ref, update_mtime=True) vdir = CasBasedDirectory(self.cas, digest=cache_id).descend('logs') return vdir - - ################################################ - # Local Private Methods # - ################################################ - - # _message() - # - # Local message propagator - # - def _message(self, message_type, message, **kwargs): - args = dict(kwargs) - self.context.message( - Message(None, message_type, message, **args)) - - # _set_remotes(): - # - # Set the list of remote caches. If project is None, the global list of - # remote caches will be set, which is used by all projects. If a project is - # specified, the per-project list of remote caches will be set. - # - # Args: - # remote_specs (list): List of ArtifactCacheSpec instances, in priority order. - # project (Project): The Project instance for project-specific remotes - def _set_remotes(self, remote_specs, *, project=None): - if project is None: - # global remotes - self.global_remote_specs = remote_specs - else: - self.project_remote_specs[project] = remote_specs - - # _initialize_remotes() - # - # An internal wrapper which calls the abstract method and - # reports takes care of messaging - # - def _initialize_remotes(self): - def remote_failed(url, error): - self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error)) - - with self.context.timed_activity("Initializing remote caches", silent_nested=True): - self.initialize_remotes(on_failure=remote_failed) - - -# _configured_remote_artifact_cache_specs(): -# -# Return the list of configured artifact remotes for a given project, in priority -# order. This takes into account the user and project configuration. -# -# Args: -# context (Context): The BuildStream context -# project (Project): The BuildStream project -# -# Returns: -# A list of ArtifactCacheSpec instances describing the remote artifact caches. -# -def _configured_remote_artifact_cache_specs(context, project): - return list(utils._deduplicate( - project.artifact_cache_specs + context.artifact_cache_specs)) diff --git a/buildstream/_basecache.py b/buildstream/_basecache.py new file mode 100644 index 000000000..a8c58e48f --- /dev/null +++ b/buildstream/_basecache.py @@ -0,0 +1,232 @@ +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. 
If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> +# +from collections.abc import Mapping +import multiprocessing + +from . import utils +from . import _yaml +from ._cas import CASRemote +from ._message import Message, MessageType + + +# Base Cache for Caches to derive from +# +class BaseCache(): + + # None of these should ever be called in the base class, but this appeases + # pylint to some degree + spec_class = None + spec_name = None + spec_error = None + config_node_name = None + + def __init__(self, context): + self.context = context + self.cas = context.get_cascache() + self.casquota = context.get_casquota() + self.casquota._calculate_cache_quota() + + self._remotes_setup = False # Check to prevent double-setup of remotes + # Per-project list of _CASRemote instances. + self._remotes = {} + + self.global_remote_specs = [] + self.project_remote_specs = {} + + self._has_fetch_remotes = False + self._has_push_remotes = False + + # specs_from_config_node() + # + # Parses the configuration of remote caches from a config block. + # + # Args: + # config_node (dict): The config block, which may contain the key named by config_node_name + # basedir (str): The base directory for relative paths + # + # Returns: + # A list of spec_class instances. + # + # Raises: + # LoadError, if the config block contains invalid keys. + # + @classmethod + def specs_from_config_node(cls, config_node, basedir=None): + cache_specs = [] + + artifacts = config_node.get(cls.config_node_name, []) + if isinstance(artifacts, Mapping): + # pylint: disable=not-callable + cache_specs.append(cls.spec_class._new_from_config_node(artifacts, basedir)) + elif isinstance(artifacts, list): + for spec_node in artifacts: + cache_specs.append(cls.spec_class._new_from_config_node(spec_node, basedir)) + else: + provenance = _yaml.node_get_provenance(config_node, key=cls.config_node_name) + raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA, + "%s: '%s' must be a single 'url:' mapping, or a list of mappings" % + (str(provenance), cls.config_node_name)) + return cache_specs + + # _configured_remote_cache_specs(): + # + # Return the list of configured remotes for a given project, in priority + # order. This takes into account the user and project configuration. + # + # Args: + # context (Context): The BuildStream context + # project (Project): The BuildStream project + # + # Returns: + # A list of spec_class instances describing the remote caches. + # + @classmethod + def _configured_remote_cache_specs(cls, context, project): + project_overrides = context.get_overrides(project.name) + project_extra_specs = cls.specs_from_config_node(project_overrides) + + project_specs = getattr(project, cls.spec_name) + context_specs = getattr(context, cls.spec_name) + + return list(utils._deduplicate( + project_extra_specs + project_specs + context_specs)) + + # setup_remotes(): + # + # Sets up which remotes to use + # + # Args: + # use_config (bool): Whether to use project configuration + # remote_url (str): Remote cache URL + # + # This requires that all of the projects which are to be processed in the session + # have already been loaded and are observable in the Context. + # + def setup_remotes(self, *, use_config=False, remote_url=None): + + # Ensure we do not double-initialise since this can be expensive + assert not self._remotes_setup + self._remotes_setup = True + + # Initialize remote caches.
We allow the commandline to override + the user config in some cases (for example `bst artifact push --remote=...`). + has_remote_caches = False + if remote_url: + # pylint: disable=not-callable + self._set_remotes([self.spec_class(remote_url, push=True)]) + has_remote_caches = True + if use_config: + for project in self.context.get_projects(): + caches = self._configured_remote_cache_specs(self.context, project) + if caches: # caches is a list of spec_class instances + self._set_remotes(caches, project=project) + has_remote_caches = True + if has_remote_caches: + self._initialize_remotes() + + # initialize_remotes(): + # + # This will contact each remote cache. + # + # Args: + # on_failure (callable): Called if we fail to contact one of the caches. + # + def initialize_remotes(self, *, on_failure=None): + remote_specs = list(self.global_remote_specs)  # copy: '+=' below must not mutate the attribute + + for project in self.project_remote_specs: + remote_specs += self.project_remote_specs[project] + + remote_specs = list(utils._deduplicate(remote_specs)) + + remotes = {} + q = multiprocessing.Queue() + for remote_spec in remote_specs: + + error = CASRemote.check_remote(remote_spec, q) + + if error and on_failure: + on_failure(remote_spec.url, error) + elif error: + raise self.spec_error(error) # pylint: disable=not-callable + else: + self._has_fetch_remotes = True + if remote_spec.push: + self._has_push_remotes = True + + remotes[remote_spec.url] = CASRemote(remote_spec) + + for project in self.context.get_projects(): + remote_specs = self.global_remote_specs + if project in self.project_remote_specs: + remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + + project_remotes = [] + + for remote_spec in remote_specs: + # Errors are already handled in the loop above, + # skip unreachable remotes here. + if remote_spec.url not in remotes: + continue + + remote = remotes[remote_spec.url] + project_remotes.append(remote) + + self._remotes[project] = project_remotes + + ################################################ + # Local Private Methods # + ################################################ + + # _message() + # + # Local message propagator + # + def _message(self, message_type, message, **kwargs): + args = dict(kwargs) + self.context.message( + Message(None, message_type, message, **args)) + + # _set_remotes(): + # + # Set the list of remote caches. If project is None, the global list of + # remote caches will be set, which is used by all projects. If a project is + # specified, the per-project list of remote caches will be set. + # + # Args: + # remote_specs (list): List of spec_class instances, in priority order.
+ # project (Project): The Project instance for project-specific remotes + def _set_remotes(self, remote_specs, *, project=None): + if project is None: + # global remotes + self.global_remote_specs = remote_specs + else: + self.project_remote_specs[project] = remote_specs + + # _initialize_remotes() + # + # An internal wrapper which calls the abstract method and + # takes care of messaging + # + def _initialize_remotes(self): + def remote_failed(url, error): + self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error)) + + with self.context.timed_activity("Initializing remote caches", silent_nested=True): + self.initialize_remotes(on_failure=remote_failed) diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py index 02030bb68..04a09299a 100644 --- a/buildstream/_cas/cascache.py +++ b/buildstream/_cas/cascache.py @@ -1041,6 +1041,7 @@ class CASCache(): class CASQuota: def __init__(self, context): + self.context = context self.cas = context.get_cascache() self.casdir = self.cas.casdir self._config_cache_quota = context.config_cache_quota @@ -1054,6 +1055,9 @@ class CASQuota: self._message = context.message + self._ref_callbacks = [] # Callbacks yielding required refs + self._remove_callbacks = [] # Callbacks to remove refs + self._calculate_cache_quota() # compute_cache_size() @@ -1283,6 +1287,138 @@ class CASQuota: self._cache_quota = cache_quota - self._cache_quota_headroom self._cache_lower_threshold = self._cache_quota / 2 + # clean(): + # + # Clean the cache as much as possible. + # + # Args: + # progress (callable): A callback to call when a ref is removed + # + # Returns: + # (int): The size of the cache after having cleaned up + # + def clean(self, progress=None): + context = self.context + + # Some accumulative statistics + removed_ref_count = 0 + space_saved = 0 + + # get required refs + refs = self.cas.list_refs() + required_refs = set(itertools.chain.from_iterable(self._ref_callbacks)) + + # Start off with an announcement with as much info as possible + volume_size, volume_avail = self._get_cache_volume_size() + self._message(Message( + None, MessageType.STATUS, "Starting cache cleanup", + detail=("Refs required by the current build plan: {}\n" + + "User specified quota: {} ({})\n" + + "Cache usage: {}\n" + + "Cache volume: {} total, {} available") + .format(len(required_refs), + context.config_cache_quota, + utils._pretty_size(self._cache_quota, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + utils._pretty_size(volume_size, dec_places=2), + utils._pretty_size(volume_avail, dec_places=2)))) + + # Do a real computation of the cache size once, just in case + self.compute_cache_size() + usage = CASCacheUsage(self) + self._message(Message(None, MessageType.STATUS, + "Cache usage recomputed: {}".format(usage))) + + while self.get_cache_size() >= self._cache_lower_threshold: + try: + to_remove = refs.pop(0) + except IndexError: + # If too many artifacts are required, and we therefore + # can't remove them, we have to abort the build. + # + # FIXME: Asking the user what to do may be neater + # + default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], + 'buildstream.conf') + detail = ("Aborted after removing {} refs and saving {} disk space.\n" + "The remaining {} in the cache is required by the {} references in your build plan\n\n" + "There is not enough space to complete the build.\n" + "Please increase the cache-quota in {} and/or make more disk space."
+ .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + len(required_refs), + (context.config_origin or default_conf))) + + if self.full(): + raise CASCacheError("Cache too full. Aborting.", + detail=detail, + reason="cache-too-full") + else: + break + + key = to_remove.rpartition('/')[2] + if key not in required_refs: + + # Remove the actual ref, if it's not required. + size = 0 + removed_ref = False + for (pred, remove) in self._remove_callbacks: + if pred(to_remove): + size = remove(to_remove) + removed_ref = True + break + + if not removed_ref: + continue + + removed_ref_count += 1 + space_saved += size + + self._message(Message( + None, MessageType.STATUS, + "Freed {: <7} {}".format( + utils._pretty_size(size, dec_places=2), + to_remove))) + + # Subtract the removed size from the cache size + self.set_cache_size(self._cache_size - size) + + # User callback + # + # Currently this process is fairly slow, but we should + # think about throttling this progress() callback if this + # becomes too intense. + if progress: + progress() + + # Informational message about the side effects of the cleanup + self._message(Message( + None, MessageType.INFO, "Cleanup completed", + detail=("Removed {} refs, saving {} disk space.\n" + + "Cache usage is now: {}") + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2)))) + + return self.get_cache_size() + + # add_ref_callbacks() + # + # Args: + # callback (iterable): an iterable (typically a generator) yielding required refs + def add_ref_callbacks(self, callback): + self._ref_callbacks.append(callback) + + # add_remove_callbacks() + # + # Args: + # callback ((predicate, remove) tuple): the predicate decides, given a ref, + # whether this cache type owns it; the remove callable performs the actual + # removal and returns the size freed.
+ def add_remove_callbacks(self, callback): + self._remove_callbacks.append(callback) + def _grouper(iterable, n): while True: diff --git a/buildstream/_context.py b/buildstream/_context.py index 8a9f485be..286e2d223 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -32,6 +32,7 @@ from ._exceptions import LoadError, LoadErrorReason, BstError from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end from ._artifactcache import ArtifactCache +from ._sourcecache import SourceCache from ._cas import CASCache, CASQuota, CASCacheUsage from ._workspaces import Workspaces, WorkspaceProjectCache from .plugin import _plugin_lookup @@ -65,6 +66,9 @@ class Context(): # The directory where various sources are stored self.sourcedir = None + # specs for source cache remotes + self.source_cache_specs = None + # The directory where build sandboxes will be created self.builddir = None @@ -145,6 +149,7 @@ class Context(): self._message_handler = None self._message_depth = deque() self._artifactcache = None + self._sourcecache = None self._projects = [] self._project_overrides = {} self._workspaces = None @@ -162,6 +167,7 @@ class Context(): # Args: # config (filename): The user specified configuration file, if any # + # Raises: # LoadError # @@ -201,7 +207,7 @@ class Context(): _yaml.node_validate(defaults, [ 'cachedir', 'sourcedir', 'builddir', 'logdir', 'scheduler', - 'artifacts', 'logging', 'projects', 'cache', 'prompt', + 'artifacts', 'source-caches', 'logging', 'projects', 'cache', 'prompt', 'workspacedir', 'remote-execution', ]) @@ -253,6 +259,9 @@ class Context(): # Load artifact share configuration self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults) + # Load source cache config + self.source_cache_specs = SourceCache.specs_from_config_node(defaults) + self.remote_execution_specs = SandboxRemote.specs_from_config_node(defaults) # Load pull build trees configuration @@ -296,8 +305,10 @@ class Context(): # Shallow validation of overrides, parts of buildstream which rely # on the overrides are expected to validate elsewhere. for _, overrides in _yaml.node_items(self._project_overrides): - _yaml.node_validate(overrides, ['artifacts', 'options', 'strict', 'default-mirror', - 'remote-execution']) + _yaml.node_validate(overrides, + ['artifacts', 'source-caches', 'options', + 'strict', 'default-mirror', + 'remote-execution']) profile_end(Topics.LOAD_CONTEXT, 'load') @@ -318,6 +329,13 @@ class Context(): def get_cache_usage(self): return CASCacheUsage(self.get_casquota()) + @property + def sourcecache(self): + if not self._sourcecache: + self._sourcecache = SourceCache(self) + + return self._sourcecache + # add_project(): # # Add a project to the context. 
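The callback registration above is what lets CASQuota.clean() stay cache-agnostic: each cache hands the quota an iterable of refs it still needs, plus a (predicate, remove) pair so clean() can dispatch each unneeded ref to the cache that owns it. A minimal sketch of the pattern, mirroring what ArtifactCache.__init__() and SourceCache.__init__() do in this diff (`context` is assumed to be a loaded Context; the yielded key is a placeholder):

    casquota = context.get_casquota()

    def required_refs():
        # Assumption: yield the cache keys the current session must keep
        yield '0123abcd...'  # placeholder cache key

    # Consumed lazily inside CASQuota.clean()
    casquota.add_ref_callbacks(required_refs())

    # Artifact refs are any ref not starting with '@'; source refs live
    # under '@sources/' (see _sourcecache.py further down this diff)
    casquota.add_remove_callbacks((lambda ref: not ref.startswith('@'),
                                   context.artifactcache.remove))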
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index 8728f6e69..f2d34bcba 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -272,6 +272,15 @@ class SandboxError(BstError): super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason) +# SourceCacheError +# +# Raised when errors are encountered in the source caches +# +class SourceCacheError(BstError): + def __init__(self, message, detail=None, reason=None): + super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason) + + # ArtifactError # # Raised when errors are encountered in the artifact caches diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 398bd85fc..5681fd103 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -137,7 +137,7 @@ def complete_artifact(orig_args, args, incomplete): # element targets are valid artifact names complete_list = complete_target(args, incomplete) - complete_list.extend(ref for ref in ctx.artifactcache.cas.list_refs() if ref.startswith(incomplete)) + complete_list.extend(ref for ref in ctx.artifactcache.list_artifacts() if ref.startswith(incomplete)) return complete_list diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index d1df06284..15bd9cf79 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -382,7 +382,7 @@ class LogLine(Widget): line = p.fmt_subst(line, 'state', "failed", fg='red') elif element._cached_success(): line = p.fmt_subst(line, 'state', "cached", fg='magenta') - elif consistency == Consistency.RESOLVED: + elif consistency == Consistency.RESOLVED and not element._source_cached(): line = p.fmt_subst(line, 'state', "fetch needed", fg='red') elif element._buildable(): line = p.fmt_subst(line, 'state', "buildable", fg='green') diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index 1607c5b5e..9b91e91fe 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -523,28 +523,29 @@ class Loader(): element._preflight() sources = list(element.sources()) - for idx, source in enumerate(sources): - # Handle the case where a subproject needs to be fetched - # - if source.get_consistency() == Consistency.RESOLVED: - if fetch_subprojects: - if ticker: - ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind())) - source._fetch(sources[0:idx]) - else: - detail = "Try fetching the project with `bst source fetch {}`".format(filename) - raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED, - "Subproject fetch needed for junction: {}".format(filename), + if not element._source_cached(): + for idx, source in enumerate(sources): + # Handle the case where a subproject needs to be fetched + # + if source.get_consistency() == Consistency.RESOLVED: + if fetch_subprojects: + if ticker: + ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind())) + source._fetch(sources[0:idx]) + else: + detail = "Try fetching the project with `bst source fetch {}`".format(filename) + raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED, + "Subproject fetch needed for junction: {}".format(filename), + detail=detail) + + # Handle the case where a subproject has no ref + # + elif source.get_consistency() == Consistency.INCONSISTENT: + detail = "Try tracking the junction element with `bst source track {}`".format(filename) + raise LoadError(LoadErrorReason.SUBPROJECT_INCONSISTENT, + "Subproject has no ref for junction: 
{}".format(filename), detail=detail) - # Handle the case where a subproject has no ref - # - elif source.get_consistency() == Consistency.INCONSISTENT: - detail = "Try tracking the junction element with `bst source track {}`".format(filename) - raise LoadError(LoadErrorReason.SUBPROJECT_INCONSISTENT, - "Subproject has no ref for junction: {}".format(filename), - detail=detail) - workspace = element._get_workspace() if workspace: # If a workspace is open, load it from there instead diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 7cf36f5a7..004776293 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -395,7 +395,8 @@ class Pipeline(): uncached = [] with self._context.timed_activity("Checking sources"): for element in elements: - if element._get_consistency() != Consistency.CACHED: + if element._get_consistency() < Consistency.CACHED and \ + not element._source_cached(): uncached.append(element) if uncached: @@ -403,7 +404,7 @@ class Pipeline(): for element in uncached: detail += " Following sources for element: {} are not cached:\n".format(element._get_full_name()) for source in element.sources(): - if source._get_consistency() != Consistency.CACHED: + if source._get_consistency() < Consistency.CACHED: detail += " {}\n".format(source) detail += '\n' detail += "Try fetching these elements first with `bst source fetch`,\n" + \ diff --git a/buildstream/_project.py b/buildstream/_project.py index 6cbba497f..21ea91683 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -34,6 +34,7 @@ from ._profile import Topics, profile_start, profile_end from ._exceptions import LoadError, LoadErrorReason from ._options import OptionPool from ._artifactcache import ArtifactCache +from ._sourcecache import SourceCache from .sandbox import SandboxRemote from ._elementfactory import ElementFactory from ._sourcefactory import SourceFactory @@ -140,6 +141,7 @@ class Project(): self._shell_host_files = [] # A list of HostMount objects self.artifact_cache_specs = None + self.source_cache_specs = None self.remote_execution_specs = None self._sandbox = None self._splits = None @@ -333,7 +335,7 @@ class Project(): 'artifacts', 'options', 'fail-on-overlap', 'shell', 'fatal-warnings', 'ref-storage', 'sandbox', 'mirrors', 'remote-execution', - 'sources', '(@)' + 'sources', 'source-caches', '(@)' ]) # create_element() @@ -672,6 +674,9 @@ class Project(): parent = self.junction._get_project() self.artifact_cache_specs = parent.artifact_cache_specs + self.artifact_cache_specs + # Load source caches with pull/push config + self.source_cache_specs = SourceCache.specs_from_config_node(config, self.directory) + # Load remote-execution configuration for this project project_specs = SandboxRemote.specs_from_config_node(config, self.directory) override_specs = SandboxRemote.specs_from_config_node( diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index e016d4cd7..9610d53f8 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -32,7 +32,7 @@ class CleanupJob(Job): def progress(): self.send_message('update-cache-size', self._casquota.get_cache_size()) - return self._artifacts.clean(progress) + return self._casquota.clean(progress) def handle_message(self, message_type, message): # Update the cache size in the main process as we go, diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index fc11fd1d1..db5e470f9 100644 --- 
a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -35,13 +35,14 @@ class FetchQueue(Queue): complete_name = "Fetched" resources = [ResourceType.DOWNLOAD] - def __init__(self, scheduler, skip_cached=False): + def __init__(self, scheduler, skip_cached=False, fetch_original=False): super().__init__(scheduler) self._skip_cached = skip_cached + self._fetch_original = fetch_original def process(self, element): - element._fetch() + element._fetch(fetch_original=self._fetch_original) def status(self, element): # state of dependencies may have changed, recalculate element state @@ -62,7 +63,8 @@ class FetchQueue(Queue): # This will automatically skip elements which # have no sources. - if element._get_consistency() == Consistency.CACHED: + + if not element._should_fetch(self._fetch_original): return QueueStatus.SKIP return QueueStatus.READY diff --git a/buildstream/_sourcecache.py b/buildstream/_sourcecache.py new file mode 100644 index 000000000..b21edaa81 --- /dev/null +++ b/buildstream/_sourcecache.py @@ -0,0 +1,145 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> +# +from ._cas import CASRemoteSpec +from .storage._casbaseddirectory import CasBasedDirectory +from ._basecache import BaseCache +from ._exceptions import CASCacheError, SourceCacheError +from . import utils + + +# Holds configuration for a remote used for the source cache. +# +# Args: +# url (str): Location of the remote source cache +# push (bool): Whether we should attempt to push sources to this cache, +# in addition to pulling from it. +# instance-name (str): Name, if any, of the server instance +# +class SourceCacheSpec(CASRemoteSpec): + pass + + +# Class that keeps configuration of remotes and deals with caching of sources. +# +# Args: +# context (Context): The BuildStream context +# +class SourceCache(BaseCache): + + spec_class = SourceCacheSpec + spec_name = "source_cache_specs" + spec_error = SourceCacheError + config_node_name = "source-caches" + + def __init__(self, context): + super().__init__(context) + + self._required_sources = set() + + self.casquota.add_ref_callbacks(self.required_sources()) + self.casquota.add_remove_callbacks((lambda x: x.startswith('@sources/'), self.cas.remove)) + + # mark_required_sources() + # + # Mark sources that are required by the current run. + # + # Sources that are in this list will not be removed during the current + # pipeline.
+ # + # Args: + # sources (iterable): An iterable over sources that are required + # + def mark_required_sources(self, sources): + sources = list(sources) # in case it's a generator + + self._required_sources.update(sources) + + # update mtimes just in case + for source in sources: + ref = source._get_source_name() + try: + self.cas.update_mtime(ref) + except CASCacheError: + pass + + # required_sources() + # + # Yields the keys of all sources marked as required + # + # Returns: + # iterable (str): iterable over the source keys + # + def required_sources(self): + for source in self._required_sources: + yield source._key + + # contains() + # + # Given a source, gets the ref name and checks whether the local CAS + # contains it. + # + # Args: + # source (Source): Source to check + # + # Returns: + # (bool): whether the CAS contains this source or not + # + def contains(self, source): + ref = source._get_source_name() + return self.cas.contains(ref) + + # commit() + # + # Given a source along with previous sources, it stages and commits these + # to the local CAS. This is done due to some types of sources being + # dependent on previous sources, such as the patch source. + # + # Args: + # source (Source): the last source + # previous_sources (list): the sources preceding it, in order. + def commit(self, source, previous_sources): + ref = source._get_source_name() + + # Use tmpdir for now + with utils._tempdir(dir=self.context.tmpdir, prefix='staging-temp') as tmpdir: + for previous_source in previous_sources: + previous_source._stage(tmpdir) + source._stage(tmpdir) + + self.cas.commit([ref], tmpdir) + + # export() + # + # Exports a source in the CAS to a virtual directory + # + # Args: + # source (Source): source we want to export + # + # Returns: + # CasBasedDirectory + def export(self, source): + ref = source._get_source_name() + + try: + digest = self.cas.resolve_ref(ref) + except CASCacheError as e: + raise SourceCacheError("Error exporting source: {}".format(e)) + + return CasBasedDirectory(self.cas, digest=digest) diff --git a/buildstream/_stream.py b/buildstream/_stream.py index 5c880427c..f1600a8e9 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -19,6 +19,8 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Tristan Maat <tristan.maat@codethink.co.uk> +import itertools +import functools import os import sys import stat @@ -587,9 +589,9 @@ class Stream(): except_targets=except_targets, fetch_subprojects=True) - # Assert all sources are cached + # Assert all sources are cached in the source dir if fetch: - self._fetch(elements) + self._fetch(elements, fetch_original=True) self._pipeline.assert_sources_cached(elements) # Stage all sources determined by scope @@ -636,7 +638,7 @@ class Stream(): track_elements = [] if track_first: track_elements = elements - self._fetch(elements, track_elements=track_elements) + self._fetch(elements, track_elements=track_elements, fetch_original=True) expanded_directories = [] # To try to be more atomic, loop through the elements and raise any errors we can early @@ -656,7 +658,9 @@ class Stream(): raise StreamError("Element '{}' already has workspace defined at: {}" .format(target.name, workspace.get_absolute_path())) - if not no_checkout and target._get_consistency() != Consistency.CACHED: + target_consistency = target._get_consistency() + if not no_checkout and target_consistency < Consistency.CACHED and \ + not target._source_cached(): raise StreamError("Could not stage uncached source.
For {} ".format(target.name) + "Use `--track` to track and " + "fetch the latest version of the " + @@ -771,7 +775,7 @@ class Stream(): # Do the tracking first if track_first: - self._fetch(elements, track_elements=track_elements) + self._fetch(elements, track_elements=track_elements, fetch_original=True) workspaces = self._context.get_workspaces() @@ -1090,7 +1094,13 @@ class Stream(): # It must include all the artifacts which are required by the # final product. Note that this is a superset of the build plan. # - self._artifacts.mark_required_elements(self._pipeline.dependencies(elements, Scope.ALL)) + # use partial as we send this to both Artifact and Source caches + required_elements = functools.partial(self._pipeline.dependencies, elements, Scope.ALL) + self._artifacts.mark_required_elements(required_elements()) + + self._context.sourcecache.mark_required_sources( + itertools.chain.from_iterable( + [element.sources() for element in required_elements()])) if selection == PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, @@ -1181,8 +1191,9 @@ class Stream(): # Args: # elements (list of Element): Elements to fetch # track_elements (list of Element): Elements to track + # fetch_original (Bool): Whether to fetch original unstaged # - def _fetch(self, elements, *, track_elements=None): + def _fetch(self, elements, *, track_elements=None, fetch_original=False): if track_elements is None: track_elements = [] @@ -1195,7 +1206,8 @@ class Stream(): # Filter out elements with cached sources, only from the fetch plan # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] + cached = [elt for elt in fetch_plan + if not elt._should_fetch(fetch_original)] fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) # Construct queues, enqueue and run @@ -1204,7 +1216,7 @@ class Stream(): if track_elements: track_queue = TrackQueue(self._scheduler) self._add_queue(track_queue, track=True) - self._add_queue(FetchQueue(self._scheduler)) + self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original)) if track_elements: self._enqueue_plan(track_elements, queue=track_queue) diff --git a/buildstream/element.py b/buildstream/element.py index b9643aee9..901a9507f 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -88,7 +88,7 @@ from . import _yaml from ._variables import Variables from ._versions import BST_CORE_ARTIFACT_VERSION from ._exceptions import BstError, LoadError, LoadErrorReason, ImplError, \ - ErrorDomain + ErrorDomain, SourceCacheError from .utils import UtilError from . import Plugin, Consistency, Scope from . 
import SandboxFlags, SandboxCommandError @@ -956,11 +956,16 @@ class Element(Plugin): element = meta.project.create_element(meta, first_pass=meta.first_pass) cls.__instantiated_elements[meta] = element - # Instantiate sources + # Instantiate sources and generate their keys + previous_sources = [] for meta_source in meta.sources: meta_source.first_pass = meta.kind == "junction" source = meta.project.create_source(meta_source, first_pass=meta.first_pass) + + source._generate_key(previous_sources) + previous_sources.append(source) + redundant_ref = source._load_ref() element.__sources.append(source) @@ -1080,7 +1085,8 @@ class Element(Plugin): # (bool): Whether this element can currently be built # def _buildable(self): - if self._get_consistency() != Consistency.CACHED: + if self._get_consistency() < Consistency.CACHED and \ + not self._source_cached(): return False for dependency in self.dependencies(Scope.BUILD): @@ -1363,6 +1369,12 @@ class Element(Plugin): self.__tracking_scheduled = False self.__tracking_done = True + # update keys + sources = list(self.sources()) + if sources: + source = sources.pop() + source._generate_key(sources) + self._update_state() # _track(): @@ -1457,6 +1469,7 @@ class Element(Plugin): # usebuildtree (bool): use a the elements build tree as its source. # def _stage_sources_at(self, vdirectory, mount_workspaces=True, usebuildtree=False): + context = self._get_context() # It's advantageous to have this temporary directory on @@ -1486,12 +1499,23 @@ class Element(Plugin): if import_dir.is_empty(): detail = "Element type either does not expect a buildtree or it was explictily cached without one." self.warn("WARNING: {} Artifact contains an empty buildtree".format(self.name), detail=detail) + + # No workspace or cached buildtree, stage source from source cache else: - # No workspace or cached buildtree, stage source directly - for source in self.sources(): - source._stage(import_dir) + # Ensure sources are cached + self.__cache_sources() + + if list(self.sources()): - vdirectory.import_files(import_dir) + sourcecache = self._get_context().sourcecache + try: + import_dir = sourcecache.export(list(self.sources())[-1]) + except SourceCacheError as e: + raise ElementError("Error trying to export source for {}: {}" + .format(self.name, e)) + + with utils._deterministic_umask(): + vdirectory.import_files(import_dir) # Ensure deterministic mtime of sources at build time vdirectory.set_deterministic_mtime() @@ -1945,8 +1969,12 @@ class Element(Plugin): os.makedirs(context.builddir, exist_ok=True) with utils._tempdir(dir=context.builddir, prefix='workspace-{}' .format(self.normal_name)) as temp: + last_source = None for source in self.sources(): - source._init_workspace(temp) + last_source = source + + if last_source: + last_source._init_workspace(temp) # Now hardlink the files into the workspace target. 
utils.link_files(temp, workspace.get_absolute_path()) @@ -2037,13 +2065,26 @@ class Element(Plugin): # Raises: # SourceError: If one of the element sources has an error # - def _fetch(self): + def _fetch(self, fetch_original=False): previous_sources = [] + source = None + sourcecache = self._get_context().sourcecache + + # check whether the final source is cached + for source in self.sources(): + pass + + if source and not fetch_original and sourcecache.contains(source): + return + for source in self.sources(): - if source._get_consistency() < Consistency.CACHED: + source_consistency = source._get_consistency() + if source_consistency != Consistency.CACHED: source._fetch(previous_sources) previous_sources.append(source) + self.__cache_sources() + # _calculate_cache_key(): # # Calculates the cache key @@ -2092,6 +2133,27 @@ class Element(Plugin): return _cachekey.generate_key(cache_key_dict) + def _source_cached(self): + source = None + for source in self.sources(): + pass + if source: + return self._get_context().sourcecache.contains(source) + else: + return True + + def _should_fetch(self, fetch_original=False): + """Return whether we need to run the fetch stage for this element + + Args: + fetch_original (bool): whether we need the original unstaged source + """ + if (self._get_consistency() == Consistency.CACHED and fetch_original) or \ + (self._source_cached() and not fetch_original): + return False + else: + return True + ############################################################# # Private Local Methods # ############################################################# @@ -2123,8 +2185,7 @@ class Element(Plugin): # Determine overall consistency of the element for source in self.__sources: source._update_state() - source_consistency = source._get_consistency() - self.__consistency = min(self.__consistency, source_consistency) + self.__consistency = min(self.__consistency, source._get_consistency()) # __can_build_incrementally() # @@ -2831,6 +2892,17 @@ class Element(Plugin): return (subdir, excluded_subdirs) + # __cache_sources(): + # + # Caches the sources into the local CAS + # + def __cache_sources(self): + sources = list(self.sources()) + if sources: + sourcecache = self._get_context().sourcecache + if not sourcecache.contains(sources[-1]): + sources[-1]._cache(sources[:-1]) + def _overlap_error_detail(f, forbidden_overlap_elements, elements): if forbidden_overlap_elements: diff --git a/buildstream/source.py b/buildstream/source.py index 97995a8da..b5c38335b 100644 --- a/buildstream/source.py +++ b/buildstream/source.py @@ -170,6 +170,7 @@ from . 
import _yaml, utils from ._exceptions import BstError, ImplError, ErrorDomain from ._loader.metasource import MetaSource from ._projectrefs import ProjectRefStorage +from ._cachekey import generate_key class SourceError(BstError): @@ -289,12 +290,16 @@ class Source(Plugin): super().__init__("{}-{}".format(meta.element_name, meta.element_index), context, project, provenance, "source") + self.__source_cache = context.sourcecache + self.__element_name = meta.element_name # The name of the element owning this source self.__element_index = meta.element_index # The index of the source in the owning element's source list self.__element_kind = meta.element_kind # The kind of the element owning this source self.__directory = meta.directory # Staging relative directory self.__consistency = Consistency.INCONSISTENT # Cached consistency state + self.__key = None # Cache key for source + # The alias_override is only set on a re-instantiated Source self.__alias_override = alias_override # Tuple of alias and its override to use instead self.__expected_alias = None # The primary alias @@ -688,6 +693,7 @@ class Source(Plugin): # # Args: # previous_sources (list): List of Sources listed prior to this source + # fetch_original (bool): whether to fetch full source, or use local CAS # def _fetch(self, previous_sources): @@ -700,6 +706,10 @@ class Source(Plugin): else: self.__do_fetch() + def _cache(self, previous_sources): + # stage the source into the source cache + self.__source_cache.commit(self, previous_sources) + # Wrapper for stage() api which gives the source # plugin a fully constructed path considering the # 'directory' option @@ -956,6 +966,26 @@ class Source(Plugin): else: return None + def _generate_key(self, previous_sources): + keys = [self._get_unique_key(True)] + + for previous_source in previous_sources: + keys.append(previous_source._get_unique_key(True)) + + self.__key = generate_key(keys) + + @property + def _key(self): + return self.__key + + # Gives a ref path that points to where sources are kept in the CAS + def _get_source_name(self): + # @ is used to prevent conflicts with project names + return "{}/{}/{}".format( + '@sources', + self.get_kind(), + self._key) + ############################################################# # Local Private Methods # ############################################################# diff --git a/buildstream/types.py b/buildstream/types.py index 23d78b08c..ba4b99eb7 100644 --- a/buildstream/types.py +++ b/buildstream/types.py @@ -76,8 +76,7 @@ class Consistency(): CACHED = 2 """Cached - Cached sources have a reference which is present in the local - source cache. Only cached sources can be staged. + Sources have a cached unstaged copy in the source directory. """ diff --git a/buildstream/utils.py b/buildstream/utils.py index 2960348e9..f4a329210 100644 --- a/buildstream/utils.py +++ b/buildstream/utils.py @@ -1307,3 +1307,18 @@ def _with_gc_disabled(func): # used by other objects during the course of running BuildStream. gc.collect() return _gc_disabled + + +# _deterministic_umask() +# +# Context manager to apply a umask to a section that may be affected by a user's +# umask. Restores the old mask afterwards.
+# +@contextmanager +def _deterministic_umask(): + old_umask = os.umask(0o022) + + try: + yield + finally: + os.umask(old_umask) diff --git a/tests/artifactcache/config.py b/tests/artifactcache/config.py index fda309725..c3b09bfb1 100644 --- a/tests/artifactcache/config.py +++ b/tests/artifactcache/config.py @@ -3,7 +3,7 @@ import pytest import itertools import os -from buildstream._artifactcache import ArtifactCacheSpec, _configured_remote_artifact_cache_specs +from buildstream._artifactcache import ArtifactCacheSpec, ArtifactCache from buildstream._context import Context from buildstream._project import Project from buildstream.utils import _deduplicate @@ -104,7 +104,7 @@ def test_artifact_cache_precedence(tmpdir, override_caches, project_caches, user project.ensure_fully_loaded() # Use the helper from the artifactcache module to parse our configuration. - parsed_cache_specs = _configured_remote_artifact_cache_specs(context, project) + parsed_cache_specs = ArtifactCache._configured_remote_cache_specs(context, project) # Verify that it was correctly read. expected_cache_specs = list(_deduplicate(itertools.chain(override_caches, project_caches, user_caches))) diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py index 20e8cbda2..e39357534 100644 --- a/tests/artifactcache/expiry.py +++ b/tests/artifactcache/expiry.py @@ -94,7 +94,7 @@ def test_artifact_too_large(cli, datafiles, size): create_element_size('target.bst', project, element_path, [], size) res = cli.run(project=project, args=['build', 'target.bst']) res.assert_main_error(ErrorDomain.STREAM, None) - res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full') + res.assert_task_error(ErrorDomain.CAS, 'cache-too-full') @pytest.mark.datafiles(DATA_DIR) @@ -224,7 +224,7 @@ def test_never_delete_required(cli, datafiles): # cache. Since all elements are required, the build should fail. res = cli.run(project=project, args=['build', 'target.bst']) res.assert_main_error(ErrorDomain.STREAM, None) - res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full') + res.assert_task_error(ErrorDomain.CAS, 'cache-too-full') # Only the first artifact fits in the cache, but we expect # that the first *two* artifacts will be cached. 
@@ -295,13 +295,15 @@ def test_never_delete_required_track(cli, datafiles): # res = cli.run(project=project, args=['build', '--track-all', 'target.bst']) res.assert_main_error(ErrorDomain.STREAM, None) - res.assert_task_error(ErrorDomain.ARTIFACT, 'cache-too-full') + res.assert_task_error(ErrorDomain.CAS, 'cache-too-full') - # Expect the same result that we did in test_never_delete_required() + # Expect almost the same result as in test_never_delete_required() + # As the source will be downloaded first, we will be over the limit once + # the source for dep2.bst is downloaded # states = cli.get_element_states(project, ['target.bst']) assert states['dep1.bst'] == 'cached' - assert states['dep2.bst'] == 'cached' + assert states['dep2.bst'] == 'buildable' assert states['dep3.bst'] != 'cached' assert states['target.bst'] != 'cached' diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py index 9c56fb4a0..9c3efadd8 100644 --- a/tests/frontend/buildtrack.py +++ b/tests/frontend/buildtrack.py @@ -125,6 +125,8 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, # Delete element sources source_dir = os.path.join(project, 'cache', 'sources') shutil.rmtree(source_dir) + source_refs = os.path.join(project, 'cache', 'cas', 'refs', 'heads', '@sources') + shutil.rmtree(source_refs) # Delete artifacts one by one and assert element states for target in set(tracked): diff --git a/tests/frontend/project/sources/fetch_source.py b/tests/frontend/project/sources/fetch_source.py index d454f69e0..06596607b 100644 --- a/tests/frontend/project/sources/fetch_source.py +++ b/tests/frontend/project/sources/fetch_source.py @@ -62,6 +62,9 @@ class FetchSource(Source): if not os.path.exists(output_dir): raise SourceError("Directory '{}' does not exist".format(output_dir)) + def stage(self, directory): + pass + def fetch(self): for fetcher in self.fetchers: fetcher.fetch() diff --git a/tests/internals/pluginloading/customsource/pluginsources/foo.py b/tests/internals/pluginloading/customsource/pluginsources/foo.py index d2b0d9c6d..8dd16801c 100644 --- a/tests/internals/pluginloading/customsource/pluginsources/foo.py +++ b/tests/internals/pluginloading/customsource/pluginsources/foo.py @@ -9,6 +9,9 @@ class FooSource(Source): def configure(self, node): pass + def get_unique_key(self): + pass + def get_consistency(self): return Consistency.INCONSISTENT diff --git a/tests/sourcecache/__init__.py b/tests/sourcecache/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/sourcecache/__init__.py diff --git a/tests/sourcecache/config.py b/tests/sourcecache/config.py new file mode 100644 index 000000000..b5581a7e2 --- /dev/null +++ b/tests/sourcecache/config.py @@ -0,0 +1,58 @@ +# +# Copyright (C) 2019 Bloomberg Finance L.P. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+#
+#  Authors:
+#        Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
+#
+import os
+import pytest
+
+from buildstream import _yaml
+from buildstream._exceptions import ErrorDomain, LoadErrorReason
+
+from buildstream.plugintestutils.runcli import cli
+
+DATA_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+# Assert that if either the client key or client cert is specified
+# without specifying its counterpart, we get a comprehensive LoadError
+# instead of an unhandled exception.
+@pytest.mark.datafiles(DATA_DIR)
+@pytest.mark.parametrize('config_key, config_value', [
+    ('client-cert', 'client.crt'),
+    ('client-key', 'client.key')
+])
+def test_missing_certs(cli, datafiles, config_key, config_value):
+    project = os.path.join(datafiles.dirname, datafiles.basename, 'missing-certs')
+
+    project_conf = {
+        'name': 'test',
+
+        'source-caches': {
+            'url': 'https://cache.example.com:12345',
+            'push': 'true',
+            config_key: config_value
+        }
+    }
+    project_conf_file = os.path.join(project, 'project.conf')
+    _yaml.dump(project_conf, project_conf_file)
+
+    # Use `fetch` here to ensure we try to initialize the remotes, triggering the error
+    #
+    # This does not happen for a simple `bst show`.
+    result = cli.run(project=project, args=['source', 'fetch', 'element.bst'])
+    result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA)
diff --git a/tests/sourcecache/missing-certs/certificates/client.crt b/tests/sourcecache/missing-certs/certificates/client.crt
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/sourcecache/missing-certs/certificates/client.crt
diff --git a/tests/sourcecache/missing-certs/certificates/client.key b/tests/sourcecache/missing-certs/certificates/client.key
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/sourcecache/missing-certs/certificates/client.key
diff --git a/tests/sourcecache/missing-certs/element.bst b/tests/sourcecache/missing-certs/element.bst
new file mode 100644
index 000000000..3c29b4ea1
--- /dev/null
+++ b/tests/sourcecache/missing-certs/element.bst
@@ -0,0 +1 @@
+kind: autotools
diff --git a/tests/sourcecache/project/elements/compose-all.bst b/tests/sourcecache/project/elements/compose-all.bst
new file mode 100644
index 000000000..ba47081b3
--- /dev/null
+++ b/tests/sourcecache/project/elements/compose-all.bst
@@ -0,0 +1,12 @@
+kind: compose
+
+depends:
+- filename: import-bin.bst
+  type: build
+- filename: import-dev.bst
+  type: build
+
+config:
+  # Don't try running the sandbox, we don't have a
+  # runtime to run anything in this context.
+  integrate: False
diff --git a/tests/sourcecache/project/elements/import-bin.bst b/tests/sourcecache/project/elements/import-bin.bst
new file mode 100644
index 000000000..a847c0c23
--- /dev/null
+++ b/tests/sourcecache/project/elements/import-bin.bst
@@ -0,0 +1,4 @@
+kind: import
+sources:
+- kind: local
+  path: files/bin-files
diff --git a/tests/sourcecache/project/elements/import-dev.bst b/tests/sourcecache/project/elements/import-dev.bst
new file mode 100644
index 000000000..152a54667
--- /dev/null
+++ b/tests/sourcecache/project/elements/import-dev.bst
@@ -0,0 +1,4 @@
+kind: import
+sources:
+- kind: local
+  path: files/dev-files
diff --git a/tests/sourcecache/project/elements/target.bst b/tests/sourcecache/project/elements/target.bst
new file mode 100644
index 000000000..ba489f1e8
--- /dev/null
+++ b/tests/sourcecache/project/elements/target.bst
@@ -0,0 +1,9 @@
+kind: stack
+description: |
+
+  Main stack target for the bst build test
+
+depends:
+- import-bin.bst
+- import-dev.bst
+- compose-all.bst
diff --git a/tests/sourcecache/project/files/bin-files/usr/bin/hello b/tests/sourcecache/project/files/bin-files/usr/bin/hello
new file mode 100755
index 000000000..f534a4083
--- /dev/null
+++ b/tests/sourcecache/project/files/bin-files/usr/bin/hello
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+echo "Hello !"
diff --git a/tests/sourcecache/project/files/dev-files/usr/include/pony.h b/tests/sourcecache/project/files/dev-files/usr/include/pony.h
new file mode 100644
index 000000000..40bd0c2e7
--- /dev/null
+++ b/tests/sourcecache/project/files/dev-files/usr/include/pony.h
@@ -0,0 +1,12 @@
+#ifndef __PONY_H__
+#define __PONY_H__
+
+#define PONY_BEGIN "Once upon a time, there was a pony."
+#define PONY_END "And they lived happily ever after, the end."
+
+#define MAKE_PONY(story)  \
+  PONY_BEGIN \
+  story \
+  PONY_END
+
+#endif /* __PONY_H__ */
diff --git a/tests/sourcecache/project/project.conf b/tests/sourcecache/project/project.conf
new file mode 100644
index 000000000..854e38693
--- /dev/null
+++ b/tests/sourcecache/project/project.conf
@@ -0,0 +1,4 @@
+# Project config for frontend build test
+name: test
+
+element-path: elements
diff --git a/tests/sourcecache/source-checkout.py b/tests/sourcecache/source-checkout.py
new file mode 100644
index 000000000..f526dc586
--- /dev/null
+++ b/tests/sourcecache/source-checkout.py
@@ -0,0 +1,74 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
+#
+import os
+import pytest
+import shutil
+
+from buildstream._exceptions import ErrorDomain
+from buildstream.plugintestutils.runcli import cli
+
+from tests.testutils.element_generators import create_element_size
+
+DATA_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_source_checkout(tmpdir, datafiles, cli):
+    project_dir = os.path.join(str(tmpdir), 'project')
+    element_path = 'elements'
+    cache_dir = os.path.join(str(tmpdir), 'cache')
+    source_dir = os.path.join(cache_dir, 'sources')
+
+    cli.configure({
+        'cachedir': cache_dir,
+    })
+    target_dir = os.path.join(str(tmpdir), 'target')
+
+    repo = create_element_size('target.bst', project_dir, element_path, [], 100000)
+
+    # without fetch it should fail
+    res = cli.run(project=project_dir, args=['source', 'checkout', 'target.bst', target_dir])
+    res.assert_main_error(ErrorDomain.PIPELINE, "uncached-sources")
+
+    # fetch and check it works
+    res = cli.run(project=project_dir,
+                  args=['source', 'checkout', '--fetch', 'target.bst',
+                        target_dir])
+    res.assert_success()
+    assert "Fetching from" in res.stderr
+
+    # remove the directory and check source checkout works with sources only in
+    # the CAS
+    shutil.rmtree(repo.repo)
+    shutil.rmtree(target_dir)
+    shutil.rmtree(source_dir)
+
+    res = cli.run(project=project_dir,
+                  args=['source', 'checkout', 'target.bst', target_dir])
+    res.assert_success()
+    assert "Fetching from" not in res.stderr
+
+    # remove the CAS and check it doesn't work again
+    shutil.rmtree(target_dir)
+    shutil.rmtree(os.path.join(cache_dir, 'cas'))
+
+    res = cli.run(project=project_dir,
+                  args=['source', 'checkout', '--fetch', 'target.bst', target_dir])
+    res.assert_task_error(ErrorDomain.PLUGIN, None)
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
new file mode 100644
index 000000000..b62bc3c2f
--- /dev/null
+++ b/tests/sourcecache/staging.py
@@ -0,0 +1,189 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
+#
+import os
+import shutil
+import pytest
+
+from buildstream._context import Context
+from buildstream._project import Project
+
+from buildstream.plugintestutils.runcli import cli
+from tests.testutils.element_generators import create_element_size
+
+
+DATA_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+def dummy_message_handler(message, context):
+    pass
+
+
+# A walk that yields paths relative to rootdir, so that two directory
+# trees can be compared regardless of where they live on disk
+def relative_walk(rootdir):
+    for root, dirnames, filenames in os.walk(rootdir):
+        relative_root = root.split(rootdir)[1]
+        yield (relative_root, dirnames, filenames)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_source_staged(tmpdir, cli, datafiles):
+    project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
+    cachedir = os.path.join(str(tmpdir), 'cache')
+
+    cli.configure({
+        'cachedir': cachedir
+    })
+
+    # set up minimal context
+    context = Context()
+    context.load()
+
+    # load project and sourcecache
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+    context.cachedir = cachedir
+    context.set_message_handler(dummy_message_handler)
+    sourcecache = context.sourcecache
+    cas = context.get_cascache()
+
+    res = cli.run(project=project_dir, args=["build", "import-bin.bst"])
+    res.assert_success()
+
+    # Now check that the source is in the refs; this is pretty messy,
+    # but it seems to be the only way to get at the sources
+    element = project.load_elements(["import-bin.bst"])[0]
+    source = list(element.sources())[0]
+    assert sourcecache.contains(source)
+    assert element._source_cached()
+
+    # Extract the file and check it's the same as the one we imported
+    ref = source._get_source_name()
+    digest = cas.resolve_ref(ref)
+    extractdir = os.path.join(str(tmpdir), "extract")
+    cas.checkout(extractdir, digest)
+    dir1 = extractdir
+    dir2 = os.path.join(project_dir, "files", "bin-files")
+
+    assert list(relative_walk(dir1)) == list(relative_walk(dir2))
+
+
+# Check sources are staged during a fetch
+@pytest.mark.datafiles(DATA_DIR)
+def test_source_fetch(tmpdir, cli, datafiles):
+    project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
+    cachedir = os.path.join(str(tmpdir), 'cache')
+
+    cli.configure({
+        'cachedir': cachedir
+    })
+
+    # set up minimal context
+    context = Context()
+    context.load()
+
+    # load project and sourcecache
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+    context.cachedir = cachedir
+    context.set_message_handler(dummy_message_handler)
+    cas = context.get_cascache()
+
+    res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"])
+    res.assert_success()
+
+    element = project.load_elements(["import-dev.bst"])[0]
+    source = list(element.sources())[0]
+    assert element._source_cached()
+
+    # check that the directory structures are identical
+    ref = source._get_source_name()
+    digest = cas.resolve_ref(ref)
+    extractdir = os.path.join(str(tmpdir), "extract")
+    cas.checkout(extractdir, digest)
+    dir1 = extractdir
+    dir2 = os.path.join(project_dir, "files", "dev-files")
+
+    assert list(relative_walk(dir1)) == list(relative_walk(dir2))
+
+
+# Check that the build completes successfully with sources only in the CAS
+@pytest.mark.datafiles(DATA_DIR)
+def test_staged_source_build(tmpdir, datafiles, cli):
+    project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
+    cachedir = os.path.join(str(tmpdir), 'cache')
+    element_path = 'elements'
+    source_refs = os.path.join(str(tmpdir), 'cache', 'cas', 'refs', 'heads', '@sources')
+    source_dir = os.path.join(str(tmpdir), 'cache', 'sources')
+
+    cli.configure({
+        'cachedir': os.path.join(str(tmpdir), 'cache')
+    })
+
+    create_element_size('target.bst', project_dir, element_path, [], 10000)
+
+    # get the source object
+    context = Context()
+    context.load()
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+    context.cachedir = cachedir
+    context.set_message_handler(dummy_message_handler)
+
+    element = project.load_elements(["import-dev.bst"])[0]
+    source = list(element.sources())[0]
+
+    # check consistency of the source
+    assert not element._source_cached()
+
+    res = cli.run(project=project_dir, args=['build', 'target.bst'])
+    res.assert_success()
+
+    # delete the artifact and check that the state is buildable
+    cli.remove_artifact_from_cache(project_dir, 'target.bst')
+    states = cli.get_element_states(project_dir, ['target.bst'])
+    assert states['target.bst'] == 'buildable'
+
+    # delete the source dir and check that the state is still buildable
+    shutil.rmtree(source_dir)
+    states = cli.get_element_states(project_dir, ['target.bst'])
+    assert states['target.bst'] == 'buildable'
+
+    # build and check that no fetching was done
+    res = cli.run(project=project_dir, args=['build', 'target.bst'])
+    res.assert_success()
+    assert 'Fetching from' not in res.stderr
+
+    # assert the source directory is still empty (though there may be
+    # directories from staging etc.)
+    files = []
+    for _, _, filenames in os.walk(source_dir):
+        files.extend(filenames)
+    assert files == []
+
+    # Now remove the source refs and check the state
+    shutil.rmtree(source_refs)
+    cli.remove_artifact_from_cache(project_dir, 'target.bst')
+    states = cli.get_element_states(project_dir, ['target.bst'])
+    assert states['target.bst'] == 'fetch needed'
+
+    # Check that it now fetches when building the target
+    res = cli.run(project=project_dir, args=['build', 'target.bst'])
+    res.assert_success()
+    assert 'Fetching from' in res.stderr
diff --git a/tests/sourcecache/workspace.py b/tests/sourcecache/workspace.py
new file mode 100644
index 000000000..440ca81b8
--- /dev/null
+++ b/tests/sourcecache/workspace.py
@@ -0,0 +1,59 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
+#
+import os
+import pytest
+import shutil
+
+from buildstream.plugintestutils.runcli import cli
+
+from tests.testutils.element_generators import create_element_size
+
+
+DATA_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+# Test that when we have sources only in the local CAS, BuildStream fetches
+# them in order to open a workspace
+@pytest.mark.datafiles(DATA_DIR)
+def test_workspace_source_fetch(tmpdir, datafiles, cli):
+    project_dir = os.path.join(str(tmpdir), 'project')
+    element_path = 'elements'
+    source_dir = os.path.join(str(tmpdir), 'cache', 'sources')
+    workspace = os.path.join(cli.directory, 'workspace')
+
+    cli.configure({
+        'cachedir': os.path.join(str(tmpdir), 'cache')
+    })
+
+    create_element_size('target.bst', project_dir, element_path, [], 10000)
+    res = cli.run(project=project_dir, args=['build', 'target.bst'])
+    res.assert_success()
+    assert 'Fetching from' in res.stderr
+
+    # remove the original sources
+    shutil.rmtree(source_dir)
+
+    # Open a workspace and check that it fetches the original sources
+    res = cli.run(project=project_dir,
+                  args=['workspace', 'open', 'target.bst', '--directory', workspace])
+    res.assert_success()
+    assert 'Fetching from' in res.stderr
+
+    assert os.listdir(workspace) != []
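For reference, a self-contained sketch of the tree-comparison idiom behind `relative_walk` in tests/sourcecache/staging.py. This variant uses `os.path.relpath` and sorted listings, an assumption added here for order-independence rather than something the patch itself does:

import os


def relative_walk(rootdir):
    # Yield os.walk() tuples with the root prefix stripped, so that two
    # directory trees can be compared no matter where they live on disk.
    for root, dirnames, filenames in os.walk(rootdir):
        # Sorting dirnames in place also fixes the traversal order,
        # since os.walk() (topdown) recurses in list order.
        dirnames.sort()
        yield (os.path.relpath(root, rootdir), list(dirnames), sorted(filenames))


def same_tree(dir1, dir2):
    # Structural comparison only: directory and file names, not contents.
    return list(relative_walk(dir1)) == list(relative_walk(dir2))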