#
# Copyright (C) 2018 Codethink Limited
# Copyright (C) 2019 Bloomberg LLP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom
# Daniel Silverstone
# James Ennis
import sys
import string
from contextlib import ExitStack
from collections import OrderedDict, namedtuple
from collections.abc import Mapping, Sequence
from copy import deepcopy
from itertools import count
from ruamel import yaml
from ._exceptions import LoadError, LoadErrorReason
# Without this, pylint complains about all the `type(foo) is blah` checks
# because it feels isinstance() is more idiomatic. Sadly, it is much slower to
# do `isinstance(foo, blah)` for reasons I am unable to fathom. As such, we
# blanket disable the check for this module.
#
# pylint: disable=unidiomatic-typecheck
# Node()
#
# Container for YAML loaded data and its provenance
#
# All nodes returned (and all internal lists/strings) have this type (rather
# than a plain tuple, to distinguish them in things like node_sanitize)
#
# Members:
# value (str/list/dict): The loaded value.
# file_index (int): Index within _FILE_LIST (a list of loaded file paths).
# Negative indices indicate synthetic nodes so that
# they can be referenced.
# line (int): The line number within the file where the value appears.
# col (int): The column number within the file where the value appears.
#
# For efficiency, each field should be accessed by its integer index:
# value = Node[0]
# file_index = Node[1]
# line = Node[2]
# column = Node[3]
#
class Node(namedtuple('Node', ['value', 'file_index', 'line', 'column'])):
    # Membership tests are answered by the wrapped value (field 0), not by
    # the tuple's own four fields. This is mostly meaningful for mapping
    # nodes; list and string nodes are not expected to be probed this way.
    def __contains__(self, what):
        payload = self[0]
        return what in payload
# File name handling
#
# One entry per loaded (or synthetic) file, addressed by the file_index
# stored in each Node. Every entry is a tuple of:
#   (filename, shortname, displayname, toplevel node, project)
_FILE_LIST = []
# Source of unique, strictly negative column numbers (-1, -2, -3, ...) used
# to mark synthetic nodes; the values are already negative, nothing is
# inverted. Nodes synthesised without a reference carry None as their file
# index and line zero. Nodes synthesised from a reference node reuse the
# reference's file index and take their line from it, and get their column
# from this counter.
_SYNTHETIC_COUNTER = count(start=-1, step=-1)
# Returned from node_get_provenance
class ProvenanceInformation:
    # Only these attributes exist on an instance; `node` keeps the value
    # the provenance was built from.
    __slots__ = (
        "filename",
        "shortname",
        "displayname",
        "line",
        "col",
        "toplevel",
        "node",
        "project",
        "is_synthetic",
    )

    # Args:
    #    nodeish: A Node (or compatible tuple), or None
    def __init__(self, nodeish):
        self.node = nodeish

        if nodeish is not None and nodeish[1] is not None:
            # The node points at a known file, pull its details out of
            # the global file table.
            entry = _FILE_LIST[nodeish[1]]
            self.filename = entry[0]
            self.shortname = entry[1]
            self.displayname = entry[2]
            self.toplevel = entry[3]
            self.project = entry[4]
            # Stored lines are zero based, people count from one
            self.line = nodeish[2] + 1
            self.col = nodeish[3]
        else:
            # No node at all, or a node without a file: use blank defaults
            self.filename = ""
            self.shortname = ""
            self.displayname = ""
            self.toplevel = None
            self.project = None
            self.line = 1
            self.col = 0

        self.is_synthetic = (self.filename == '') or (self.col < 0)

    # Render the provenance for use as an error message prefix
    def __str__(self):
        if not self.is_synthetic:
            return "{} [line {:d} column {:d}]".format(self.displayname, self.line, self.col)
        return "{} [synthetic node]".format(self.displayname)
# These exceptions are intended to be caught entirely within
# the BuildStream framework, hence they do not reside in the
# public exceptions.py
class CompositeError(Exception):
    # Args:
    #    path (list): The key path at which composition failed
    #    message (str): A description of the failure
    def __init__(self, path, message):
        super().__init__(message)
        self.message = message
        self.path = path
class YAMLLoadError(Exception):
    """Raised by the Representer when the YAML event stream is not acceptable."""
