#
#  Copyright (C) 2018 Codethink Limited
#  Copyright (C) 2019 Bloomberg LLP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Tristan Van Berkom
#        Daniel Silverstone
#        James Ennis
#        Benjamin Schubert

import sys
import string
from contextlib import ExitStack
from collections import OrderedDict
from collections.abc import Mapping, Sequence
from copy import deepcopy

from ruamel import yaml

from ._exceptions import LoadError, LoadErrorReason


# Without this, pylint complains about all the `type(foo) is blah` checks
# because it feels isinstance() is more idiomatic.  Sadly, it is much slower to
# do `isinstance(foo, blah)` for reasons I am unable to fathom.  As such, we
# blanket disable the check for this module.
#
# pylint: disable=unidiomatic-typecheck


# A sentinel to be used as a default argument for functions that need
# to distinguish between a kwarg set to None and an unset kwarg.
_sentinel = object()


# Node()
#
# Container for YAML loaded data and its provenance
#
# All nodes returned (and all internal lists/strings) have this type (rather
# than a plain tuple, to distinguish them in things like node_sanitize)
#
# Members:
#   value (str/list/dict): The loaded value.
#   file_index (int): Index within _FILE_LIST (a list of loaded file paths).
#                     Negative indices indicate synthetic nodes so that
#                     they can be referenced.
#   line (int): The line number within the file where the value appears.
#   column (int): The column number within the file where the value appears.
#
cdef class Node:

    def __init__(self, object value, int file_index, int line, int column):
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    def __contains__(self, what):
        # Delegate to the inner value, though this will likely not work
        # very well if the node is a list or string, it's unlikely that
        # code which has access to such nodes would do this.
        return what in self.value

    # copy()
    #
    # Copy the node. The base class provides no implementation, each
    # concrete node type below overrides this.
    #
    # Raises:
    #    NotImplementedError: Always, when called on the base class
    #
    cpdef Node copy(self):
        raise NotImplementedError()


# ScalarNode()
#
# A node wrapping a single scalar value. String values are stripped of
# leading and trailing whitespace when the node is constructed.
#
cdef class ScalarNode(Node):

    def __init__(self, object value, int file_index, int line, int column):
        if type(value) is str:
            value = value.strip()
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    # copy()
    #
    # Returns:
    #    (ScalarNode): This very node, no new node is created
    #
    cpdef ScalarNode copy(self):
        return self

    # is_none()
    #
    # Returns:
    #    (bool): Whether the wrapped value is None
    #
    cpdef bint is_none(self):
        return self.value is None

    # as_bool()
    #
    # Returns:
    #    (bool): The wrapped value if it already is a bool, otherwise True
    #            for 'True'/'true' and False for 'False'/'false'
    #
    # Raises:
    #    LoadError: If the value is none of the accepted spellings
    #
    cpdef bint as_bool(self) except *:
        if type(self.value) is bool:
            return self.value

        # Don't coerce booleans to string, this makes "False" strings evaluate to True
        if self.value in ('True', 'true'):
            return True
        elif self.value in ('False', 'false'):
            return False
        else:
            provenance = node_get_provenance(self)
            path = node_find_target(provenance.toplevel, self)[-1]
            # The message has exactly three placeholders, so only three
            # arguments are passed.
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, bool.__name__))

    # as_int()
    #
    # Returns:
    #    (int): The result of calling int() on the wrapped value
    #
    # Raises:
    #    LoadError: If int() raises ValueError for the value
    #
    cpdef int as_int(self) except *:
        try:
            return int(self.value)
        except ValueError:
            provenance = node_get_provenance(self)
            path = node_find_target(provenance.toplevel, self)[-1]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, int.__name__))

    # as_str()
    #
    # Returns:
    #    (str): str() of the wrapped value, or None if the value is None
    #
    cpdef str as_str(self):
        # We keep 'None' as 'None' to simplify the API's usage and allow chaining for users
        if self.value is None:
            return None
        return str(self.value)


