#
# Copyright (C) 2016-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom
# Tiago Gomes
import os
import sys
from collections import OrderedDict
from pathlib import Path
from pluginbase import PluginBase
from . import utils
from . import _site
from . import _yaml
from ._artifactelement import ArtifactElement
from ._profile import Topics, PROFILER
from ._exceptions import LoadError, LoadErrorReason
from ._options import OptionPool
from ._artifactcache import ArtifactCache
from ._sourcecache import SourceCache
from .node import ScalarNode, SequenceNode, _assert_symbol_name
from .sandbox import SandboxRemote
from ._elementfactory import ElementFactory
from ._sourcefactory import SourceFactory
from .types import CoreWarnings
from ._projectrefs import ProjectRefs, ProjectRefStorage
from ._versions import BST_FORMAT_VERSION
from ._versions import BST_FORMAT_VERSION_MIN
from ._loader import Loader
from .element import Element
from .types import FastEnum
from ._message import Message, MessageType
from ._includes import Includes
from ._workspaces import WORKSPACE_PROJECT_FILE
# Project Configuration file: the file name which is searched for in order
# to locate a project directory, and from which the project is loaded
_PROJECT_CONF_FILE = 'project.conf'
# List of all places plugins can come from
class PluginOrigins(FastEnum):
    # Values accepted for the 'origin' key of an entry in the
    # 'plugins' list of the project configuration.

    # Core plugins are loaded by default, so Project never stores
    # an origin for them.
    CORE = "core"

    # Plugins living inside the project; Project validates the origin's
    # 'path' relative to the project and stores it as an absolute path.
    LOCAL = "local"

    # NOTE(review): presumably plugins provided by an installed pip
    # package named by the origin's 'package-name' field -- confirm
    # against the plugin factories.
    PIP = "pip"
# HostMount()
#
# A simple object describing the behavior of
# a host mount.
#
class HostMount():
    """Describes one host file or directory to be mounted in the sandbox.

    Args:
       path (str): Path inside the sandbox
       host_path (str): Path on the host, defaults to the expanded `path`
       optional (bool): Optional mounts do not incur warnings or errors

    Environment variables are expanded in both `path` and `host_path`.
    """

    def __init__(self, path, host_path=None, optional=False):
        sandbox_path = os.path.expandvars(path)

        if host_path is None:
            # No explicit host location was given, mount the same
            # (already expanded) location which is used in the sandbox
            source_path = sandbox_path
        else:
            source_path = os.path.expandvars(host_path)

        self.path = sandbox_path
        self.host_path = source_path
        self.optional = optional
# Represents project configuration that can have different values for junctions.
class ProjectConfig:
    """Holds the parts of the project configuration which are loaded
    separately for the first pass (junctions) and the second pass.
    """

    def __init__(self):
        # Plugin factories
        self.element_factory = None
        self.source_factory = None

        # The OptionPool
        self.options = None

        # The base set of variables
        self.base_variables = {}

        # Element and source specific configurations
        self.element_overrides = {}
        self.source_overrides = {}

        # Mirror name -> dict of alias-mappings to URIs, in declaration order
        self.mirrors = OrderedDict()

        # The name of the preferred mirror
        self.default_mirror = None

        # Aliases dictionary
        self._aliases = None
# Project()
#
# The Project Configuration
#
class Project():
def __init__(self, directory, context, *, junction=None, cli_options=None,
             default_mirror=None, parent_loader=None,
             search_for_project=True, fetch_subprojects=None):
    """Create the project and run the first pass of loading.

    Args:
       directory (str): The project directory, or the directory to start
                        searching from when `search_for_project` is set
       context (Context): The invocation Context
       junction (Element): The junction Element, if this is a subproject
       cli_options: Option values given on the command line
       default_mirror (str): The name of the preferred mirror
       parent_loader (Loader): Handed to the Loader as its parent
       search_for_project (bool): Whether to search upward for the project
       fetch_subprojects: Handed to the Loader as is
    """

    # The project name, set once the project configuration is loaded
    self.name = None

    # The invocation Context, a private member
    self._context = context

    if not search_for_project:
        # Trust the given directory to be the project directory
        self.directory = directory
        self._invoked_from_workspace_element = None
    else:
        located = self._find_project_dir(directory)
        self.directory, self._invoked_from_workspace_element = located

    self._absolute_directory_path = Path(self.directory).resolve()

    # Absolute path to where elements are loaded from within the project
    self.element_path = None

    # Default target elements
    self._default_targets = None

    # ProjectRefs for the main refs and also for junctions
    self.refs = ProjectRefs(self.directory, 'project.refs')
    self.junction_refs = ProjectRefs(self.directory, 'junction.refs')

    # Second pass and first pass configurations
    self.config = ProjectConfig()
    self.first_pass_config = ProjectConfig()

    # The junction Element object, if this is a subproject
    self.junction = junction

    # ProjectRefStorage setting
    self.ref_storage = None

    # The base set of environment variables, and the
    # base nocache mask (list) for the environment
    self.base_environment = {}
    self.base_env_nocache = None

    #
    # Private Members
    #

    # The name of the preferred mirror
    self._default_mirror = default_mirror

    self._cli_options = cli_options

    # A list of warnings which should trigger an error
    self._fatal_warnings = []

    # The default interactive shell command, statically set
    # environment vars and a list of HostMount objects
    self._shell_command = []
    self._shell_environment = {}
    self._shell_host_files = []

    self.artifact_cache_specs = None
    self.source_cache_specs = None
    self.remote_execution_specs = None
    self._sandbox = None
    self._splits = None

    # The project registers itself with the context before loading
    self._context.add_project(self)

    self._partially_loaded = False
    self._fully_loaded = False
    self._project_includes = None

    profile_key = self.directory.replace(os.sep, '-')
    with PROFILER.profile(Topics.LOAD_PROJECT, profile_key):
        self._load(parent_loader=parent_loader, fetch_subprojects=fetch_subprojects)

    self._partially_loaded = True