# Representer for YAML events comprising input to the BuildStream format.
#
# All streams MUST represent a single document which must be a Mapping.
# Anything else is considered an error.
#
# Mappings must only have string keys, values are always represented as
# strings if they are scalar, or else as simple dictionaries and lists.
#
class Representer:
    __slots__ = (
        "_file_index",
        "state",
        "output",
        "keys",
    )

    # Set up a fresh representer
    #
    # The file index is recorded in every Node created here so that the
    # origin of each value can be reported later.
    #
    # Args:
    #    file_index (int): The index of this YAML file
    def __init__(self, file_index):
        self._file_index = file_index
        # Name of the current state, handlers are looked up from it
        self.state = "init"
        # Stack of the containers currently being filled in
        self.output = []
        # Stack of pending mapping keys (str) and list positions (int)
        self.keys = []

    # Feed one YAML parse event through the state machine
    #
    # The handler is the method named `_handle_<state>_<EventClassName>`,
    # and whatever it returns becomes the next state.
    #
    # Args:
    #    event (YAML Event): The event to be handled
    #
    # Raises:
    #    YAMLLoadError: Something went wrong.
    def handle_event(self, event):
        event_name = event.__class__.__name__

        if getattr(event, "anchor", None) is not None:
            raise YAMLLoadError("Anchors are disallowed in BuildStream at line {} column {}"
                                .format(event.start_mark.line, event.start_mark.column))

        # Only scalars are checked for tags, and only core YAML tags pass
        if event_name == "ScalarEvent" and event.tag is not None \
                and not event.tag.startswith("tag:yaml.org,2002:"):
            raise YAMLLoadError(
                "Non-core tag expressed in input. " +
                "This is disallowed in BuildStream. At line {} column {}"
                .format(event.start_mark.line, event.start_mark.column))

        handler = getattr(self, "_handle_{}_{}".format(self.state, event_name), None)
        if handler is None:
            raise YAMLLoadError(
                "Invalid input detected. No handler for {} in state {} at line {} column {}"
                .format(event, self.state, event.start_mark.line, event.start_mark.column))

        self.state = handler(event)  # pylint: disable=not-callable

    # Fetch the result of the parse
    #
    # Returns:
    #    (Node or None): The Node at the bottom of the output stack, or
    #                    None if nothing was ever pushed.
    def get_output(self):
        if not self.output:
            return None
        return self.output[0]

    # Build a Node carrying this file's index and the event's start position
    def _node_at(self, value, ev):
        mark = ev.start_mark
        return Node(value, self._file_index, mark.line, mark.column)

    def _handle_init_StreamStartEvent(self, ev):
        return "stream"

    def _handle_stream_DocumentStartEvent(self, ev):
        return "doc"

    def _handle_doc_MappingStartEvent(self, ev):
        self.output.append(self._node_at({}, ev))
        return "wait_key"

    def _handle_wait_key_ScalarEvent(self, ev):
        self.keys.append(ev.value)
        return "wait_value"

    def _handle_wait_value_ScalarEvent(self, ev):
        pending_key = self.keys.pop()
        current_map = self.output[-1][0]
        current_map[pending_key] = self._node_at(ev.value, ev)
        return "wait_key"

    def _handle_wait_value_MappingStartEvent(self, ev):
        # Attach the new mapping to its parent under the pending key, then
        # make it the container being filled in.
        child = self._node_at({}, ev)
        self.output[-1][0][self.keys.pop()] = child
        self.output.append(child)
        return "wait_key"

    def _handle_wait_key_MappingEndEvent(self, ev):
        # The outermost mapping stays on the stack as the final output
        if len(self.output) <= 1:
            return "doc"

        self.output.pop()
        # Resume whatever the enclosing container was expecting
        return "wait_list_item" if type(self.output[-1][0]) is list else "wait_key"

    def _handle_wait_value_SequenceStartEvent(self, ev):
        # The key is deliberately left on the key stack, it is only popped
        # when the sequence ends.
        sequence = self._node_at([], ev)
        self.output[-1][0][self.keys[-1]] = sequence
        self.output.append(sequence)
        return "wait_list_item"

    def _handle_wait_list_item_SequenceStartEvent(self, ev):
        # A nested list is keyed by the position it takes in its parent
        parent_items = self.output[-1][0]
        self.keys.append(len(parent_items))
        sequence = self._node_at([], ev)
        parent_items.append(sequence)
        self.output.append(sequence)
        return "wait_list_item"

    def _handle_wait_list_item_SequenceEndEvent(self, ev):
        # The key retained for the duration of the sequence is released
        # here. An integer key means the parent is itself a list, anything
        # else means we go back to reading mapping keys.
        finished_key = self.keys.pop()
        self.output.pop()
        return "wait_list_item" if type(finished_key) is int else "wait_key"

    def _handle_wait_list_item_ScalarEvent(self, ev):
        self.output[-1][0].append(self._node_at(ev.value, ev))
        return "wait_list_item"

    def _handle_wait_list_item_MappingStartEvent(self, ev):
        child = self._node_at({}, ev)
        self.output[-1][0].append(child)
        self.output.append(child)
        return "wait_key"

    def _handle_doc_DocumentEndEvent(self, ev):
        if len(self.output) != 1:
            raise YAMLLoadError("Zero, or more than one document found in YAML stream")
        return "stream"

    def _handle_stream_StreamEndEvent(self, ev):
        return "init"
# Loads a dictionary from some YAML
#
# Args:
# filename (str): The YAML file to load
# shortname (str): The filename in shorthand for error reporting (or None)
# copy_tree (bool): Whether to make a copy, preserving the original toplevels
# for later serialization
# project (Project): The (optional) project to associate the parsed YAML with
#
# Returns (dict): A loaded copy of the YAML file with provenance information
#
# Raises: LoadError
#
def load(filename, shortname=None, copy_tree=False, *, project=None):
    # Without a shorthand name, errors are reported against the full path
    shortname = shortname or filename

    # Files reached through a junction are displayed as "junction:shortname"
    via_junction = (project is not None) and (project.junction is not None)
    displayname = "{}:{}".format(project.junction.name, shortname) if via_junction else shortname

    # Register the file before parsing so that the nodes created during
    # the load can refer to it by index; the toplevel slot is filled in
    # by load_data().
    file_number = len(_FILE_LIST)
    _FILE_LIST.append((filename, shortname, displayname, None, project))

    try:
        with open(filename) as stream:
            text = stream.read()

        return load_data(text,
                         file_index=file_number,
                         file_name=filename,
                         copy_tree=copy_tree)
    except FileNotFoundError as e:
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at {}".format(filename)) from e
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory. bst command expects a .bst file."
                        .format(filename)) from e
    except LoadError as e:
        # Re-raise with the display name in front, keeping the reason
        raise LoadError(e.reason, "{}: {}".format(displayname, e)) from e