# MappingNode()
#
# A node wrapping a dict whose values are themselves nodes.
#
cdef class MappingNode(Node):

    def __init__(self, dict value, int file_index, int line, int column):
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    # copy()
    #
    # Returns:
    #    (MappingNode): A new mapping holding a copy() of every member,
    #                   with the same file index, line and column
    #
    cpdef MappingNode copy(self):
        cdef dict copy = {}
        cdef str key
        cdef Node value

        for key, value in self.value.items():
            copy[key] = value.copy()

        return MappingNode(copy, self.file_index, self.line, self.column)

    # get()
    #
    # Common lookup used by the typed getters below.
    #
    # Args:
    #    key (str): The key to look up
    #    default (any): Value to use when the key is missing; _sentinel
    #                   means the key is mandatory
    #    default_constructor (callable): Node class used to wrap a
    #                                    non-None default
    #
    # Returns:
    #    (Node): The stored node, None when the key is missing and the
    #            default is None, or a synthetic node wrapping the default
    #
    # Raises:
    #    LoadError: If the key is missing and no default was given
    #
    cdef Node get(self, str key, object default, object default_constructor):
        value = self.value.get(key, _sentinel)

        if value is _sentinel:
            if default is _sentinel:
                provenance = node_get_provenance(self)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Dictionary did not contain expected key '{}'".format(provenance, key))

            if default is None:
                value = None
            else:
                value = default_constructor(default, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())

        return value

    # get_mapping()
    #
    # Args:
    #    key (str): The key to look up
    #    default (any): Optional default, see get()
    #
    # Returns:
    #    (MappingNode): The mapping stored under key, or None
    #
    # Raises:
    #    LoadError: If the value exists but is not a MappingNode
    #
    cpdef MappingNode get_mapping(self, str key, object default=_sentinel):
        value = self.get(key, default, MappingNode)

        if type(value) is not MappingNode and value is not None:
            provenance = node_get_provenance(value)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type 'Mapping'"
                            .format(provenance, key))

        return value

    # get_node()
    #
    # Args:
    #    key (str): The key to look up
    #    allowed_types (list): Node classes the value is allowed to have
    #    allow_none (bool): Return None instead of raising when the key
    #                       is missing
    #
    # Returns:
    #    (Node): The node stored under key, or None
    #
    # Raises:
    #    LoadError: If the key is missing and allow_none is False, or if
    #               the value is not an instance of one of allowed_types
    #
    cpdef Node get_node(self, str key, list allowed_types, bint allow_none = False):
        cdef value = self.value.get(key, _sentinel)

        if value is _sentinel:
            if allow_none:
                return None

            provenance = node_get_provenance(self)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Dictionary did not contain expected key '{}'".format(provenance, key))

        if type(value) not in allowed_types:
            provenance = node_get_provenance(self)
            # allowed_types holds classes, which str.join() rejects, so join
            # their names. A list comprehension is used rather than a
            # generator expression as the latter is a closure.
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not one of the following: {}.".format(
                                provenance, key,
                                ", ".join([allowed_type.__name__ for allowed_type in allowed_types])))

        return value

    # get_scalar()
    #
    # Args:
    #    key (str): The key to look up
    #    default (any): Optional default, see get()
    #
    # Returns:
    #    (ScalarNode): The scalar stored under key; a None result from
    #                  get() is wrapped in a synthetic ScalarNode
    #
    # Raises:
    #    LoadError: If the value exists but is not a ScalarNode
    #
    cpdef ScalarNode get_scalar(self, str key, object default=_sentinel):
        value = self.get(key, default, ScalarNode)

        if type(value) is not ScalarNode:
            if value is None:
                value = ScalarNode(None, self.file_index, 0, next_synthetic_counter())
            else:
                provenance = node_get_provenance(value)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Value of '{}' is not of the expected type 'Scalar'"
                                .format(provenance, key))

        return value

    # get_sequence()
    #
    # Args:
    #    key (str): The key to look up
    #    default (any): Optional default, see get()
    #
    # Returns:
    #    (SequenceNode): The sequence stored under key, or None
    #
    # Raises:
    #    LoadError: If the value exists but is not a SequenceNode
    #
    cpdef SequenceNode get_sequence(self, str key, object default=_sentinel):
        value = self.get(key, default, SequenceNode)

        if type(value) is not SequenceNode and value is not None:
            provenance = node_get_provenance(value)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type 'Sequence'"
                            .format(provenance, key))

        return value

    # get_bool(), get_int(), get_str()
    #
    # Shorthands for get_scalar() followed by the matching as_*() call.
    #
    cpdef bint get_bool(self, str key, object default=_sentinel) except *:
        cdef ScalarNode scalar = self.get_scalar(key, default)
        return scalar.as_bool()

    cpdef int get_int(self, str key, object default=_sentinel) except *:
        cdef ScalarNode scalar = self.get_scalar(key, default)
        return scalar.as_int()

    cpdef str get_str(self, str key, object default=_sentinel):
        cdef ScalarNode scalar = self.get_scalar(key, default)
        return scalar.as_str()

    cpdef object items(self):
        return self.value.items()

    cpdef list keys(self):
        return list(self.value.keys())

    # safe_del()
    #
    # Delete a key, doing nothing if the key is not present.
    #
    cpdef void safe_del(self, str key):
        try:
            del self.value[key]
        except KeyError:
            pass

    cpdef object values(self):
        return self.value.values()

    def __delitem__(self, str key):
        del self.value[key]

    def __setitem__(self, str key, object value):
        if type(value) in [MappingNode, ScalarNode, SequenceNode]:
            self.value[key] = value
        else:
            node = _create_node_recursive(value)

            # FIXME: Do we really want to override provenance?
            #
            # Related to https://gitlab.com/BuildStream/buildstream/issues/1058
            #
            # There are only two cases were nodes are set in the code (hence without provenance):
            #   - When automatic variables are set by the core (e-g: max-jobs)
            #   - when plugins call Element.set_public_data
            #
            # The first case should never throw errors, so it is of limited interests.
            #
            # The second is more important. What should probably be done here is to have 'set_public_data'
            # able of creating a fake provenance with the name of the plugin, the project and probably the
            # element name.
            #
            # We would therefore have much better error messages, and would be able to get rid of most synthetic
            # nodes.
            old_value = self.value.get(key)
            # Compare against None explicitly: SequenceNode defines __len__,
            # so an empty sequence is falsy and would lose its provenance.
            if old_value is not None:
                node.file_index = old_value.file_index
                node.line = old_value.line
                node.column = old_value.column

            self.value[key] = node


# SequenceNode()
#
# A node wrapping a list whose entries are themselves nodes.
#
cdef class SequenceNode(Node):
    def __init__(self, list value, int file_index, int line, int column):
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    # copy()
    #
    # Returns:
    #    (SequenceNode): A new sequence holding a copy() of every entry,
    #                    with the same file index, line and column
    #
    cpdef SequenceNode copy(self):
        cdef list copy = []
        cdef Node entry

        for entry in self.value:
            copy.append(entry.copy())

        return SequenceNode(copy, self.file_index, self.line, self.column)

    # mapping_at()
    #
    # Args:
    #    index (int): Position in the sequence
    #
    # Returns:
    #    (MappingNode): The entry at index
    #
    # Raises:
    #    LoadError: If the entry is not a MappingNode
    #
    cpdef MappingNode mapping_at(self, int index):
        value = self.value[index]

        if type(value) is not MappingNode:
            provenance = node_get_provenance(self)
            path = ["[{}]".format(p) for p in node_find_target(provenance.toplevel, self)] + ["[{}]".format(index)]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, MappingNode.__name__))
        return value

    # sequence_at()
    #
    # Args:
    #    index (int): Position in the sequence
    #
    # Returns:
    #    (SequenceNode): The entry at index
    #
    # Raises:
    #    LoadError: If the entry is not a SequenceNode
    #
    cpdef SequenceNode sequence_at(self, int index):
        value = self.value[index]

        if type(value) is not SequenceNode:
            provenance = node_get_provenance(self)
            path = ["[{}]".format(p) for p in node_find_target(provenance.toplevel, self)] + ["[{}]".format(index)]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, SequenceNode.__name__))

        return value

    # as_str_list()
    #
    # Returns:
    #    (list): The result of as_str() for every entry, in order
    #
    cpdef list as_str_list(self):
        return [node.as_str() for node in self.value]

    def __iter__(self):
        return iter(self.value)

    def __len__(self):
        return len(self.value)

    def __reversed__(self):
        return reversed(self.value)

    def __setitem__(self, int key, object value):
        if type(value) in [MappingNode, ScalarNode, SequenceNode]:
            self.value[key] = value
        else:
            node = _create_node_recursive(value)

            # FIXME: Do we really want to override provenance?
            # See __setitem__ on 'MappingNode' for more context
            old_value = self.value[key]
            # Compare against None explicitly: an empty SequenceNode entry is
            # falsy (it defines __len__) but still carries provenance.
            if old_value is not None:
                node.file_index = old_value.file_index
                node.line = old_value.line
                node.column = old_value.column

            self.value[key] = node


# Metadata container for a yaml toplevel node.
#
# This class contains metadata around a yaml node in order to be able
# to trace back the provenance of a node to the file.
#
cdef class FileInfo:

    cdef str filename, shortname, displayname
    cdef Node toplevel
    cdef object project

    def __init__(self, str filename, str shortname, str displayname, Node toplevel, object project):
        self.filename = filename
        self.shortname = shortname
        self.displayname = displayname
        self.toplevel = toplevel
        self.project = project


