# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Raoul Hidalgo Charman
#
import multiprocessing
import os
from fnmatch import fnmatch
from . import utils
from . import _yaml
from ._cas import CASRemote
from ._message import Message, MessageType
from ._exceptions import LoadError, LoadErrorReason


# Base Cache for Caches to derive from
#
class BaseCache():
# None of these should ever be called in the base class, but this appeases
# pylint to some degree
spec_class = None
spec_name = None
spec_error = None
config_node_name = None
remote_class = CASRemote
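
    # For orientation, a concrete cache is expected to fill these in.
    # A hypothetical sketch of such a subclass (names are illustrative,
    # not a definitive implementation):
    #
    #     class ArtifactCache(BaseCache):
    #         spec_class = ArtifactCacheSpec
    #         spec_name = "artifact_cache_specs"
    #         spec_error = ArtifactError
    #         config_node_name = "artifacts"
    #
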
def __init__(self, context):
self.context = context
self.cas = context.get_cascache()
self.casquota = context.get_casquota()
self.casquota._calculate_cache_quota()
        self._remotes_setup = False    # Check to prevent double-setup of remotes

        # Per-project list of remote_class instances (CASRemote by default).
        self._remotes = {}

        self.global_remote_specs = []
        self.project_remote_specs = {}

        self._has_fetch_remotes = False
        self._has_push_remotes = False

# specs_from_config_node()
#
    # Parses the configuration of remote caches from a config block.
    #
    # Args:
    #    config_node (dict): The config block, which may contain the
    #                        key named by cls.config_node_name (e.g. 'artifacts')
    #    basedir (str): The base directory for relative paths
    #
    # Returns:
    #    A list of cls.spec_class instances.
#
# Raises:
# LoadError, if the config block contains invalid keys.
#
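    # For illustration, both of these user configuration forms are
    # accepted (URLs are made up, and the key name assumes
    # config_node_name is 'artifacts'):
    #
    #     artifacts:
    #       url: https://cache.example.com:11001
    #       push: true
    #
    #     artifacts:
    #     - url: https://cache.example.com:11001
    #     - url: https://other-cache.example.com:11001
    #       push: true
    #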
    @classmethod
    def specs_from_config_node(cls, config_node, basedir=None):
        cache_specs = []

        try:
            artifacts = [_yaml.node_get(config_node, dict, cls.config_node_name)]
        except LoadError:
            try:
                artifacts = _yaml.node_get(config_node, list, cls.config_node_name, default_value=[])
            except LoadError:
                provenance = _yaml.node_get_provenance(config_node, key=cls.config_node_name)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "%s: '%s' must be a single 'url:' mapping, or a list of mappings" %
                                (str(provenance), cls.config_node_name))

        for spec_node in artifacts:
            cache_specs.append(cls.spec_class._new_from_config_node(spec_node, basedir))

        return cache_specs

# _configured_remote_cache_specs():
#
# Return the list of configured remotes for a given project, in priority
# order. This takes into account the user and project configuration.
#
# Args:
# context (Context): The BuildStream context
# project (Project): The BuildStream project
#
# Returns:
    #    A list of cls.spec_class instances describing the remote caches.
#
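    # For example (illustrative): if the project configures remotes
    # [A, B] and the user configuration lists [B, C], this returns
    # [A, B, C]: project specs take priority and duplicates are dropped.
    #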
@classmethod
def _configured_remote_cache_specs(cls, context, project):
project_specs = getattr(project, cls.spec_name)
context_specs = getattr(context, cls.spec_name)
        return list(utils._deduplicate(project_specs + context_specs))

# setup_remotes():
#
# Sets up which remotes to use
#
# Args:
# use_config (bool): Whether to use project configuration
# remote_url (str): Remote cache URL
#
# This requires that all of the projects which are to be processed in the session
# have already been loaded and are observable in the Context.
#
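    # Typical usage (illustrative):
    #
    #     cache.setup_remotes(use_config=True)
    #
    # or, overriding the configuration with a single push remote
    # (the URL here is made up):
    #
    #     cache.setup_remotes(remote_url="https://cache.example.com:11001")
    #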
    def setup_remotes(self, *, use_config=False, remote_url=None):
        # Ensure we do not double-initialise since this can be expensive
        assert not self._remotes_setup
        self._remotes_setup = True

        # Initialize remote caches. We allow the commandline to override
        # the user config in some cases (for example `bst artifact push --remote=...`).
        has_remote_caches = False
        if remote_url:
            # pylint: disable=not-callable
            self._set_remotes([self.spec_class(remote_url, push=True)])
            has_remote_caches = True

        if use_config:
            for project in self.context.get_projects():
                caches = self._configured_remote_cache_specs(self.context, project)
                if caches:  # caches is a list of spec_class instances
                    self._set_remotes(caches, project=project)
                    has_remote_caches = True

        if has_remote_caches:
            self._initialize_remotes()

# initialize_remotes():
#
# This will contact each remote cache.
#
# Args:
# on_failure (callable): Called if we fail to contact one of the caches.
#
    def initialize_remotes(self, *, on_failure=None):
        remote_specs = self.global_remote_specs.copy()
        for project in self.project_remote_specs:
            remote_specs.extend(self.project_remote_specs[project])

        remote_specs = list(utils._deduplicate(remote_specs))

        remotes = {}
        q = multiprocessing.Queue()
        for remote_spec in remote_specs:
            error = self.remote_class.check_remote(remote_spec, q)

            if error and on_failure:
                on_failure(remote_spec.url, error)
                continue
            elif error:
                raise self.spec_error(error)  # pylint: disable=not-callable

            self._has_fetch_remotes = True
            if remote_spec.push:
                self._has_push_remotes = True

            remotes[remote_spec.url] = self.remote_class(remote_spec)

        for project in self.context.get_projects():
            remote_specs = self.global_remote_specs
            if project in self.project_remote_specs:
                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))

            project_remotes = []
            for remote_spec in remote_specs:
                # Errors are already handled in the loop above,
                # skip unreachable remotes here.
                if remote_spec.url not in remotes:
                    continue
                remote = remotes[remote_spec.url]
                project_remotes.append(remote)

            self._remotes[project] = project_remotes

# has_fetch_remotes():
#
# Check whether any remote repositories are available for fetching.
#
# Args:
# plugin (Plugin): The Plugin to check
#
# Returns: True if any remote repositories are configured, False otherwise
#
def has_fetch_remotes(self, *, plugin=None):
if not self._has_fetch_remotes:
# No project has fetch remotes
return False
elif plugin is None:
# At least one (sub)project has fetch remotes
return True
else:
# Check whether the specified element's project has fetch remotes
remotes_for_project = self._remotes[plugin._get_project()]
            return bool(remotes_for_project)

# has_push_remotes():
#
# Check whether any remote repositories are available for pushing.
#
# Args:
    #    plugin (Plugin): The Plugin to check
#
# Returns: True if any remote repository is configured, False otherwise
#
def has_push_remotes(self, *, plugin=None):
if not self._has_push_remotes:
# No project has push remotes
return False
elif plugin is None:
# At least one (sub)project has push remotes
return True
else:
# Check whether the specified element's project has push remotes
remotes_for_project = self._remotes[plugin._get_project()]
            return any(remote.spec.push for remote in remotes_for_project)

    ################################################
    #             Local Private Methods            #
    ################################################

# _message()
#
# Local message propagator
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
self.context.message(
            Message(None, message_type, message, **args))

# _set_remotes():
#
# Set the list of remote caches. If project is None, the global list of
# remote caches will be set, which is used by all projects. If a project is
# specified, the per-project list of remote caches will be set.
#
# Args:
    #    remote_specs (list): List of cls.spec_class instances, in priority order.
    #    project (Project): The Project instance for project-specific remotes
    #
def _set_remotes(self, remote_specs, *, project=None):
if project is None:
# global remotes
self.global_remote_specs = remote_specs
else:
            self.project_remote_specs[project] = remote_specs

# _initialize_remotes()
#
    # An internal wrapper which calls the abstract method and
    # takes care of messaging
#
    def _initialize_remotes(self):
        def remote_failed(url, error):
            self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))

        with self.context.timed_activity("Initializing remote caches", silent_nested=True):
            self.initialize_remotes(on_failure=remote_failed)

# _list_refs_mtimes()
#
# List refs in a directory, given a base path. Also returns the
# associated mtimes
#
# Args:
# base_path (str): Base path to traverse over
# glob_expr (str|None): Optional glob expression to match against files
#
# Returns:
    #     (iter): an iterator of (mtime, ref path) tuples
#
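    # For example (illustrative), listing refs below a fixed directory
    # prefix only walks that subtree rather than all of base_path:
    #
    #     refs = list(self._list_refs_mtimes(base_path, glob_expr="project/*"))
    #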
def _list_refs_mtimes(self, base_path, *, glob_expr=None):
path = base_path
if glob_expr is not None:
globdir = os.path.dirname(glob_expr)
if not any(c in "*?[" for c in globdir):
# path prefix contains no globbing characters so
# append the glob to optimise the os.walk()
                path = os.path.join(base_path, globdir)

for root, _, files in os.walk(path):
for filename in files:
ref_path = os.path.join(root, filename)
relative_path = os.path.relpath(ref_path, base_path) # Relative to refs head
if not glob_expr or fnmatch(relative_path, glob_expr):
# Obtain the mtime (the time a file was last modified)
yield (os.path.getmtime(ref_path), relative_path)