# Like load(), but doesn't require the data to be in a file
#
def load_data(data, file_index=None, file_name=None, copy_tree=False):
    # Run every parse event through a Representer; any failure at all is
    # reported as invalid YAML.
    try:
        representer = Representer(file_index)
        for parse_event in yaml.parse(data, Loader=yaml.CBaseLoader):
            representer.handle_event(parse_event)
        contents = representer.get_output()
    except YAMLLoadError as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n".format(e)) from e
    except Exception as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Severely malformed YAML:\n\n{}\n\n".format(e)) from e

    if contents is None:
        # Nothing was produced (e.g. the input held only comments), which
        # is accepted and treated as an empty mapping.
        contents = Node({}, file_index, 0, 0)
    elif not (isinstance(contents, tuple) and isinstance(contents[0], dict)):
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "YAML file has content of type '{}' instead of expected type 'dict': {}"
                        .format(type(contents[0]).__name__, file_name))

    # Record the toplevel node in the file table, it is used later for
    # "top level" provenance.
    if file_index is not None:
        entry = _FILE_LIST[file_index]
        _FILE_LIST[file_index] = (
            entry[0],  # Filename
            entry[1],  # Shortname
            entry[2],  # Displayname
            contents,
            entry[4],  # Project
        )

    # The file table keeps the original; optionally hand back a copy
    return node_copy(contents) if copy_tree else contents
# dump()
#
# Write a YAML node structure out to disk.
#
# This will always call `node_sanitize` on its input, so if you wanted
# to output something close to what you read in, consider using the
# `roundtrip_load` and `roundtrip_dump` function pair instead.
#
# Args:
# contents (any): Content to write out
# filename (str): The (optional) file name to write out to
def dump(contents, filename=None):
    # Strip provenance first, then hand the plain data to the dumper
    plain = node_sanitize(contents)
    roundtrip_dump(plain, file=filename)
# node_get_provenance()
#
# Gets the provenance for a node
#
# Args:
# node (dict): a dictionary
# key (str): key in the dictionary
# indices (list of indexes): Index path, in the case of list values
#
# Returns: The Provenance of the dict, member or list element
#
def node_get_provenance(node, key=None, indices=None):
    assert is_node(node)

    # No key: the caller wants the provenance of the mapping itself
    if key is None:
        return ProvenanceInformation(node)

    member = node[0].get(key)
    if key and not indices:
        return ProvenanceInformation(member)

    # Descend through nested lists, one index at a time
    for position in indices:
        member = member[0][position]

    return ProvenanceInformation(member)
# A sentinel to be used as a default argument for functions that need
# to distinguish between a kwarg set to None and an unset kwarg.
# It is compared by identity (`is _sentinel`), so no value supplied by a
# caller can be mistaken for it.
_sentinel = object()
# node_get()
#
# Fetches a value from a dictionary node and checks it for
# an expected value. Use default_value when parsing a value
# which is only optionally supplied.
#
# Args:
# node (dict): The dictionary node
# expected_type (type): The expected type for the value being searched
# key (str): The key to get a value for in node
# indices (list of ints): Optionally descend into lists of lists
# default_value: Optionally return this value if the key is not found
# allow_none: (bool): Allow None to be a valid value
#
# Returns:
# The value if found in node, otherwise default_value is returned
#
# Raises:
# LoadError, when the value found is not of the expected type
#
# Note:
# Returned strings are stripped of leading and trailing whitespace
#
def node_get(node, expected_type, key, indices=None, *, default_value=_sentinel, allow_none=False):
    assert type(node) is Node

    if indices is None:
        # Plain key lookup. A missing key is replaced by a node wrapping
        # default_value; only a caller-supplied default consumes a value
        # from the synthetic counter.
        if default_value is _sentinel:
            value = node[0].get(key, Node(default_value, None, 0, 0))
        else:
            value = node[0].get(key, Node(default_value, None, 0, next(_SYNTHETIC_COUNTER)))

        # Still holding the sentinel means the key was absent and the
        # caller gave no default.
        if value[0] is _sentinel:
            provenance = node_get_provenance(node)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Dictionary did not contain expected key '{}'".format(provenance, key))
    else:
        # Implied type check of the element itself
        # No need to synthesise useful node content as we destructure it immediately
        value = Node(node_get(node, list, key), None, 0, 0)
        for index in indices:
            value = value[0][index]
            # The recursive call above returns a list whose scalars and
            # nested lists are already unwrapped, so re-wrap them to keep
            # the `value[0]` access below uniform.
            if type(value) is not Node:
                value = (value,)

    # Optionally allow None as a valid value for any type
    if value[0] is None and (allow_none or default_value is None):
        return None

    if (expected_type is not None) and (not isinstance(value[0], expected_type)):
        # Attempt basic conversions if possible, typically we want to
        # be able to specify numeric values and convert them to strings,
        # but we dont want to try converting dicts/lists
        #
        # ValueError is raised deliberately inside this block to route
        # every unacceptable case to the single error path below.
        try:
            if (expected_type == bool and isinstance(value[0], str)):
                # Dont coerce booleans to string, this makes "False" strings evaluate to True
                # We don't structure into full nodes since there's no need.
                if value[0] in ('True', 'true'):
                    value = (True, None, 0, 0)
                elif value[0] in ('False', 'false'):
                    value = (False, None, 0, 0)
                else:
                    raise ValueError()
            elif not (expected_type == list or
                      expected_type == dict or
                      isinstance(value[0], (list, dict))):
                # Scalar to scalar: let the expected type's constructor
                # attempt the conversion.
                value = (expected_type(value[0]), None, 0, 0)
            else:
                raise ValueError()
        except (ValueError, TypeError):
            provenance = node_get_provenance(node, key=key, indices=indices)
            if indices:
                # Render the path as key[i][j]... for the error message
                path = [key]
                path.extend("[{:d}]".format(i) for i in indices)
                path = "".join(path)
            else:
                path = key
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, expected_type.__name__))

    # Now collapse lists, and scalars, to their value, leaving nodes as-is
    if type(value[0]) is not dict:
        value = value[0]

    # Trim it at the bud, let all loaded strings from yaml be stripped of whitespace
    if type(value) is str:
        value = value.strip()
    elif type(value) is list:
        # Now we create a fresh list which unwraps the str and list types
        # semi-recursively.
        value = __trim_list_provenance(value)

    return value