@property
def options(self):
    """The options (OptionPool) of the second pass configuration."""
    second_pass = self.config
    return second_pass.options
@property
def base_variables(self):
    """The base set of variables of the second pass configuration."""
    second_pass = self.config
    return second_pass.base_variables
@property
def element_overrides(self):
    """The element specific configurations of the second pass configuration."""
    second_pass = self.config
    return second_pass.element_overrides
@property
def source_overrides(self):
    """The source specific configurations of the second pass configuration."""
    second_pass = self.config
    return second_pass.source_overrides
# translate_url():
#
# Translates the given url which may be specified with an alias
# into a fully qualified url.
#
# Args:
# url (str): A url, which may be using an alias
# first_pass (bool): Whether to use first pass configuration (for junctions)
#
# Returns:
# str: The fully qualified url, with aliases resolved
#
# This method is provided for :class:`.Source` objects to resolve
# fully qualified urls based on the shorthand which is allowed
# to be specified in the YAML
def translate_url(self, url, *, first_pass=False):
    """Resolve the alias in `url`, if any, into a fully qualified url."""
    config = self.first_pass_config if first_pass else self.config

    # Empty urls and urls without an alias separator are returned untouched
    if not url or utils._ALIAS_SEPARATOR not in url:
        return url

    url_alias, url_body = url.split(utils._ALIAS_SEPARATOR, 1)
    alias_url = config._aliases.get_str(url_alias, default=None)

    # An unknown (or empty) alias leaves the url as it was given
    if not alias_url:
        return url

    return alias_url + url_body
# get_shell_config()
#
# Gets the project specified shell configuration
#
# Returns:
# (list): The shell command
# (dict): The shell environment
# (list): The list of HostMount objects
#
def get_shell_config(self):
    """Return the (command, environment, host files) shell configuration."""
    command = self._shell_command
    environment = self._shell_environment
    host_files = self._shell_host_files
    return command, environment, host_files