# File name handling
cdef _FILE_LIST = []


# Purely synthetic node will have _SYNTHETIC_FILE_INDEX for the file number, have line number
# zero, and a negative column number which comes from inverting the next value
# out of this counter.  Synthetic nodes created with a reference node will
# have a file number from the reference node, some unknown line number, and
# a negative column number from this counter.
cdef int _SYNTHETIC_FILE_INDEX = -1
cdef int __counter = 0


# Step the module-wide counter down by one and hand back the new value.
# The counter starts at zero, so every value returned is negative.
cdef int next_synthetic_counter():
    global __counter
    __counter = __counter - 1
    return __counter


# Returned from node_get_provenance
cdef class ProvenanceInformation:

    def __init__(self, Node nodeish):
        cdef FileInfo info

        self.node = nodeish

        if nodeish is not None and nodeish.file_index != _SYNTHETIC_FILE_INDEX:
            # The node points at a registered file, take the details from there
            info = _FILE_LIST[nodeish.file_index]
            self.filename = info.filename
            self.shortname = info.shortname
            self.displayname = info.displayname
            self.toplevel = info.toplevel
            self.project = info.project
            # We add 1 here to convert from computerish to humanish
            self.line = nodeish.line + 1
            self.col = nodeish.column
        else:
            # No node at all, or a purely synthetic one: no file to refer to
            self.filename = ""
            self.shortname = ""
            self.displayname = ""
            self.toplevel = None
            self.project = None
            self.line = 1
            self.col = 0

        self.is_synthetic = (self.filename == '') or (self.col < 0)

    # Convert a Provenance to a string for error reporting
    def __str__(self):
        if not self.is_synthetic:
            return "{} [line {:d} column {:d}]".format(self.displayname, self.line, self.col)
        return "{} [synthetic node]".format(self.displayname)


# These exceptions are intended to be caught entirely within
# the BuildStream framework, hence they do not reside in the
# public exceptions.py
class CompositeError(Exception):
    def __init__(self, path, message):
        super().__init__(message)
        self.message = message
        self.path = path


class YAMLLoadError(Exception):
    pass


# Represents the various states in which the Representer can be
# while parsing yaml.
cdef enum RepresenterState:
    doc
    init
    stream
    wait_key
    wait_list_item
    wait_value


ctypedef RepresenterState (*representer_action)(Representer, object)

# Representer for YAML events comprising input to the BuildStream format.
#
# All streams MUST represent a single document which must be a Mapping.
# Anything else is considered an error.
#
# Mappings must only have string keys, values are always represented as
# strings if they are scalar, or else as simple dictionaries and lists.
#
cdef class Representer:

    cdef int _file_index
    cdef RepresenterState state
    # 'output' is the stack of containers currently being filled, 'keys'
    # is the stack of mapping keys (or list positions) awaiting a value.
    cdef list output, keys

    # Initialise a new representer
    #
    # The file index is used to store into the Node instances so that the
    # provenance of the YAML can be tracked.
    #
    # Args:
    #   file_index (int): The index of this YAML file
    def __init__(self, int file_index):
        self._file_index = file_index
        self.state = RepresenterState.init
        self.output = []
        self.keys = []

    # Handle a YAML parse event
    #
    # Args:
    #   event (YAML Event): The event to be handled
    #
    # Raises:
    #   YAMLLoadError: Something went wrong.
    cdef void handle_event(self, event) except *:
        if getattr(event, "anchor", None) is not None:
            raise YAMLLoadError("Anchors are disallowed in BuildStream at line {} column {}"
                                .format(event.start_mark.line, event.start_mark.column))

        cdef str event_name = event.__class__.__name__

        if event_name == "ScalarEvent":
            if event.tag is not None:
                if not event.tag.startswith("tag:yaml.org,2002:"):
                    raise YAMLLoadError(
                        "Non-core tag expressed in input. " +
                        "This is disallowed in BuildStream. At line {} column {}"
                        .format(event.start_mark.line, event.start_mark.column))

        cdef representer_action handler = self._get_handler_for_event(event_name)
        if not handler:
            raise YAMLLoadError(
                "Invalid input detected. No handler for {} in state {} at line {} column {}"
                .format(event, self.state, event.start_mark.line, event.start_mark.column))

        # Cython weirdness here, we need to pass self to the function
        self.state = handler(self, event)  # pylint: disable=not-callable

    # Get the output of the YAML parse
    #
    # Returns:
    #   (Node or None): Return the Node instance of the top level mapping or
    #                   None if there wasn't one.
    cdef Node get_output(self):
        if len(self.output):
            return self.output[0]
        return None

    # Select the handler for an event name in the current state
    #
    # Returns NULL when the event is not acceptable in the current state.
    cdef representer_action _get_handler_for_event(self, str event_name):
        if self.state == RepresenterState.wait_list_item:
            if event_name == "ScalarEvent":
                return self._handle_wait_list_item_ScalarEvent
            elif event_name == "MappingStartEvent":
                return self._handle_wait_list_item_MappingStartEvent
            elif event_name == "SequenceStartEvent":
                return self._handle_wait_list_item_SequenceStartEvent
            elif event_name == "SequenceEndEvent":
                return self._handle_wait_list_item_SequenceEndEvent
        elif self.state == RepresenterState.wait_value:
            if event_name == "ScalarEvent":
                return self._handle_wait_value_ScalarEvent
            elif event_name == "MappingStartEvent":
                return self._handle_wait_value_MappingStartEvent
            elif event_name == "SequenceStartEvent":
                return self._handle_wait_value_SequenceStartEvent
        elif self.state == RepresenterState.wait_key:
            if event_name == "ScalarEvent":
                return self._handle_wait_key_ScalarEvent
            elif event_name == "MappingEndEvent":
                return self._handle_wait_key_MappingEndEvent
        elif self.state == RepresenterState.stream:
            if event_name == "DocumentStartEvent":
                return self._handle_stream_DocumentStartEvent
            elif event_name == "StreamEndEvent":
                return self._handle_stream_StreamEndEvent
        elif self.state == RepresenterState.doc:
            if event_name == "MappingStartEvent":
                return self._handle_doc_MappingStartEvent
            elif event_name == "DocumentEndEvent":
                return self._handle_doc_DocumentEndEvent
        elif self.state == RepresenterState.init and event_name == "StreamStartEvent":
            return self._handle_init_StreamStartEvent
        return NULL

    cdef RepresenterState _handle_init_StreamStartEvent(self, object ev):
        return RepresenterState.stream

    cdef RepresenterState _handle_stream_DocumentStartEvent(self, object ev):
        return RepresenterState.doc

    # Push a fresh, empty mapping onto the output stack
    cdef RepresenterState _handle_doc_MappingStartEvent(self, object ev):
        newmap = MappingNode({}, self._file_index, ev.start_mark.line, ev.start_mark.column)
        self.output.append(newmap)
        return RepresenterState.wait_key

    cdef RepresenterState _handle_wait_key_ScalarEvent(self, object ev):
        self.keys.append(ev.value)
        return RepresenterState.wait_value

    cdef RepresenterState _handle_wait_value_ScalarEvent(self, object ev):
        key = self.keys.pop()
        self.output[-1].value[key] = \
            ScalarNode(ev.value, self._file_index, ev.start_mark.line, ev.start_mark.column)
        return RepresenterState.wait_key

    cdef RepresenterState _handle_wait_value_MappingStartEvent(self, object ev):
        cdef RepresenterState new_state = self._handle_doc_MappingStartEvent(ev)
        key = self.keys.pop()
        # The new mapping is now on top of the stack, its parent just below
        self.output[-2].value[key] = self.output[-1]
        return new_state

    cdef RepresenterState _handle_wait_key_MappingEndEvent(self, object ev):
        # We've finished a mapping, so pop it off the output stack
        # unless it's the last one in which case we leave it
        if len(self.output) > 1:
            self.output.pop()
            if type(self.output[-1].value) is list:
                return RepresenterState.wait_list_item
            else:
                return RepresenterState.wait_key
        else:
            return RepresenterState.doc

    cdef RepresenterState _handle_wait_value_SequenceStartEvent(self, object ev):
        self.output.append(SequenceNode([], self._file_index, ev.start_mark.line, ev.start_mark.column))
        # The key is deliberately left on the stack, it is popped when
        # the sequence ends.
        self.output[-2].value[self.keys[-1]] = self.output[-1]
        return RepresenterState.wait_list_item

    cdef RepresenterState _handle_wait_list_item_SequenceStartEvent(self, object ev):
        self.keys.append(len(self.output[-1].value))
        self.output.append(SequenceNode([], self._file_index, ev.start_mark.line, ev.start_mark.column))
        self.output[-2].value.append(self.output[-1])
        return RepresenterState.wait_list_item

    cdef RepresenterState _handle_wait_list_item_SequenceEndEvent(self, object ev):
        # When ending a sequence, we need to pop a key because we retain the
        # key until the end so that if we need to mutate the underlying entry
        # we can.
        key = self.keys.pop()
        self.output.pop()
        # An integer key means the sequence was nested in another sequence
        if type(key) is int:
            return RepresenterState.wait_list_item
        else:
            return RepresenterState.wait_key

    cdef RepresenterState _handle_wait_list_item_ScalarEvent(self, object ev):
        self.output[-1].value.append(
            ScalarNode(ev.value, self._file_index, ev.start_mark.line, ev.start_mark.column))
        return RepresenterState.wait_list_item

    cdef RepresenterState _handle_wait_list_item_MappingStartEvent(self, object ev):
        cdef RepresenterState new_state = self._handle_doc_MappingStartEvent(ev)
        self.output[-2].value.append(self.output[-1])
        return new_state

    cdef RepresenterState _handle_doc_DocumentEndEvent(self, object ev):
        if len(self.output) != 1:
            raise YAMLLoadError("Zero, or more than one document found in YAML stream")
        return RepresenterState.stream

    cdef RepresenterState _handle_stream_StreamEndEvent(self, object ev):
        return RepresenterState.init