# Builds a new list from `value` in which scalars are reduced to their bare
# value and nested lists are trimmed recursively, while mapping entries keep
# their wrapping tuple (and so their provenance).
def __trim_list_provenance(value):
    trimmed = []
    for entry in value:
        # Entries which are not Nodes are wrapped so both kinds are
        # handled the same way below.
        wrapped = entry if type(entry) is Node else (entry, None, 0, 0)
        payload = wrapped[0]
        payload_type = type(payload)
        if payload_type is list:
            trimmed.append(__trim_list_provenance(payload))
        elif payload_type is dict:
            trimmed.append(wrapped)
        else:
            trimmed.append(payload)
    return trimmed
# node_set()
#
# Set an item within the node. If using `indices` be aware that the entry must
# already exist, or else a KeyError will be raised. Use `node_extend_list` to
# create entries before using `node_set`
#
# Args:
# node (tuple): The node
# key (str): The key name
# value: The value
# indices: Any indices to index into the list referenced by key, like in
# `node_get` (must be a list of integers)
#
def node_set(node, key, value, indices=None):
    if indices:
        # Walk down to the list which directly holds the entry to replace.
        # The final index becomes the key to assign to. The caller's
        # `indices` sequence is only read here, never modified.
        node = node[0][key]
        for idx in indices[:-1]:
            node = node[0][idx]
        key = indices[-1]

    # A ready made Node is stored untouched, provenance included
    if type(value) is Node:
        node[0][key] = value
        return

    try:
        # Need to do this just in case we're modifying a list
        old_value = node[0][key]
    except KeyError:
        old_value = None

    if old_value is None:
        # New entry: borrow file and line from the container and mark the
        # node as synthetic through its column.
        node[0][key] = Node(value, node[1], node[2], next(_SYNTHETIC_COUNTER))
    else:
        # Replacement: the new value inherits the old entry's provenance
        node[0][key] = Node(value, old_value[1], old_value[2], old_value[3])
# node_extend_list()
#
# Extend a list inside a node to a given length, using the passed
# default value to fill it out.
#
# Valid default values are:
# Any string
# An empty dict
# An empty list
#
# Args:
# node (node): The node
# key (str): The list name in the node
# length (int): The length to extend the list to
# default (any): The default value to extend with.
def node_extend_list(node, key, length, default):
    assert type(default) is str or default in ([], {})

    # Create the list under `key` if the node doesn't have one yet
    list_node = node[0].get(key)
    if list_node is None:
        list_node = Node([], node[1], node[2], next(_SYNTHETIC_COUNTER))
        node[0][key] = list_node

    assert type(list_node[0]) is list

    items = list_node[0]
    default_type = type(default)
    file_index = node[1]

    # New entries are numbered onwards from the last existing entry's
    # line, or from the list's own line if it is empty.
    line_num = items[-1][2] if items else list_node[2]

    while len(items) < length:
        # Each padding entry gets its own fresh container, strings are
        # used as given.
        if default_type is str:
            filler = default
        elif default_type is list:
            filler = []
        else:
            filler = {}

        line_num += 1
        items.append(Node(filler, file_index, line_num, next(_SYNTHETIC_COUNTER)))
# node_items()
#
# A convenience generator for iterating over loaded key/value
# tuples in a dictionary loaded from project YAML.
#
# Args:
# node (dict): The dictionary node
#
# Yields:
# (str): The key name
# (anything): The value for the key
#
def node_items(node):
    # Accept a bare dictionary as well as a Node wrapping one
    mapping = node[0] if type(node) is Node else node

    for key, value in mapping.items():
        entry = value if type(value) is Node else Node(value, None, 0, 0)
        payload = entry[0]
        payload_type = type(payload)

        if payload_type is dict:
            # Mappings are handed out as nodes
            yield (key, entry)
        elif payload_type is list:
            # Lists are handed out with their members unwrapped
            yield (key, __trim_list_provenance(payload))
        else:
            yield (key, payload)
# node_keys()
#
# A convenience generator for iterating over loaded keys
# in a dictionary loaded from project YAML.
#
# Args:
# node (dict): The dictionary node
#
# Yields:
# (str): The key name
#
def node_keys(node):
    # Accept a bare dictionary as well as a Node wrapping one
    mapping = node[0] if type(node) is Node else node
    for key in mapping.keys():
        yield key
# node_del()
#
# Remove a key, and the value stored under it, from a dictionary node
# loaded from project YAML.
#
# Args:
# node (dict): The dictionary node
# key (str): The key we want to remove
# safe (bool): If True, silently ignore a missing key instead of raising KeyError
#
def node_del(node, key, safe=False):
    try:
        del node[0][key]
    except KeyError:
        # A missing key is only tolerated when the caller asked for it
        if safe:
            return
        raise
# is_node()
#
# A test method which returns whether or not the passed in value
# is a valid YAML node. It is not valid to call this on a Node
# object which is not a Mapping.
#
# Args:
# maybenode (any): The object to test for nodeness
#
# Returns:
# (bool): Whether or not maybenode was a Node
#
def is_node(maybenode):
    if type(maybenode) is not Node:
        return False

    # Passing a Node which does not wrap a mapping is a programming error
    assert type(maybenode[0]) is dict
    return True
# new_synthetic_file()
#
# Create a new synthetic mapping node, with an associated file entry
# (in _FILE_LIST) such that later tracking can correctly determine which
# file needs writing to in order to persist the changes.
#
# Args:
# filename (str): The name of the synthetic file to create
# project (Project): The optional project to associate this synthetic file with
#
# Returns:
# (Node): An empty YAML mapping node, whose provenance is to this new
# synthetic file
#
def new_synthetic_file(filename, project=None):
    # The new file takes the next free slot in the file table, and the
    # node created here is that file's toplevel.
    file_index = len(_FILE_LIST)
    node = Node({}, file_index, 0, 0)

    # The display name has to include the file name: calling format() on
    # an empty string would discard `filename` and leave provenance
    # messages for this file with nothing to show.
    _FILE_LIST.append((filename,
                       filename,
                       "<synthetic {}>".format(filename),
                       node,
                       project))
    return node