# get_path_from_node()
#
# Fetches the project path from a dictionary node and validates it
#
# Paths are asserted to never lead to a directory outside of the project
# directory. In addition, paths can not point to symbolic links, fifos,
# sockets and block/character devices.
#
# The `check_is_file` and `check_is_dir` parameters can be used to
# perform additional validations on the path. Note that an exception
# will always be raised if both parameters are set to ``True``.
#
# Args:
# node (ScalarNode): A Node loaded from YAML containing the path to validate
# check_is_file (bool): If ``True`` an error will also be raised
# if path does not point to a regular file.
# Defaults to ``False``
# check_is_dir (bool): If ``True`` an error will be also raised
# if path does not point to a directory.
# Defaults to ``False``
# Returns:
# (str): The project path
#
# Raises:
# (LoadError): In case that the project path is not valid or does not
# exist
#
def get_path_from_node(self, node, *,
                       check_is_file=False, check_is_dir=False):
    """Validate the project relative path held by `node` and return it.

    Args:
       node (ScalarNode): A Node loaded from YAML containing the path
       check_is_file (bool): Also require the path to be a regular file
       check_is_dir (bool): Also require the path to be a directory

    Returns:
       (str): The path, exactly as it was written in `node`

    Raises:
       (LoadError): If the path does not exist, is absolute, leads outside
                    of the project directory, or points to a symbolic link,
                    socket, fifo, block device or character device
    """
    path_str = node.as_str()
    path = Path(path_str)
    full_path = self._absolute_directory_path / path

    # This must be checked before resolving, as resolving follows the link
    if full_path.is_symlink():
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' must not point to "
                        "symbolic links ".format(provenance, path_str),
                        LoadErrorReason.PROJ_PATH_INVALID_KIND)

    if path.parts and path.parts[0] == '..':
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' first component must "
                        "not be '..'".format(provenance, path_str),
                        LoadErrorReason.PROJ_PATH_INVALID)

    try:
        # Path.resolve() only accepts the `strict` argument since Python 3.6
        if sys.version_info[0] == 3 and sys.version_info[1] < 6:
            full_resolved_path = full_path.resolve()
        else:
            full_resolved_path = full_path.resolve(strict=True)  # pylint: disable=unexpected-keyword-arg
    except FileNotFoundError as e:
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' does not exist".format(provenance, path_str),
                        LoadErrorReason.MISSING_FILE) from e

    # The project directory itself also counts as being inside the project
    is_inside = self._absolute_directory_path in full_resolved_path.parents or (
        full_resolved_path == self._absolute_directory_path)

    if not is_inside:
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' must not lead outside of the "
                        "project directory".format(provenance, path_str),
                        LoadErrorReason.PROJ_PATH_INVALID)

    if path.is_absolute():
        provenance = node.get_provenance()
        raise LoadError("{}: Absolute path: '{}' invalid.\n"
                        "Please specify a path relative to the project's root."
                        .format(provenance, path), LoadErrorReason.PROJ_PATH_INVALID)

    # Character devices are rejected along with the other special files,
    # the documented contract of this method forbids both kinds of device.
    if full_resolved_path.is_socket() or (
            full_resolved_path.is_fifo() or
            full_resolved_path.is_block_device() or
            full_resolved_path.is_char_device()):
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' points to an unsupported "
                        "file kind".format(provenance, path_str), LoadErrorReason.PROJ_PATH_INVALID_KIND)

    if check_is_file and not full_resolved_path.is_file():
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' is not a regular file"
                        .format(provenance, path_str), LoadErrorReason.PROJ_PATH_INVALID_KIND)

    if check_is_dir and not full_resolved_path.is_dir():
        provenance = node.get_provenance()
        raise LoadError("{}: Specified path '{}' is not a directory"
                        .format(provenance, path_str), LoadErrorReason.PROJ_PATH_INVALID_KIND)

    return path_str
def _validate_node(self, node):
node.validate_keys([
'format-version',
'element-path', 'variables',
'environment', 'environment-nocache',
'split-rules', 'elements', 'plugins',
'aliases', 'name', 'defaults',
'artifacts', 'options',
'fail-on-overlap', 'shell', 'fatal-warnings',
'ref-storage', 'sandbox', 'mirrors', 'remote-execution',
'sources', 'source-caches', '(@)'
])
# create_element()
#
# Instantiate and return an element
#
# Args:
# meta (MetaElement): The loaded MetaElement
# first_pass (bool): Whether to use first pass configuration (for junctions)
#
# Returns:
# (Element): A newly created Element object of the appropriate kind
#
def create_element(self, meta, *, first_pass=False):
    """Instantiate an element from `meta` with the selected pass' element factory."""
    config = self.first_pass_config if first_pass else self.config
    return config.element_factory.create(self._context, self, meta)
# create_artifact_element()
#
# Instantiate and return an ArtifactElement
#
# Args:
# ref (str): A string of the artifact ref
#
# Returns:
# (ArtifactElement): A newly created ArtifactElement object of the appropriate kind
#
def create_artifact_element(self, ref):
    """Instantiate an ArtifactElement for the artifact ref string `ref`."""
    context = self._context
    return ArtifactElement(context, ref)
# create_source()
#
# Instantiate and return a Source
#
# Args:
# meta (MetaSource): The loaded MetaSource
# first_pass (bool): Whether to use first pass configuration (for junctions)
#
# Returns:
# (Source): A newly created Source object of the appropriate kind
#
def create_source(self, meta, *, first_pass=False):
    """Instantiate a source from `meta` with the selected pass' source factory."""
    config = self.first_pass_config if first_pass else self.config
    return config.source_factory.create(self._context, self, meta)
# get_alias_uri()
#
# Returns the URI for a given alias, if it exists
#
# Args:
# alias (str): The alias.
# first_pass (bool): Whether to use first pass configuration (for junctions)
#
# Returns:
# str: The URI for the given alias; or None: if there is no URI for
# that alias.
def get_alias_uri(self, alias, *, first_pass=False):
    """Look up the URI of `alias` in the selected pass, None if undeclared."""
    config = self.first_pass_config if first_pass else self.config
    aliases = config._aliases
    return aliases.get_str(alias, default=None)