# _create_node()
#
# Wrap a plain value in the matching node type, without recursing
# into containers.
#
# Args:
#    value (any): A bool, str, int, None, dict or list
#    file_index (int): File index to record on the node
#    line (int): Line to record on the node
#    column (int): Column to record on the node
#
# Returns:
#    (Node): A ScalarNode, MappingNode or SequenceNode
#
# Raises:
#    ValueError: If the value is of any other type
#
cdef Node _create_node(object value, int file_index, int line, int column):
    cdef type_value = type(value)

    if type_value in [bool, str, type(None), int]:
        return ScalarNode(value, file_index, line, column)
    elif type_value is dict:
        return MappingNode(value, file_index, line, column)
    elif type_value is list:
        return SequenceNode(value, file_index, line, column)
    raise ValueError(
        "Node values can only be 'list', 'dict', 'bool', 'str', 'int' or None. Not {}".format(type_value))


# _create_node_recursive()
#
# Build a synthetic node tree from a plain list, str or dict.
#
# Args:
#    value (any): A list, str or dict
#
# Returns:
#    (Node): The new synthetic node
#
# Raises:
#    ValueError: If the value is of any other type
#
cdef Node _create_node_recursive(object value):
    cdef value_type = type(value)

    if value_type is list:
        node = __new_node_from_list(value)
    elif value_type is str:
        node = ScalarNode(value, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())
    elif value_type is dict:
        node = new_node_from_dict(value)
    else:
        raise ValueError(
            "Unable to assign a value of type {} to a Node.".format(value_type))

    return node


# Loads a dictionary from some YAML
#
# Args:
#    filename (str): The YAML file to load
#    shortname (str): The filename in shorthand for error reporting (or None)
#    copy_tree (bool): Whether to make a copy, preserving the original toplevels
#                      for later serialization
#    project (Project): The (optional) project to associate the parsed YAML with
#
# Returns (Node): The toplevel mapping node of the YAML file, with provenance information
#
# Raises: LoadError
#
cpdef Node load(str filename, str shortname=None, bint copy_tree=False, object project=None):
    if not shortname:
        shortname = filename

    cdef str displayname
    if (project is not None) and (project.junction is not None):
        displayname = "{}:{}".format(project.junction.name, shortname)
    else:
        displayname = shortname

    # Register the file before parsing so that nodes can refer to its index;
    # load_data() fills in the toplevel node afterwards.
    cdef Py_ssize_t file_number = len(_FILE_LIST)
    _FILE_LIST.append(FileInfo(filename, shortname, displayname, None, project))

    cdef Node data

    try:
        with open(filename) as f:
            contents = f.read()

        data = load_data(contents,
                         file_index=file_number,
                         file_name=filename,
                         copy_tree=copy_tree)

        return data
    except FileNotFoundError as e:
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at {}".format(filename)) from e
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory. bst command expects a .bst file."
                        .format(filename)) from e
    except LoadError as e:
        raise LoadError(e.reason, "{}: {}".format(displayname, e)) from e