# new_empty_node()
#
# Args:
# ref_node (Node): Optional node whose provenance should be referenced
#
# Returns
# (Node): A new empty YAML mapping node
#
def new_empty_node(ref_node=None):
    if ref_node is None:
        # Nothing to refer to: no file, line zero, column zero
        return Node({}, None, 0, 0)

    # Borrow file and line from the reference, with a synthetic column
    return Node({}, ref_node[1], ref_node[2], next(_SYNTHETIC_COUNTER))
# new_node_from_dict()
#
# Args:
# indict (dict): The input dictionary
#
# Returns:
# (Node): A new synthetic YAML tree which represents this dictionary
#
def new_node_from_dict(indict):
    children = {}
    for name, raw in indict.items():
        raw_type = type(raw)
        if raw_type is list:
            child = __new_node_from_list(raw)
        elif raw_type is dict:
            child = new_node_from_dict(raw)
        else:
            # Anything else is stored as its string representation
            child = Node(str(raw), None, 0, next(_SYNTHETIC_COUNTER))
        children[name] = child

    # The container takes its synthetic column after all of its children
    return Node(children, None, 0, next(_SYNTHETIC_COUNTER))
# Internal function to help new_node_from_dict() to handle lists
def __new_node_from_list(inlist):
    members = []
    for raw in inlist:
        raw_type = type(raw)
        if raw_type is list:
            member = __new_node_from_list(raw)
        elif raw_type is dict:
            member = new_node_from_dict(raw)
        else:
            # Anything else is stored as its string representation
            member = Node(str(raw), None, 0, next(_SYNTHETIC_COUNTER))
        members.append(member)

    # The container takes its synthetic column after all of its members
    return Node(members, None, 0, next(_SYNTHETIC_COUNTER))
# _is_composite_list
#
# Checks if the given node is a Mapping with array composition
# directives.
#
# Args:
# node (value): Any node
#
# Returns:
# (bool): True if node was a Mapping containing only
# list composition directives
#
# Raises:
# (LoadError): If node was a mapping and contained a mix of
# list composition directives and other keys
#
def _is_composite_list(node):
    # Only mappings can carry list composition directives
    if type(node[0]) is not dict:
        return False

    saw_directive = False
    saw_other_key = False

    for key, _ in node_items(node):
        if key in ('(>)', '(<)', '(=)'):
            saw_directive = True
        else:
            saw_other_key = True

    # Directives and ordinary keys may not share a dictionary
    if saw_directive and saw_other_key:
        provenance = node_get_provenance(node)
        raise LoadError(LoadErrorReason.INVALID_DATA,
                        "{}: Dictionary contains array composition directives and arbitrary keys"
                        .format(provenance))

    return saw_directive
# _compose_composite_list()
#
# Composes a composite list (i.e. a dict with list composition directives)
# on top of a target list which is a composite list itself.
#
# Args:
# target (Node): A composite list
# source (Node): A composite list
#
def _compose_composite_list(target, source):
    directives = target[0]
    clobber = source[0].get("(=)")
    prefix = source[0].get("(<)")
    suffix = source[0].get("(>)")

    if clobber is None:
        # Nothing is replaced: the incoming prefix goes in front of the
        # target's prefix and the incoming suffix after its suffix.
        if prefix is not None:
            if "(<)" not in directives:
                directives["(<)"] = prefix
            else:
                existing = directives["(<)"][0]
                for item in reversed(prefix[0]):
                    existing.insert(0, item)

        if suffix is not None:
            if "(>)" not in directives:
                directives["(>)"] = suffix
            else:
                directives["(>)"][0].extend(suffix[0])
        return

    # The source overwrites the list, so the target's own directives give
    # way: each is either replaced by the source's or emptied.
    directives["(=)"] = clobber
    for directive, replacement in (("(<)", prefix), ("(>)", suffix)):
        if replacement is not None:
            directives[directive] = replacement
        elif directive in directives:
            directives[directive][0].clear()
# _compose_list()
#
# Compose a composite list (a dict with composition directives) on top of a
# simple list.
#
# Args:
# target (Node): The target list to be composed into
# source (Node): The composition list to be composed from
#
def _compose_list(target, source):
    items = target[0]
    directives = source[0]

    # Overwrite first, so that prefix and suffix apply to the new content
    replacement = directives.get("(=)")
    leading = directives.get("(<)")
    trailing = directives.get("(>)")

    if replacement is not None:
        items.clear()
        items.extend(replacement[0])

    if leading is not None:
        # Inserting in reverse at the front keeps the prefix in order
        for item in reversed(leading[0]):
            items.insert(0, item)

    if trailing is not None:
        items.extend(trailing[0])