# get_alias_uris()
#
# Args:
# alias (str): The alias.
# first_pass (bool): Whether to use first pass configuration (for junctions)
#
# Returns a list of every URI to replace an alias with
def get_alias_uris(self, alias, *, first_pass=False):
    """List every URI which `alias` may be replaced with.

    The URIs of the default mirror come first, then those of the other
    mirrors in declaration order, and finally the URI from the project's
    own alias declaration. An empty or undeclared alias yields `[None]`.
    """
    config = self.first_pass_config if first_pass else self.config

    if not alias or alias not in config._aliases:  # pylint: disable=unsupported-membership-test
        return [None]

    preferred = []
    others = []
    for mirror_name, alias_mapping in config.mirrors.items():
        if alias not in alias_mapping:
            continue

        if mirror_name == config.default_mirror:
            preferred = alias_mapping[alias] + preferred
        else:
            others.extend(alias_mapping[alias])

    upstream = config._aliases.get_str(alias)
    return preferred + others + [upstream]
# load_elements()
#
# Loads elements from target names.
#
# Args:
# targets (list): Target names
# rewritable (bool): Whether the loaded files should be rewritable
# this is a bit more expensive due to deep copies
# ignore_workspaces (bool): Whether to load workspace sources
#
# Returns:
# (list): A list of loaded Element
#
def load_elements(self, targets, *, rewritable=False, ignore_workspaces=False):
    """Load the named targets and resolve them into Element objects."""

    # First have the loader produce the meta elements for the targets
    with self._context.messenger.simple_task("Loading elements", silent_nested=True) as task:
        meta_elements = self.loader.load(targets, task, rewritable=rewritable, ticker=None,
                                         ignore_workspaces=ignore_workspaces)

    # Then turn every meta element into an Element
    with self._context.messenger.simple_task("Resolving elements") as task:
        if task:
            task.set_maximum_progress(self.loader.loaded)

        elements = []
        for meta in meta_elements:
            elements.append(Element._new_from_meta(meta, task))

    Element._clear_meta_elements_cache()

    # Redundant source references may have been discovered while
    # resolving the elements, warn about these now.
    redundant_refs = Element._get_redundant_source_refs()
    if redundant_refs:
        listing = "\n".join(
            "{}:{}".format(source._get_provenance(), ref)
            for source, ref in redundant_refs
        )
        detail = "The following inline specified source references will be ignored:\n\n" + listing
        warning = Message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
        self._context.messenger.message(warning)

    return elements
# load_artifacts()
#
# Loads artifacts from target artifact refs
#
# Args:
# targets (list): Target artifact refs
#
# Returns:
# (list): A list of loaded ArtifactElement
#
def load_artifacts(self, targets):
    """Create an ArtifactElement for each artifact ref in `targets`."""
    with self._context.messenger.simple_task("Loading artifacts") as task:
        # NOTE(review): the refs are handed to ArtifactElement as they are,
        # nothing here checks them against the artifact directory. The
        # reasons noted for not using the ArtifactCache here were:
        #    1. The Project, or the Context, do not currently have
        #       access to the ArtifactCache
        #    2. The ArtifactCache.contains() method expects an Element
        #       and a key, not a ref.
        #
        artifacts = [
            ArtifactElement._new_from_artifact_ref(ref, self._context, task)
            for ref in targets
        ]

    ArtifactElement._clear_artifact_refs_cache()

    return artifacts
# ensure_fully_loaded()
#
# Ensure project has finished loading. At first initialization, a
# project can only load junction elements. Other elements require
# project to be fully loaded.
#
def ensure_fully_loaded(self):
    """Run the second pass of loading, unless that has already happened."""
    if not self._fully_loaded:
        assert self._partially_loaded

        # Flagged before the second pass runs, not after it completed
        self._fully_loaded = True

        junction = self.junction
        if junction:
            # The project owning the junction is fully loaded first
            owner = junction._get_project()
            owner.ensure_fully_loaded()

        self._load_second_pass()
# invoked_from_workspace_element()
#
# Returns the element whose workspace was used to invoke buildstream
# if buildstream was invoked from an external workspace
#
def invoked_from_workspace_element(self):
    """Return the workspace element recorded when the project was located, or None."""
    workspace_element = self._invoked_from_workspace_element
    return workspace_element
# cleanup()
#
# Cleans up resources used loading elements
#
def cleanup(self):
    """Reset the load state which is kept on the Element class."""
    reset_load_state = Element._reset_load_state
    reset_load_state()
# get_default_target()
#
# Attempts to interpret which element the user intended to run a command on.
# This is for commands that only accept a single target element and thus,
# this only uses the workspace element (if invoked from workspace directory)
# and does not use the project default targets.
#
def get_default_target(self):
    """Return the workspace element BuildStream was invoked from, or None.

    The default targets of the project configuration are not considered.
    """
    workspace_element = self._invoked_from_workspace_element
    return workspace_element