# Like load(), but doesnt require the data to be in a file
#
# Args:
#    data (str): The YAML text to parse
#    file_index (int): Index of the file in _FILE_LIST, or
#                      _SYNTHETIC_FILE_INDEX when there is no file
#    file_name (str): File name used in error messages (or None)
#    copy_tree (bool): Whether to return a copy of the parsed tree
#
# Returns (Node): The toplevel mapping node
#
# Raises: LoadError
#
cpdef Node load_data(str data, int file_index=_SYNTHETIC_FILE_INDEX, str file_name=None, bint copy_tree=False):
    cdef Representer rep
    cdef FileInfo f_info

    try:
        rep = Representer(file_index)
        parser = yaml.CParser(data)

        try:
            while parser.check_event():
                rep.handle_event(parser.get_event())
        finally:
            parser.dispose()

        contents = rep.get_output()
    except YAMLLoadError as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n".format(e)) from e
    except Exception as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Severely malformed YAML:\n\n{}\n\n".format(e)) from e

    if type(contents) != MappingNode:
        # Special case allowance for None, when the loaded file has only comments in it.
        if contents is None:
            contents = MappingNode({}, file_index, 0, 0)
        else:
            # Nodes are not subscriptable, report the type of the wrapped
            # value instead of indexing into the node.
            raise LoadError(LoadErrorReason.INVALID_YAML,
                            "YAML file has content of type '{}' instead of expected type 'dict': {}"
                            .format(type(contents.value).__name__, file_name))

    # Store this away because we'll use it later for "top level" provenance
    if file_index != _SYNTHETIC_FILE_INDEX:
        f_info = _FILE_LIST[file_index]

        _FILE_LIST[file_index] = FileInfo(
            f_info.filename,
            f_info.shortname,
            f_info.displayname,
            contents,
            f_info.project,
        )

    if copy_tree:
        contents = contents.copy()
    return contents


# dump()
#
# Write a YAML node structure out to disk.
#
# This will always call `node_sanitize` on its input, so if you wanted
# to output something close to what you read in, consider using the
# `roundtrip_load` and `roundtrip_dump` function pair instead.
#
# Args:
#    contents (any): Content to write out
#    filename (str): The (optional) file name to write out to
def dump(object contents, str filename=None):
    sanitized = node_sanitize(contents)
    roundtrip_dump(sanitized, file=filename)


# node_get_provenance()
#
# Look up where a node, one of its members, or an element nested in
# one of its list members came from.
#
# Args:
#    node (Node): The node to inspect
#    key (str): Optional member of the node to inspect instead
#    indices (list of indexes): Optional index path into the member,
#                               for list values
#
# Returns: The Provenance of the node, member or list element
#
cpdef ProvenanceInformation node_get_provenance(Node node, str key=None, list indices=None):
    cdef Node nodeish

    # Without a key the caller is asking about the node itself
    if key is None:
        return ProvenanceInformation(node)

    member = node.value.get(key)
    if key and not indices:
        return ProvenanceInformation(member)

    # Walk down the index path, one nesting level per index
    nodeish = member
    for idx in indices:
        nodeish = nodeish.value[idx]

    return ProvenanceInformation(nodeish)


# node_extend_list()
#
# Extend a list inside a node to a given length, using the passed
# default value to fill it out.
#
# Valid default values are:
#    Any string
#    An empty dict
#    An empty list
#
# Args:
#    node (node): The node
#    key (str): The list name in the node
#    length (int): The length to extend the list to
#    default (any): The default value to extend with.
def node_extend_list(Node node, str key, Py_ssize_t length, object default):
    assert type(default) is str or default in ([], {})

    # Create the list on demand, placed at the line of the owning node
    cdef Node list_node = node.value.get(key)
    if list_node is None:
        list_node = node.value[key] = SequenceNode([], node.file_index, node.line, next_synthetic_counter())

    cdef list the_list = list_node.value
    def_type = type(default)

    file_index = node.file_index
    if the_list:
        # Continue numbering from the last entry. Entries are Node
        # instances, so the line is an attribute and cannot be read by
        # position as from a tuple.
        line_num = the_list[-1].line
    else:
        line_num = list_node.line

    while length > len(the_list):
        # Lists and dicts get a fresh container per entry so that the
        # appended entries never share state.
        if def_type is str:
            value = default
        elif def_type is list:
            value = []
        else:
            value = {}

        line_num += 1

        the_list.append(_create_node(value, file_index, line_num, next_synthetic_counter()))


# is_node()
#
# A test method which returns whether or not the passed in value
# is a valid YAML node.  It is not valid to call this on a Node
# object which is not a Mapping.
#
# Args:
#    maybenode (any): The object to test for nodeness
#
# Returns:
#    (bool): Whether or not maybenode was a Node
#
def is_node(maybenode):
    # It's a programming error to give this a Node which isn't a mapping
    # so assert that.
    assert (type(maybenode) not in [ScalarNode, SequenceNode])

    # Now return the type check
    return type(maybenode) is MappingNode


# new_synthetic_file()
#
# Create a new synthetic mapping node, with an associated file entry
# (in _FILE_LIST) such that later tracking can correctly determine which
# file needs writing to in order to persist the changes.
#
# Args:
#    filename (str): The name of the synthetic file to create
#    project (Project): The optional project to associate this synthetic file with
#
# Returns:
#    (Node): An empty YAML mapping node, whose provenance is to this new
#            synthetic file
#
def new_synthetic_file(str filename, object project=None):
    cdef Py_ssize_t file_index = len(_FILE_LIST)
    cdef Node node = MappingNode({}, file_index, 0, 0)

    # The display name must actually embed the file name: formatting an
    # empty string discards it and leaves provenance messages for this
    # file without any name at all.
    _FILE_LIST.append(FileInfo(filename,
                               filename,
                               "<synthetic {}>".format(filename),
                               node,
                               project))
    return node


# new_empty_node()
#
# Args:
#    ref_node (Node): Optional node whose provenance should be referenced
#
# Returns
#    (Node): A new empty YAML mapping node
#
def new_empty_node(Node ref_node=None):
    if ref_node is not None:
        return MappingNode({}, ref_node.file_index, ref_node.line, next_synthetic_counter())
    else:
        return MappingNode({}, _SYNTHETIC_FILE_INDEX, 0, 0)


# FIXME: we should never need that
def new_empty_list_node():
    return SequenceNode([], _SYNTHETIC_FILE_INDEX, 0, 0)


# new_node_from_dict()
#
# Nested dicts and lists are converted recursively, any other value
# is converted to a string and wrapped in a ScalarNode.
#
# Args:
#   indict (dict): The input dictionary
#
# Returns:
#   (Node): A new synthetic YAML tree which represents this dictionary
#
cpdef Node new_node_from_dict(dict indict):
    cdef dict ret = {}
    cdef str k
    for k, v in indict.items():
        vtype = type(v)
        if vtype is dict:
            ret[k] = new_node_from_dict(v)
        elif vtype is list:
            ret[k] = __new_node_from_list(v)
        else:
            ret[k] = ScalarNode(str(v), _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())
    return MappingNode(ret, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())