# composite_dict()
#
# Compose one mapping node onto another
#
# Args:
# target (Node): The target to compose into
# source (Node): The source to compose from
# path (list): The path to the current composition node
#
# Raises: CompositeError
#
def composite_dict(target, source, path=None):
    # `path` holds the keys leading to the value being composed. It is
    # shared with the recursive calls and left untouched when an error is
    # raised, so that the error reports where composition failed.
    if path is None:
        path = []

    for k, v in source[0].items():
        path.append(k)
        if type(v[0]) is list:
            # List clobbers anything list-like
            target_value = target[0].get(k)
            if not (target_value is None or
                    type(target_value[0]) is list or
                    _is_composite_list(target_value)):
                raise CompositeError(path,
                                     "{}: List cannot overwrite {} at: {}"
                                     .format(node_get_provenance(source, k),
                                             k,
                                             node_get_provenance(target, k)))
            # Looks good, clobber it
            target[0][k] = v
        elif _is_composite_list(v):
            if k not in target[0]:
                # Composite list clobbers empty space
                target[0][k] = v
            elif type(target[0][k][0]) is list:
                # Composite list composes into a list
                _compose_list(target[0][k], v)
            elif _is_composite_list(target[0][k]):
                # Composite list merges into composite list
                _compose_composite_list(target[0][k], v)
            else:
                # Else composing on top of normal dict or a scalar, so raise...
                raise CompositeError(path,
                                     "{}: Cannot compose lists onto {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target[0][k])))
        elif type(v[0]) is dict:
            # We're composing a dict into target now
            if k not in target[0]:
                # Target lacks a dict at that point, make a fresh one with
                # the same provenance as the incoming dict
                target[0][k] = Node({}, v[1], v[2], v[3])
            # The value already stored under this key must itself be a
            # mapping. Checking the enclosing `target` instead would never
            # fail, and the recursion below would then break on a scalar
            # or a list with an unrelated AttributeError.
            if type(target[0][k][0]) is not dict:
                raise CompositeError(path,
                                     "{}: Cannot compose dictionary onto {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target[0][k])))
            composite_dict(target[0][k], v, path)
        else:
            # Scalars may only replace other scalars, or fill empty space
            target_value = target[0].get(k)
            if target_value is not None and type(target_value[0]) is not str:
                raise CompositeError(path,
                                     "{}: Cannot compose scalar on non-scalar at {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target[0][k])))
            target[0][k] = v
        path.pop()
# Like composite_dict(), but raises an all purpose LoadError for convenience
#
def composite(target, source):
    assert type(source[0]) is dict
    assert type(target[0]) is dict

    try:
        composite_dict(target, source)
    except CompositeError as e:
        # Report the failure against the source being composed
        source_provenance = node_get_provenance(source)
        error_prefix = "{}: ".format(source_provenance) if source_provenance else ""
        message = "{}Failure composing {}: {}".format(error_prefix, e.path, e.message)
        raise LoadError(LoadErrorReason.ILLEGAL_COMPOSITE, message) from e
# Like composite(target, source), but where target overrides source instead.
#
def composite_and_move(target, source):
    # Compose the other way around, so that target's values win...
    composite(source, target)

    # ...then move the result into target, removing whatever target holds
    # that the composed result does not.
    stale_keys = [key for key in target[0].keys() if key not in source[0]]
    target[0].update(source[0])
    for key in stale_keys:
        del target[0][key]
# Types we can short-circuit in node_sanitize for speed.
# Values are matched by their exact type (`type(x) in ...`), so instances
# of subclasses of these types do not take the fast path.
__SANITIZE_SHORT_CIRCUIT_TYPES = (int, float, str, bool)
# node_sanitize()
#
# Returns an alphabetically ordered recursive copy
# of the source node with internal provenance information stripped.
#
# Only dicts are ordered, list elements are left in order.
#
def node_sanitize(node, *, dict_type=OrderedDict):
    # A Node is reduced to the value it wraps before anything else
    if type(node) is Node:
        node = node[0]

    # None turns up often enough to deserve the first exit
    if node is None:
        return node

    kind = type(node)

    # Plain scalars (integers, floats, strings, booleans) need no work
    if kind in __SANITIZE_SHORT_CIRCUIT_TYPES:
        return node

    # Lists keep their order, each element is sanitized in turn
    if kind is list:
        return [node_sanitize(elt, dict_type=dict_type) for elt in node]

    # Dictionaries are rebuilt with their keys in sorted order
    if kind is dict:
        result = dict_type()
        for key in sorted(node):
            result[key] = node_sanitize(node[key], dict_type=dict_type)
        return result

    # Plain tuples may hold anything, so sanitize each member
    if kind is tuple:
        return tuple(node_sanitize(v, dict_type=dict_type) for v in node)

    # Everything else just gets returned as-is.
    return node
# node_validate()
#
# Validate the node so as to ensure the user has not specified
# any keys which are unrecognized by buildstream (usually this
# means a typo which would otherwise not trigger an error).
#
# Args:
# node (dict): A dictionary loaded from YAML
# valid_keys (list): A list of valid keys for the specified node
#
# Raises:
# LoadError: In the case that the specified node contained
# one or more invalid keys
#
def node_validate(node, valid_keys):
    # A set gives constant time lookups while scanning the node's keys
    allowed = set(valid_keys)

    # Only the first offending key is looked at
    offenders = (key for key in node[0] if key not in allowed)
    first_bad = next(offenders, None)
    if not first_bad:
        return

    provenance = node_get_provenance(node, key=first_bad)
    raise LoadError(LoadErrorReason.INVALID_DATA,
                    "{}: Unexpected key: {}".format(provenance, first_bad))
# Node copying
#
# Unfortunately we copy nodes a *lot* and `isinstance()` is super-slow when
# things from collections.abc get involved. The result is the following
# intricate but substantially faster group of tuples and the use of `in`.
#
# If any of the {node,list}_copy routines raise a ValueError
# then it's likely additional types need adding to these tuples.
# These types just have their value copied: node_copy() and _list_copy()
# place the existing Node for such a value into the copy instead of
# building a new one.
__QUICK_TYPES = (str, bool)
# These are the directives used to compose lists, we need this because it's
# slightly faster during the node_final_assertions checks
__NODE_ASSERT_COMPOSITION_DIRECTIVES = ('(>)', '(<)', '(=)')
# node_copy()
#
# Make a deep copy of the given YAML node, preserving provenance.
#
# Args:
# source (Node): The YAML node to copy
#
# Returns:
# (Node): A deep copy of source with provenance preserved.
#
def node_copy(source):
    duplicate = {}
    for key, value in source[0].items():
        kind = type(value[0])
        if kind in __QUICK_TYPES:
            # Scalar nodes are placed in the copy as they are
            duplicate[key] = value
        elif kind is dict:
            duplicate[key] = node_copy(value)
        elif kind is list:
            duplicate[key] = _list_copy(value)
        else:
            raise ValueError("Unable to be quick about node_copy of {}".format(kind))

    # The copy carries the same provenance as the original
    return Node(duplicate, source[1], source[2], source[3])
# Internal function to help node_copy() but for lists.
def _list_copy(source):
    duplicate = []
    for item in source[0]:
        kind = type(item[0])
        if kind in __QUICK_TYPES:
            # Scalar nodes are placed in the copy as they are
            duplicate.append(item)
        elif kind is dict:
            duplicate.append(node_copy(item))
        elif kind is list:
            duplicate.append(_list_copy(item))
        else:
            raise ValueError("Unable to be quick about list_copy of {}".format(kind))

    # The copy carries the same provenance as the original
    return Node(duplicate, source[1], source[2], source[3])
# node_final_assertions()
#
# This must be called on a fully loaded and composited node,
# after all composition has completed.
#
# Args:
# node (Mapping): The final composited node
#
# Raises:
# (LoadError): If any assertions fail
#
def node_final_assertions(node):
    assert type(node) is Node

    for key, value in node[0].items():
        # A composition directive which is still present at this point
        # was never applied to anything: the user tried to adjust a list
        # which the underlying data does not have.
        if key in __NODE_ASSERT_COMPOSITION_DIRECTIVES:
            provenance = node_get_provenance(node, key)
            raise LoadError(LoadErrorReason.TRAILING_LIST_DIRECTIVE,
                            "{}: Attempt to override non-existing list".format(provenance))

        # Recurse into containers, scalars need no checking
        inner_type = type(value[0])
        if inner_type is list:
            _list_final_assertions(value)
        elif inner_type is dict:
            node_final_assertions(value)
# Helper function for node_final_assertions(), but for lists.
def _list_final_assertions(values):
    for member in values[0]:
        # Recurse into containers, scalars need no checking
        inner_type = type(member[0])
        if inner_type is list:
            _list_final_assertions(member)
        elif inner_type is dict:
            node_final_assertions(member)
# assert_symbol_name()
#
# Validates that a loaded string is usable as a symbol name, raising a
# consistently worded LoadError when it is not.
#
# A valid symbol name is non-empty, consists only of ASCII letters, digits
# and underscores (plus dashes when `allow_dashes` is set), and does not
# begin with a digit.
#
# Args:
#    provenance (Provenance): The provenance of the loaded symbol, or None
#    symbol_name (str): The loaded symbol name
#    purpose (str): The purpose of the string, for an error message
#    allow_dashes (bool): Whether dashes are allowed for this symbol
#
# Raises:
#    LoadError: With reason INVALID_SYMBOL_NAME, if the symbol_name is invalid
#
# Note that dashes are generally preferred for variable names and
# usage in YAML, but things such as option names which will be
# evaluated with jinja2 cannot use dashes.
def assert_symbol_name(provenance, symbol_name, purpose, *, allow_dashes=True):
    allowed = string.digits + string.ascii_letters + '_'
    if allow_dashes:
        allowed += '-'

    # The emptiness check comes first so that indexing [0] is always safe.
    is_valid = (
        bool(symbol_name)
        and all(char in allowed for char in symbol_name)
        and symbol_name[0] not in string.digits
    )
    if is_valid:
        return

    detail = ("Symbol names must contain only alphanumeric characters, "
              "may not start with a digit, and may contain underscores")
    if allow_dashes:
        detail += " or dashes"

    message = "Invalid symbol name for {}: '{}'".format(purpose, symbol_name)
    if provenance is not None:
        # Prefix the location so the user can find the offending symbol.
        message = "{}: {}".format(provenance, message)

    raise LoadError(LoadErrorReason.INVALID_SYMBOL_NAME,
                    message, detail=detail)
# node_find_target()
#
# Searches the given node tree for the given target node.
#
# This is typically used when trying to walk a path to a given node
# for the purpose of then modifying a similar tree of objects elsewhere
#
# If the key is provided, then we actually hunt for the node represented by
# target[key] and return the path to its container, rather than hunting for
# target directly
#
# Args:
#    node (Node): The node at the root of the tree to search
#    target (Node): The node you are looking for in that tree
#    key (str): Optional string key within target node
#
# Returns:
#    (list): A path (dict keys and list indices) from `node` to `target`,
#            or None if `target` is not in the subtree
def node_find_target(node, target, *, key=None):
    assert type(node) is Node
    assert type(target) is Node

    key_given = key is not None
    if key_given:
        target = target[0][key]

    path = []
    if not _walk_find_target(node, path, target):
        return None

    # Trim the final path element whenever a key was supplied. This
    # deliberately tests `is not None` rather than truthiness, so that an
    # empty-string key (which was used above to select target[key]) also
    # yields the path of the container instead of the path of the member.
    if key_given:
        path = path[:-1]
    return path
# Helper for node_find_target() which walks a value
def _walk_find_target(node, path, target):
if node[1:] == target[1:]:
return True
elif type(node[0]) is dict:
return _walk_dict_node(node, path, target)
elif type(node[0]) is list:
return _walk_list_node(node, path, target)
return False
# Helper for node_find_target() which walks a list
def _walk_list_node(node, path, target):
for i, v in enumerate(node[0]):
path.append(i)
if _walk_find_target(v, path, target):
return True
del path[-1]
return False
# Helper for node_find_target() which walks a mapping
def _walk_dict_node(node, path, target):
for k, v in node[0].items():
path.append(k)
if _walk_find_target(v, path, target):
return True
del path[-1]
return False
###############################################################################
# Roundtrip code
# Always represent things consistently: OrderedDict instances are
# represented using the safe representer's plain dict representation.
yaml.RoundTripRepresenter.add_representer(OrderedDict,
                                          yaml.SafeRepresenter.represent_dict)

# Always parse things consistently: every scalar carrying one of these
# tags is constructed by the string constructor.
for _scalar_tag in (
        u'tag:yaml.org,2002:int',
        u'tag:yaml.org,2002:float',
        u'tag:yaml.org,2002:bool',
        u'tag:yaml.org,2002:null',
        u'tag:yaml.org,2002:timestamp',
):
    yaml.RoundTripConstructor.add_constructor(_scalar_tag,
                                              yaml.RoundTripConstructor.construct_yaml_str)
# Do not leave the loop variable behind in the module namespace
del _scalar_tag
# HardlineDumper
#
# The dumper used by roundtrip_dump(). It registers a catch-all implicit
# resolver for the string tag so that every scalar is emitted as a plain
# string, in order to match the output format to the input format.
#
# If you discover something is broken, please add a test case to the roundtrip
# test in tests/internals/yaml/roundtrip-test.yaml
#
class HardlineDumper(yaml.RoundTripDumper):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # A pattern which matches any scalar text whatsoever
        match_anything = yaml.util.RegExp(r'.*')

        # Register the resolver for YAML 1.1, YAML 1.2 and for a
        # version of None, so that everything resolves to a string.
        for yaml_version in ((1, 1), (1, 2), None):
            self.add_version_implicit_resolver(
                yaml_version,
                u'tag:yaml.org,2002:str',
                match_anything,
                None)
# roundtrip_load()
#
# Reads a YAML file from disk and parses it with roundtrip_load_data(), in a
# form which allows roundtripping as best as ruamel permits.
#
# Note, the returned objects can be treated as Mappings and Lists and Strings
# but replacing content wholesale with plain dicts and lists may result
# in a loss of comments and formatting.
#
# Args:
#    filename (str): The file to load in
#    allow_missing (bool): Optionally set this to True to get an empty
#                          dict back for a missing file, instead of an error
#
# Returns:
#    (Mapping): The loaded YAML mapping.
#
# Raises:
#    (LoadError): If the file is missing (and allow_missing is False), or
#                 is a directory. Also if the YAML is malformed.
#
def roundtrip_load(filename, *, allow_missing=False):
    try:
        with open(filename, "r") as stream:
            text = stream.read()
        return roundtrip_load_data(text, filename=filename)
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory.".format(filename)) from e
    except FileNotFoundError as e:
        if allow_missing:
            # Missing files are always empty dictionaries
            return {}
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at {}".format(filename)) from e
# roundtrip_load_data()
#
# Parses a string of YAML using ruamel's round trip loader (with quotes
# preserved), returning a roundtrippable data structure.
#
# Content which parses to nothing at all is returned as an empty mapping.
#
# Args:
#    contents (str): The contents to be parsed as YAML
#    filename (str): Optional filename to be used in error reports
#
# Returns:
#    (Mapping): The loaded YAML mapping
#
# Raises:
#    (LoadError): With reason INVALID_YAML, on scanner, composer or parser
#                 errors, or on YAML which parses to something other than
#                 a Mapping
#
def roundtrip_load_data(contents, *, filename=None):
    try:
        parsed = yaml.load(contents, yaml.RoundTripLoader, preserve_quotes=True)
    except (yaml.scanner.ScannerError, yaml.composer.ComposerError, yaml.parser.ParserError) as e:
        message = "Malformed YAML:\n\n{}\n\n{}\n".format(e.problem, e.problem_mark)
        raise LoadError(LoadErrorReason.INVALID_YAML, message) from e

    # Empty files are special cased: we make them empty mappings,
    # like the main Node loader does.
    if parsed is None:
        return {}

    if isinstance(parsed, Mapping):
        return parsed

    raise LoadError(LoadErrorReason.INVALID_YAML,
                    "YAML file has content of type '{}' instead of expected type 'dict': {}"
                    .format(type(parsed).__name__, filename))
# roundtrip_dump()
#
# Dumps the given contents as a YAML file. Ideally the contents came from
# parsing with `roundtrip_load` or `roundtrip_load_data` so that they will be
# dumped in the same form as they came from.
#
# The contents are deep copied first, and in the copy every leaf value which
# is not a string is replaced with its str() form; the caller's data is
# left unmodified.
#
# If `file` is a string, it is the filename to write to (the file is written
# through utils.save_file_atomic()), if `file` has a `write` method, it's
# treated as a stream, otherwise output is to stdout.
#
# Args:
#    contents (Mapping or list): The content to write out as YAML.
#    file (any): The file to write to
#
def roundtrip_dump(contents, file=None):
    assert type(contents) is not Node

    # Both helpers convert in place. Strings are matched with isinstance()
    # so that str subclasses are left alone directly, rather than being
    # treated as a Sequence and pointlessly walked character by character.
    def stringify_dict(thing):
        for k, v in thing.items():
            if isinstance(v, str):
                continue
            if isinstance(v, Mapping):
                stringify_dict(v)
            elif isinstance(v, Sequence):
                stringify_list(v)
            else:
                thing[k] = str(v)

    def stringify_list(thing):
        for i, v in enumerate(thing):
            if isinstance(v, str):
                continue
            if isinstance(v, Mapping):
                stringify_dict(v)
            elif isinstance(v, Sequence):
                stringify_list(v)
            else:
                thing[i] = str(v)

    contents = deepcopy(contents)

    # A top level list is documented as valid input, so it must not be
    # sent through the dict helper (which would fail on `.items()`).
    # Anything else takes the mapping route, exactly as before.
    if isinstance(contents, Sequence) and not isinstance(contents, (str, bytes)):
        stringify_list(contents)
    else:
        stringify_dict(contents)

    with ExitStack() as stack:
        if type(file) is str:
            # Imported here rather than at the top of the module
            from . import utils
            f = stack.enter_context(utils.save_file_atomic(file, 'w'))
        elif hasattr(file, 'write'):
            f = file
        else:
            f = sys.stdout
        yaml.round_trip_dump(contents, f, Dumper=HardlineDumper)