# get_default_targets()
#
# Attempts to interpret which elements the user intended to run a command on.
# This is for commands that accept multiple target elements.
#
def get_default_targets(self):
    """Work out which elements a multi target command should operate on.

    Returns:
       (tuple): The workspace element if invoked from a workspace,
                otherwise the default targets of the project configuration,
                otherwise every '.bst' file found below the element path
                (as paths relative to the element path)
    """
    # If _invoked_from_workspace_element has a value,
    # a workspace element was found before a project config
    # Therefore the workspace does not contain a project
    if self._invoked_from_workspace_element:
        return (self._invoked_from_workspace_element,)

    # Default targets from project configuration
    if self._default_targets:
        return tuple(self._default_targets)

    # If default targets are not configured, default to all project elements
    default_targets = []
    for root, dirs, files in os.walk(self.element_path):

        # Do not recurse down the ".bst" directory which is where we stage
        # junctions and other BuildStream internals.
        if ".bst" in dirs:
            dirs.remove(".bst")

        rel_dir = os.path.relpath(root, self.element_path)
        for file in files:
            if not file.endswith(".bst"):
                continue

            # Only the "." of the toplevel directory is dropped. Stripping
            # leading "." and "/" characters instead would also eat the
            # leading dot of hidden files and directories.
            if rel_dir == os.curdir:
                default_targets.append(file)
            else:
                default_targets.append(os.path.join(rel_dir, file))

    return tuple(default_targets)
# _load():
#
# Loads the project configuration file in the project
# directory and processes the first pass.
#
# Raises: LoadError if there was a problem with the project.conf
#
def _load(self, *, parent_loader=None, fetch_subprojects):
    """Load project.conf and process the first pass configuration.

    Args:
       parent_loader (Loader): Handed to the project's Loader as its parent
       fetch_subprojects: Handed to the project's Loader as is

    Raises:
       (LoadError): If project.conf is missing, requests an unsupported
                    format version, or is otherwise invalid
    """

    # Load builtin default
    projectfile = os.path.join(self.directory, _PROJECT_CONF_FILE)
    self._default_config_node = _yaml.load(_site.default_project_config)

    # Load project local config and override the builtin
    try:
        self._project_conf = _yaml.load(projectfile)
    except LoadError as e:
        # Raise a more specific error here
        if e.reason == LoadErrorReason.MISSING_FILE:
            raise LoadError(str(e), LoadErrorReason.MISSING_PROJECT_CONF) from e

        # Otherwise re-raise the original exception
        raise

    pre_config_node = self._default_config_node.clone()
    self._project_conf._composite(pre_config_node)

    # Assert project's format version early, before validating toplevel keys
    format_version = pre_config_node.get_int('format-version')
    if format_version < BST_FORMAT_VERSION_MIN:
        major, minor = utils.get_bst_version()
        # The first literal ends with a space so that the two
        # sentences do not run together in the reported message.
        raise LoadError(
            "Project requested format version {}, but BuildStream {}.{} only supports format version {} or above. "
            "Use latest 1.x release"
            .format(format_version, major, minor, BST_FORMAT_VERSION_MIN), LoadErrorReason.UNSUPPORTED_PROJECT)

    if BST_FORMAT_VERSION < format_version:
        major, minor = utils.get_bst_version()
        raise LoadError(
            "Project requested format version {}, but BuildStream {}.{} only supports up until format version {}"
            .format(format_version, major, minor, BST_FORMAT_VERSION), LoadErrorReason.UNSUPPORTED_PROJECT)

    self._validate_node(pre_config_node)

    # The project name, element path and option declarations
    # are constant and cannot be overridden by option conditional statements
    # FIXME: we should be keeping node information for further composition here
    self.name = self._project_conf.get_str('name')

    # Validate that project name is a valid symbol name
    _assert_symbol_name(self.name, "project name",
                        ref_node=pre_config_node.get_node('name'))

    self.element_path = os.path.join(
        self.directory,
        self.get_path_from_node(pre_config_node.get_scalar('element-path'),
                                check_is_dir=True)
    )

    # Each pass resolves its own set of option values
    self.config.options = OptionPool(self.element_path)
    self.first_pass_config.options = OptionPool(self.element_path)

    defaults = pre_config_node.get_mapping('defaults')
    defaults.validate_keys(['targets'])
    self._default_targets = defaults.get_str_list("targets")

    # Fatal warnings
    self._fatal_warnings = pre_config_node.get_str_list('fatal-warnings', default=[])

    self.loader = Loader(self._context, self,
                         parent=parent_loader, fetch_subprojects=fetch_subprojects)

    self._project_includes = Includes(self.loader, copy_tree=False)

    # The first pass processes includes with only_local=True, and
    # composites the result on top of a fresh copy of the defaults.
    project_conf_first_pass = self._project_conf.clone()
    self._project_includes.process(project_conf_first_pass, only_local=True)
    config_no_include = self._default_config_node.clone()
    project_conf_first_pass._composite(config_no_include)

    self._load_pass(config_no_include, self.first_pass_config,
                    ignore_unknown=True)

    # Use separate file for storing source references
    ref_storage_node = pre_config_node.get_scalar('ref-storage')
    self.ref_storage = ref_storage_node.as_str()
    if self.ref_storage not in [ProjectRefStorage.INLINE, ProjectRefStorage.PROJECT_REFS]:
        p = ref_storage_node.get_provenance()
        raise LoadError("{}: Invalid value '{}' specified for ref-storage"
                        .format(p, self.ref_storage), LoadErrorReason.INVALID_DATA)

    if self.ref_storage == ProjectRefStorage.PROJECT_REFS:
        self.junction_refs.load(self.first_pass_config.options)