# Internal function to help new_node_from_dict() to handle lists
cdef Node __new_node_from_list(list inlist):
    cdef list ret = []
    for v in inlist:
        vtype = type(v)
        if vtype is dict:
            ret.append(new_node_from_dict(v))
        elif vtype is list:
            ret.append(__new_node_from_list(v))
        else:
            ret.append(ScalarNode(str(v), _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter()))
    return SequenceNode(ret, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())


# _is_composite_list
#
# Checks if the given node is a Mapping with array composition
# directives.
#
# Args:
#    node (value): Any node
#
# Returns:
#    (bool): True if node was a Mapping containing only
#            list composition directives
#
# Raises:
#    (LoadError): If node was a mapping and contained a mix of
#                 list composition directives and other keys
#
cdef bint _is_composite_list(Node node):
    cdef bint has_directives = False
    cdef bint has_keys = False
    cdef str key

    if type(node) is MappingNode:
        for key in node.keys():
            if key in ['(>)', '(<)', '(=)']:  # pylint: disable=simplifiable-if-statement
                has_directives = True
            else:
                has_keys = True

            if has_keys and has_directives:
                provenance = node_get_provenance(node)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Dictionary contains array composition directives and arbitrary keys"
                                .format(provenance))
        return has_directives

    return False


# _compose_composite_list()
#
# Composes a composite list (i.e. a dict with list composition directives)
# on top of a target list which is a composite list itself.
#
# Args:
#    target (Node): A composite list
#    source (Node): A composite list
#
cdef void _compose_composite_list(Node target, Node source):
    incoming = source.value
    existing = target.value

    clobber = incoming.get("(=)")
    prefix = incoming.get("(<)")
    suffix = incoming.get("(>)")

    if clobber is None:
        # Not clobbering, so prefix the prefix and suffix the suffix
        if prefix is not None:
            if "(<)" not in existing:
                existing["(<)"] = prefix
            else:
                # Inserting back to front at position zero keeps the
                # incoming entries in their original order.
                for entry in reversed(prefix.value):
                    existing["(<)"].value.insert(0, entry)

        if suffix is not None:
            if "(>)" not in existing:
                existing["(>)"] = suffix
            else:
                existing["(>)"].value.extend(suffix.value)

        return

    # We want to clobber the target list
    # which basically means replacing the target list
    # with ourselves
    existing["(=)"] = clobber

    # The incoming prefix and suffix win outright; where the source has
    # none, whatever the target had is emptied.
    if prefix is not None:
        existing["(<)"] = prefix
    elif "(<)" in existing:
        existing["(<)"].value.clear()

    if suffix is not None:
        existing["(>)"] = suffix
    elif "(>)" in existing:
        existing["(>)"].value.clear()


# _compose_list()
#
# Compose a composite list (a dict with composition directives) on top of a
# simple list.
#
# Args:
#    target (Node): The target list to be composed into
#    source (Node): The composition list to be composed from
#
cdef void _compose_list(Node target, Node source):
    clobber = source.value.get("(=)")
    prefix = source.value.get("(<)")
    suffix = source.value.get("(>)")
    if clobber is not None:
        target.value.clear()
        target.value.extend(clobber.value)
    if prefix is not None:
        # Inserting back to front at position zero keeps the prefix order
        for v in reversed(prefix.value):
            target.value.insert(0, v)
    if suffix is not None:
        target.value.extend(suffix.value)