# _load_second_pass()
#
# Process the second pass of loading the project configuration.
#
def _load_second_pass(self):
    """Process the second pass of loading the project configuration.

    Loads the configuration into `self.config`, followed by the cache and
    remote execution specs, the sandbox environment and configuration,
    the split rules, project.refs and the shell configuration.
    """
    project_conf_second_pass = self._project_conf.clone()
    self._project_includes.process(project_conf_second_pass)
    config = self._default_config_node.clone()
    project_conf_second_pass._composite(config)

    self._load_pass(config, self.config)

    self._validate_node(config)

    #
    # Now all YAML composition is done, from here on we just load
    # the values from our loaded configuration dictionary.
    #

    # Load artifacts pull/push configuration for this project
    self.artifact_cache_specs = ArtifactCache.specs_from_config_node(config, self.directory)

    # If there is a junction Element which specifies that we want to remotely cache
    # its elements, append the junction's remotes to the artifact cache specs list
    if self.junction:
        parent = self.junction._get_project()
        if self.junction.cache_junction_elements:
            self.artifact_cache_specs = parent.artifact_cache_specs + self.artifact_cache_specs

        if self.junction.ignore_junction_remotes:
            self.artifact_cache_specs = []

    # Load source caches with pull/push config
    self.source_cache_specs = SourceCache.specs_from_config_node(config, self.directory)

    # Load remote-execution configuration for this project, the user
    # configuration overrides take precedence over project.conf, which
    # in turn takes precedence over the context wide setting.
    project_specs = SandboxRemote.specs_from_config_node(config, self.directory)
    override_specs = SandboxRemote.specs_from_config_node(
        self._context.get_overrides(self.name), self.directory)

    if override_specs is not None:
        self.remote_execution_specs = override_specs
    elif project_specs is not None:
        self.remote_execution_specs = project_specs
    else:
        self.remote_execution_specs = self._context.remote_execution_specs

    # Load sandbox environment variables
    self.base_environment = config.get_mapping('environment')
    self.base_env_nocache = config.get_str_list('environment-nocache')

    # Load sandbox configuration
    self._sandbox = config.get_mapping('sandbox')

    # Load project split rules
    self._splits = config.get_mapping('split-rules')

    # Support backwards compatibility for fail-on-overlap
    fail_on_overlap = config.get_scalar('fail-on-overlap', None)

    # Deprecation check
    if not fail_on_overlap.is_none():
        self._context.messenger.message(
            Message(
                MessageType.WARN,
                "Use of fail-on-overlap within project.conf " +
                "is deprecated. Consider using fatal-warnings instead."
            )
        )

        # NOTE(review): the value is only interpreted as a boolean when
        # it was actually specified -- nesting inferred, please confirm.
        if (CoreWarnings.OVERLAPS not in self._fatal_warnings) and fail_on_overlap.as_bool():
            self._fatal_warnings.append(CoreWarnings.OVERLAPS)

    # Load project.refs if it exists, this may be ignored.
    if self.ref_storage == ProjectRefStorage.PROJECT_REFS:
        self.refs.load(self.options)

    # Parse shell options
    shell_options = config.get_mapping('shell')
    shell_options.validate_keys(['command', 'environment', 'host-files'])
    self._shell_command = shell_options.get_str_list('command')

    # Perform environment expansion right away
    shell_environment = shell_options.get_mapping('environment', default={})
    for key in shell_environment.keys():
        value = shell_environment.get_str(key)
        self._shell_environment[key] = os.path.expandvars(value)

    # Host files is parsed as a list for convenience
    host_files = shell_options.get_sequence('host-files', default=[])
    for host_file in host_files:
        if isinstance(host_file, ScalarNode):
            # HostMount expands environment variables in the path it is
            # given, which requires the string value rather than the node.
            mount = HostMount(host_file.as_str())
        else:
            # Some validation
            host_file.validate_keys(['path', 'host_path', 'optional'])

            # Parse the host mount
            path = host_file.get_str('path')
            host_path = host_file.get_str('host_path', default=None)
            optional = host_file.get_bool('optional', default=False)
            mount = HostMount(path, host_path, optional)

        self._shell_host_files.append(mount)
# _load_pass():
#
# Loads parts of the project configuration that are different
# for first and second pass configurations.
#
# Args:
# config (dict) - YAML node of the configuration file.
# output (ProjectConfig) - ProjectConfig to load configuration onto.
# ignore_unknown (bool) - Whether option loader should ignore unknown options.
#
def _load_pass(self, config, output, *,
               ignore_unknown=False):
    """Load the pass specific parts of the configuration onto `output`.

    Args:
       config: YAML node of the composited project configuration
       output (ProjectConfig): ProjectConfig to load configuration onto
       ignore_unknown (bool): Handed to the option pool when loading the
                              option values given on the command line

    Raises:
       (LoadError): If a mirror declares an alias whose value is not a
                    list of URIs
    """

    # Element and Source type configurations will be composited later onto
    # element/source types, so we delete it from here and run our final
    # assertion after.
    output.element_overrides = config.get_mapping('elements', default={})
    output.source_overrides = config.get_mapping('sources', default={})
    config.safe_del('elements')
    config.safe_del('sources')
    config._assert_fully_composited()

    self._load_plugin_factories(config, output)

    # Load project options
    options_node = config.get_mapping('options', default={})
    output.options.load(options_node)
    if self.junction:
        # load before user configuration
        output.options.load_yaml_values(self.junction.options, transform=self.junction.node_subst_vars)

    # Collect option values specified in the user configuration
    overrides = self._context.get_overrides(self.name)
    override_options = overrides.get_mapping('options', default={})
    output.options.load_yaml_values(override_options)
    if self._cli_options:
        output.options.load_cli_values(self._cli_options, ignore_unknown=ignore_unknown)

    # We're done modifying options, now we can use them for substitutions
    output.options.resolve()

    #
    # Now resolve any conditionals in the remaining configuration,
    # any conditionals specified for project option declarations,
    # or conditionally specifying the project name; will be ignored.
    #
    # Don't forget to also resolve options in the element and source overrides.
    output.options.process_node(config)
    output.options.process_node(output.element_overrides)
    output.options.process_node(output.source_overrides)

    # Load base variables
    output.base_variables = config.get_mapping('variables')

    # Add the project name as a default variable
    output.base_variables['project-name'] = self.name

    # Extend variables with automatic variables and option exports
    # Initialize it as a string as all variables are processed as strings.
    # Based on some testing (mainly on AWS), maximum effective
    # max-jobs value seems to be around 8-10 if we have enough cores
    # users should set values based on workload and build infrastructure
    if self._context.build_max_jobs == 0:
        # User requested automatic max-jobs
        platform = self._context.platform
        output.base_variables['max-jobs'] = str(platform.get_cpu_count(8))
    else:
        # User requested explicit max-jobs setting
        output.base_variables['max-jobs'] = str(self._context.build_max_jobs)

    # Export options into variables, if that was requested
    output.options.export_variables(output.base_variables)

    # Override default_mirror if not set by command-line
    output.default_mirror = self._default_mirror or overrides.get_str(
        'default-mirror', default=None)

    allowed_mirror_fields = [
        'name', 'aliases'
    ]
    mirrors = config.get_sequence('mirrors', default=[])
    for mirror in mirrors:
        mirror.validate_keys(allowed_mirror_fields)
        mirror_name = mirror.get_str('name')
        alias_mappings = {}
        for alias_mapping, uris in mirror.get_mapping('aliases').items():
            # This is user provided data, so report it with a LoadError
            # instead of an assertion which `python -O` would strip.
            if type(uris) is not SequenceNode:  # pylint: disable=unidiomatic-typecheck
                raise LoadError("{}: Expected a list of URIs for alias '{}' of mirror '{}'"
                                .format(uris.get_provenance(), alias_mapping, mirror_name),
                                LoadErrorReason.INVALID_DATA)
            alias_mappings[alias_mapping] = uris.as_str_list()
        output.mirrors[mirror_name] = alias_mappings

        # The first declared mirror becomes the default when none was chosen
        if not output.default_mirror:
            output.default_mirror = mirror_name

    # Source url aliases
    output._aliases = config.get_mapping('aliases', default={})