# composite_dict()
#
# Compose one mapping node onto another
#
# Args:
#    target (Node): The target to compose into
#    source (Node): The source to compose from
#    path (list): The path to the current composition node
#
# Raises: CompositeError
#
cpdef void composite_dict(Node target, Node source, list path=None) except *:
    cdef str k
    cdef Node v, target_value

    if path is None:
        path = []
    for k, v in source.value.items():
        path.append(k)
        if type(v.value) is list:
            # List clobbers anything list-like
            target_value = target.value.get(k)
            if not (target_value is None or
                    type(target_value.value) is list or
                    _is_composite_list(target_value)):
                raise CompositeError(path,
                                     "{}: List cannot overwrite {} at: {}"
                                     .format(node_get_provenance(source, k),
                                             k,
                                             node_get_provenance(target, k)))
            # Looks good, clobber it
            target.value[k] = v
        elif _is_composite_list(v):
            if k not in target.value:
                # Composite list clobbers empty space
                target.value[k] = v
            elif type(target.value[k].value) is list:
                # Composite list composes into a list
                _compose_list(target.value[k], v)
            elif _is_composite_list(target.value[k]):
                # Composite list merges into composite list
                _compose_composite_list(target.value[k], v)
            else:
                # Else composing on top of normal dict or a scalar, so raise...
                raise CompositeError(path,
                                     "{}: Cannot compose lists onto {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target.value[k])))
        elif type(v.value) is dict:
            # We're composing a dict into target now
            if k not in target.value:
                # Target lacks a dict at that point, make a fresh one with
                # the same provenance as the incoming dict
                target.value[k] = MappingNode({}, v.file_index, v.line, v.column)
            # It is the member being composed onto which must be a dict.
            # The enclosing target is always one, so testing it instead would
            # let a dict be composed onto a scalar or a list.
            if type(target.value[k].value) is not dict:
                raise CompositeError(path,
                                     "{}: Cannot compose dictionary onto {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target.value[k])))
            composite_dict(target.value[k], v, path)
        else:
            target_value = target.value.get(k)
            if target_value is not None and type(target_value.value) is not str:
                raise CompositeError(path,
                                     "{}: Cannot compose scalar on non-scalar at {}".format(
                                         node_get_provenance(v),
                                         node_get_provenance(target.value[k])))
            target.value[k] = v
        path.pop()


# Like composite_dict(), but raises an all purpose LoadError for convenience
#
cpdef void composite(MappingNode target, MappingNode source) except *:
    assert type(source.value) is dict
    assert type(target.value) is dict

    try:
        composite_dict(target, source)
    except CompositeError as e:
        source_provenance = node_get_provenance(source)
        error_prefix = ""
        if source_provenance:
            error_prefix = "{}: ".format(source_provenance)
        raise LoadError(LoadErrorReason.ILLEGAL_COMPOSITE,
                        "{}Failure composing {}: {}"
                        .format(error_prefix,
                                e.path,
                                e.message)) from e


# Like composite(target, source), but where target overrides source instead.
#
def composite_and_move(MappingNode target, MappingNode source):
    # Compose target on top of source, then move the result into target
    composite(source, target)

    cdef str key
    cdef Node value
    cdef list to_delete = [key for key in target.value.keys() if key not in source.value]
    for key, value in source.value.items():
        target.value[key] = value
    for key in to_delete:
        del target.value[key]


# Types we can short-circuit in node_sanitize for speed.
__SANITIZE_SHORT_CIRCUIT_TYPES = (int, float, str, bool)


# node_sanitize()
#
# Produces a recursive copy of the given node in which all provenance
# wrappers have been removed, leaving only plain values.
#
# Dictionaries are rebuilt with their keys in sorted order, while the
# elements of lists and tuples keep their original order.
#
# Args:
#    node (object): The node, or plain value, to sanitize
#    dict_type (type): The mapping type to build dictionaries with
#
# Returns:
#    (object): The sanitized copy
#
cpdef object node_sanitize(object node, object dict_type=OrderedDict):
    node_type = type(node)

    # Container nodes are unwrapped and their payload is handled below
    # FIXME: we should only ever have Nodes here
    if node_type is MappingNode or node_type is SequenceNode:
        node = node.value
        node_type = type(node)

    # Scalar nodes simply yield their payload
    if node_type is ScalarNode:
        return node.value

    # None is by far the most common value, so deal with it first
    if node is None:
        return node

    # Plain integers, floats, strings and booleans need no further work
    if node_type in __SANITIZE_SHORT_CIRCUIT_TYPES:
        return node

    # Lists are sanitized element by element, order preserved
    if node_type is list:
        return [node_sanitize(item, dict_type=dict_type) for item in node]

    # Dictionaries are rebuilt in sorted key order
    if node_type is dict:
        sanitized = dict_type()
        for name in sorted(node):
            sanitized[name] = node_sanitize(node[name], dict_type=dict_type)
        return sanitized

    # Tuples may contain anything at all, so their members are
    # sanitized too and a new tuple is built from the results
    if node_type is tuple:
        return tuple([node_sanitize(item, dict_type=dict_type) for item in node])

    # Anything we do not recognize is handed back untouched
    return node


# node_validate()
#
# Validate the node so as to ensure the user has not specified
# any keys which are unrecognized by buildstream (usually this
# means a typo which would otherwise not trigger an error).
#
# Args:
#    node (Node): A dictionary loaded from YAML
#    valid_keys (list): A list of valid keys for the specified node
#
# Raises:
#    LoadError: In the case that the specified node contained
#               one or more invalid keys
#
cpdef void node_validate(Node node, list valid_keys) except *:
    # A set gives cheap membership tests, see https://stackoverflow.com/a/23062482
    cdef set allowed = set(valid_keys)
    cdef str name

    for name in node.value:
        if name in allowed:
            continue

        # The first unrecognized key encountered is the one reported
        where = node_get_provenance(node, key=name)
        raise LoadError(LoadErrorReason.INVALID_DATA,
                        "{}: Unexpected key: {}".format(where, name))


# The keys which act as list composition directives, kept in a constant
# so that node_final_assertions() does not rebuild it for every key.
__NODE_ASSERT_COMPOSITION_DIRECTIVES = ('(>)', '(<)', '(=)')


# node_final_assertions()
#
# This must be called on a fully loaded and composited node,
# after all composition has completed.
#
# Args:
#    node (Mapping): The final composited node
#
# Raises:
#    (LoadError): If any assertions fail
#
cpdef void node_final_assertions(MappingNode node) except *:
    cdef str name
    cdef Node child

    for name, child in node.value.items():

        # A list composition directive which is still present at this
        # point means that the user tried to override a list which was
        # never there in the underlying data.
        if name in __NODE_ASSERT_COMPOSITION_DIRECTIVES:
            where = node_get_provenance(node, name)
            raise LoadError(LoadErrorReason.TRAILING_LIST_DIRECTIVE,
                            "{}: Attempt to override non-existing list".format(where))

        # Descend into nested mappings and lists
        payload_type = type(child.value)
        if payload_type is dict:
            node_final_assertions(child)
        elif payload_type is list:
            _list_final_assertions(child)


# Helper function for node_final_assertions(), but for lists.
def _list_final_assertions(Node values):
    # Only nested mappings and lists need checking, any other
    # kind of element is skipped.
    for element in values.value:
        payload_type = type(element.value)
        if payload_type is dict:
            node_final_assertions(element)
        elif payload_type is list:
            _list_final_assertions(element)


# assert_symbol_name()
#
# Checks that a loaded string is usable as a symbol name, raising a
# consistent LoadError when it is not. Use this for strings which are
# required to be symbols.
#
# Args:
#    provenance (Provenance): The provenance of the loaded symbol, or None
#    symbol_name (str): The loaded symbol name
#    purpose (str): The purpose of the string, for an error message
#    allow_dashes (bool): Whether dashes are allowed for this symbol
#
# Raises:
#    LoadError: If the symbol_name is invalid
#
# Note that dashes are generally preferred for variable names and
# usage in YAML, but things such as option names which will be
# evaluated with jinja2 cannot use dashes.
def assert_symbol_name(ProvenanceInformation provenance, str symbol_name, str purpose, *, bint allow_dashes=True):
    cdef str valid_chars = string.digits + string.ascii_letters + '_'
    if allow_dashes:
        valid_chars += '-'

    # A symbol is acceptable when it is not empty, does not start with
    # a digit and consists of permitted characters only.
    if (symbol_name and
            symbol_name[0] not in string.digits and
            all(char in valid_chars for char in symbol_name)):
        return

    detail = ("Symbol names must contain only alphanumeric characters, "
              "may not start with a digit, and may contain underscores")
    if allow_dashes:
        detail += " or dashes"

    message = "Invalid symbol name for {}: '{}'".format(purpose, symbol_name)
    if provenance is not None:
        message = "{}: {}".format(provenance, message)

    raise LoadError(LoadErrorReason.INVALID_SYMBOL_NAME,
                    message, detail=detail)


# node_find_target()
#
# Searches the given node tree for the given target node.
#
# This is typically used when trying to walk a path to a given node
# for the purpose of then modifying a similar tree of objects elsewhere
#
# If the key is provided, then we actually hunt for the node represented by
# target[key] and return its container, rather than hunting for target directly
#
# Nodes are matched on their file index, line and column rather than on
# object identity.
#
# Args:
#    node (Node): The node at the root of the tree to search
#    target (Node): The node you are looking for in that tree
#    key (str): Optional string key within target node
#
# Returns:
#    (list): A path from `node` to `target` or None if `target` is not in the subtree
cpdef list node_find_target(MappingNode node, Node target, str key=None):
    cdef list path = []

    if key is not None:
        target = target.value[key]

    if _walk_find_target(node, path, target):
        # Use the same test as above to decide whether a key was given,
        # so that an empty string key also yields the path of the
        # container rather than the path of the value stored under it.
        if key is not None:
            # Remove key from end of path
            path = path[:-1]
        return path
    return None


# Helper for node_find_target() which walks a value
#
# Returns True as soon as a node with the same file index, line and
# column as `target` is found, in which case `path` holds the keys and
# indices leading to it.
cdef bint _walk_find_target(Node node, list path, Node target) except *:
    if node.file_index == target.file_index and node.line == target.line and node.column == target.column:
        return True
    elif type(node.value) is dict:
        return _walk_dict_node(node, path, target)
    elif type(node.value) is list:
        return _walk_list_node(node, path, target)
    return False


# Helper for node_find_target() which walks a list
cdef bint _walk_list_node(Node node, list path, Node target):
    cdef int i
    cdef Node v

    for i, v in enumerate(node.value):
        # Tentatively extend the path with this index, and take it
        # back off again if the target is not found beneath it.
        path.append(i)
        if _walk_find_target(v, path, target):
            return True
        del path[-1]
    return False


# Helper for node_find_target() which walks a mapping
cdef bint _walk_dict_node(MappingNode node, list path, Node target):
    cdef str k
    cdef Node v

    for k, v in node.value.items():
        # Tentatively extend the path with this key, and take it
        # back off again if the target is not found beneath it.
        path.append(k)
        if _walk_find_target(v, path, target):
            return True
        del path[-1]
    return False


###############################################################################

# Roundtrip code

# Always represent things consistently:

yaml.RoundTripRepresenter.add_representer(OrderedDict,
                                          yaml.SafeRepresenter.represent_dict)
# Always parse things consistently
#
# Integers, floats, booleans, nulls and timestamps are all constructed
# as plain strings.

yaml.RoundTripConstructor.add_constructor(
    u'tag:yaml.org,2002:int',
    yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(
    u'tag:yaml.org,2002:float',
    yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(
    u'tag:yaml.org,2002:bool',
    yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(
    u'tag:yaml.org,2002:null',
    yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(
    u'tag:yaml.org,2002:timestamp',
    yaml.RoundTripConstructor.construct_yaml_str)


# HardlineDumper
#
# The dumper used by roundtrip_dump(). It forces every scalar to be
# emitted as a plain string so that the output format matches the
# input format.
#
# If you discover something is broken, please add a test case to the roundtrip
# test in tests/internals/yaml/roundtrip-test.yaml
#
class HardlineDumper(yaml.RoundTripDumper):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Register a catch-all string resolver for YAML 1.1, for
        # YAML 1.2 and for the unversioned case.
        everything = yaml.util.RegExp(r'.*')
        for version in ((1, 1), (1, 2), None):
            self.add_version_implicit_resolver(
                version,
                u'tag:yaml.org,2002:str',
                everything,
                None)


# roundtrip_load()
#
# Load a YAML file into memory in a form which allows roundtripping as best
# as ruamel permits.
#
# Note, the returned objects can be treated as Mappings and Lists and Strings
# but replacing content wholesale with plain dicts and lists may result
# in a loss of comments and formatting.
#
# Args:
#    filename (str): The file to load in
#    allow_missing (bool): Optionally set this to True to allow missing files
#
# Returns:
#    (Mapping): The loaded YAML mapping.
#
# Raises:
#    (LoadError): If the file is missing, or a directory, this is raised.
#                 Also if the YAML is malformed.
#
def roundtrip_load(filename, *, allow_missing=False):
    try:
        with open(filename, "r") as stream:
            text = stream.read()
        return roundtrip_load_data(text, filename=filename)
    except FileNotFoundError as e:
        if not allow_missing:
            raise LoadError(LoadErrorReason.MISSING_FILE,
                            "Could not find file at {}".format(filename)) from e
        # Missing files are always empty dictionaries
        return {}
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory."
                        .format(filename)) from e


# roundtrip_load_data()
#
# Parses the given string as YAML and returns the result as a
# roundtrippable data structure.
#
# Content which parses to nothing at all is returned as an empty mapping.
#
# Args:
#    contents (str): The contents to be parsed as YAML
#    filename (str): Optional filename to be used in error reports
#
# Returns:
#    (Mapping): The loaded YAML mapping
#
# Raises:
#    (LoadError): Raised on invalid YAML, or YAML which parses to something other
#                 than a Mapping
#
def roundtrip_load_data(contents, *, filename=None):
    try:
        document = yaml.load(contents, yaml.RoundTripLoader, preserve_quotes=True)
    except (yaml.scanner.ScannerError, yaml.composer.ComposerError, yaml.parser.ParserError) as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n{}\n".format(e.problem, e.problem_mark)) from e

    # Empty input is turned into an empty mapping, like the main
    # Node loader does
    if document is None:
        document = {}

    if isinstance(document, Mapping):
        return document

    raise LoadError(LoadErrorReason.INVALID_YAML,
                    "YAML file has content of type '{}' instead of expected type 'dict': {}"
                    .format(type(document).__name__, filename))


# roundtrip_dump()
#
# Dumps the given contents as a YAML file. Ideally the contents came from
# parsing with `roundtrip_load` or `roundtrip_load_data` so that they will be
# dumped in the same form as they came from.
#
# If `file` is a string, it is the filename to write to, if `file` has a
# `write` method, it's treated as a stream, otherwise output is to stdout.
#
# The contents passed in are left untouched: a deep copy is made, and every
# value in that copy which is not a string, a mapping or a sequence is
# converted to a string before dumping.
#
# Args:
#    contents (Mapping or list): The content to write out as YAML.
#    file (any): The file to write to
#
def roundtrip_dump(contents, file=None):
    assert type(contents) is not Node

    # Handle one value held by `container` under `key` (a mapping key or a
    # list index): plain strings are left alone, nested containers are
    # descended into and anything else is replaced by its string form.
    def stringify_value(container, key, value):
        if type(value) is str:
            return
        if isinstance(value, Mapping):
            stringify_dict(value)
        elif isinstance(value, Sequence):
            stringify_list(value)
        else:
            container[key] = str(value)

    def stringify_dict(thing):
        for k, v in thing.items():
            stringify_value(thing, k, v)

    def stringify_list(thing):
        for i, v in enumerate(thing):
            stringify_value(thing, i, v)

    contents = deepcopy(contents)

    # The toplevel content is documented as either a mapping or a list, so
    # walk it accordingly rather than assuming that it is always a mapping.
    if isinstance(contents, Sequence) and not isinstance(contents, str):
        stringify_list(contents)
    else:
        stringify_dict(contents)

    with ExitStack() as stack:
        if type(file) is str:
            # Imported here rather than at the top of the file, as in the
            # original code.
            from . import utils
            f = stack.enter_context(utils.save_file_atomic(file, 'w'))
        elif hasattr(file, 'write'):
            f = file
        else:
            f = sys.stdout
        yaml.round_trip_dump(contents, f, Dumper=HardlineDumper)