# _find_project_dir()
#
# Returns path of the project directory, if a configuration file is found
# in given directory or any of its parent directories.
#
# Args:
# directory (str) - directory from where the command was invoked
#
# Raises:
# LoadError if project.conf is not found
#
# Returns:
# (str) - the directory that contains the project, and
# (str) - the name of the element required to find the project, or None
#
def _find_project_dir(self, directory):
    """Locate the project directory, searching upward from `directory`.

    Args:
       directory (str): Directory from where the command was invoked

    Returns:
       (str): The directory that contains the project
       (str): The name of the element required to find the project, or None

    Raises:
       (LoadError): If neither a project configuration nor a usable
                    workspace project file could be found
    """
    config_filenames = [_PROJECT_CONF_FILE, WORKSPACE_PROJECT_FILE]
    found_directory, filename = utils._search_upward_for_files(
        directory, config_filenames
    )

    if filename == _PROJECT_CONF_FILE:
        return found_directory, None

    if filename == WORKSPACE_PROJECT_FILE:
        workspace_project_cache = self._context.get_workspace_project_cache()
        workspace_project = workspace_project_cache.get(found_directory)
        if workspace_project:
            project_directory = workspace_project.get_default_project_path()
            workspace_element = workspace_project.get_default_element()
            return project_directory, workspace_element

        # Without this, the project directory would be left undefined
        # and returning it would fail with an UnboundLocalError.
        raise LoadError("Unable to load the workspace project file '{}' found in '{}'"
                        .format(WORKSPACE_PROJECT_FILE, found_directory),
                        LoadErrorReason.MISSING_PROJECT_CONF)

    raise LoadError("None of {names} found in '{path}' or any of its parent directories"
                    .format(names=config_filenames, path=directory), LoadErrorReason.MISSING_PROJECT_CONF)
def _load_plugin_factories(self, config, output):
    """Create the element and source factories of `output` from the
    'plugins' declarations found in `config`.

    Raises:
       (LoadError): If a source or element kind is listed more than once
    """
    # Origins of custom sources and elements
    custom_origins = {'sources': [], 'elements': []}

    # Format versions declared for the listed plugins, checked later
    declared_versions = {'sources': {}, 'elements': {}}

    # Sources are processed before elements for each origin
    groups = (
        ('sources', "Duplicate listing of source '{}'"),
        ('elements', "Duplicate listing of element '{}'"),
    )

    allowed_origin_fields = [
        'origin', 'sources', 'elements',
        'package-name', 'path',
    ]

    # Plugin origins and versions
    for origin in config.get_sequence('plugins', default=[]):
        origin.validate_keys(allowed_origin_fields)

        for group, duplicate_message in groups:
            versions = origin.get_mapping(group, default={})
            known_versions = declared_versions[group]
            for kind in versions.keys():
                if kind in known_versions:
                    raise LoadError(duplicate_message.format(kind),
                                    LoadErrorReason.INVALID_YAML)
                known_versions[kind] = versions.get_int(kind)

        # Store the origins if they're not 'core'.
        # core elements are loaded by default, so storing is unnecessary.
        if origin.get_enum('origin', PluginOrigins) != PluginOrigins.CORE:
            self._store_origin(origin, 'sources', custom_origins['sources'])
            self._store_origin(origin, 'elements', custom_origins['elements'])

    pluginbase = PluginBase(package='buildstream.plugins')
    output.element_factory = ElementFactory(pluginbase,
                                            plugin_origins=custom_origins['elements'],
                                            format_versions=declared_versions['elements'])
    output.source_factory = SourceFactory(pluginbase,
                                          plugin_origins=custom_origins['sources'],
                                          format_versions=declared_versions['sources'])
# _store_origin()
#
# Helper function to store plugin origins
#
# Args:
# origin (node) - a node indicating the origin of a group of
# plugins.
# plugin_group (str) - The name of the type of plugin that is being
# loaded
# destination (list) - A list of nodes to store the origins in
#
# Raises:
# LoadError if 'plugin_group' is an unexpected value
def _store_origin(self, origin, plugin_group, destination):
expected_groups = ['sources', 'elements']
if plugin_group not in expected_groups:
raise LoadError("Unexpected plugin group: {}, expecting {}"
.format(plugin_group, expected_groups),
LoadErrorReason.INVALID_DATA)
if plugin_group in origin.keys():
origin_node = origin.clone()
plugins = origin.get_mapping(plugin_group, default={})
origin_node['plugins'] = plugins.keys()
for group in expected_groups:
if group in origin_node:
del origin_node[group]
if origin_node.get_enum('origin', PluginOrigins) == PluginOrigins.LOCAL:
path = self.get_path_from_node(origin.get_scalar('path'),
check_is_dir=True)
# paths are passed in relative to the project, but must be absolute
origin_node['path'] = os.path.join(self.directory, path)
destination.append(origin_node)
# _warning_is_fatal():
#
# Returns true if the warning in question should be considered fatal based on
# the project configuration.
#
# Args:
# warning_str (str): The warning configuration string to check against
#
# Returns:
# (bool): True if the warning should be considered fatal and cause an error.
#
def _warning_is_fatal(self, warning_str):
return warning_str in self._fatal_warnings