#
#  Copyright (C) 2018 Codethink Limited
#  Copyright (C) 2019 Bloomberg LLP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Tristan Van Berkom
#        Daniel Silverstone
#        James Ennis
#        Benjamin Schubert

import datetime
import sys
import string
from contextlib import ExitStack
from collections import OrderedDict
from collections.abc import Mapping

from ruamel import yaml
from ._exceptions import LoadError, LoadErrorReason

# Without this, pylint complains about all the `type(foo) is blah` checks
# because it feels isinstance() is more idiomatic.  Sadly, it is much slower
# to do `isinstance(foo, blah)` for reasons I am unable to fathom.  As such,
# we blanket disable the check for this module.
#
# pylint: disable=unidiomatic-typecheck


# A sentinel to be used as a default argument for functions that need
# to distinguish between a kwarg set to None and an unset kwarg.
_sentinel = object()


# Node()
#
# Container for YAML loaded data and its provenance
#
# All nodes returned (and all internal lists/strings) have this type (rather
# than a plain tuple, to distinguish them in things like node_sanitize)
#
# Members:
#   value (str/list/dict): The loaded value.
#   file_index (int): Index within _FILE_LIST (a list of loaded file paths).
#                     Negative indices indicate synthetic nodes so that
#                     they can be referenced.
#   line (int): The line number within the file where the value appears.
#   column (int): The column number within the file where the value appears.
#
cdef class Node:

    def __init__(self, object value, int file_index, int line, int column):
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    @classmethod
    def from_dict(cls, dict value):
        if value:
            return _new_node_from_dict(value, Node(None, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter()))
        else:
            # We got an empty dict, we can shortcut
            return MappingNode({}, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())

    cdef bint _walk_find(self, Node target, list path) except *:
        raise NotImplementedError()

    cdef bint _shares_position_with(self, Node target):
        return self.file_index == target.file_index and self.line == target.line and self.column == target.column

    def __contains__(self, what):
        # Delegate to the inner value, though this will likely not work
        # very well if the node is a list or string, it's unlikely that
        # code which has access to such nodes would do this.
        return what in self.value

    cpdef Node copy(self):
        raise NotImplementedError()

    cpdef object strip_node_info(self):
        raise NotImplementedError()

    # _assert_fully_composited()
    #
    # This must be called on a fully loaded and composited node,
    # after all composition has completed.
    #
    # This checks that no more composition directives are present
    # in the data.
    #
    # Raises:
    #    (LoadError): If any assertions fail
    #
    cpdef void _assert_fully_composited(self) except *:
        raise NotImplementedError()
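    # Example of building a synthetic tree (an illustrative sketch; the
    # dict contents below are hypothetical):
    #
    #   node = Node.from_dict({'kind': 'manual', 'depends': ['base.bst']})
    #
    # This yields a MappingNode whose file_index is _SYNTHETIC_FILE_INDEX
    # and whose columns come from next_synthetic_counter(); scalar values
    # are stored as strings, mirroring how parsed YAML is represented.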
    # _is_composite_list()
    #
    # Checks if the node is a Mapping with array composition
    # directives.
    #
    # Returns:
    #    (bool): True if node was a Mapping containing only
    #            list composition directives
    #
    # Raises:
    #    (LoadError): If node was a mapping and contained a mix of
    #                 list composition directives and other keys
    #
    cdef bint _is_composite_list(self) except *:
        raise NotImplementedError()

    cdef void _compose_on(self, str key, MappingNode target, list path) except *:
        raise NotImplementedError()

    def __json__(self):
        raise ValueError("Nodes should not be allowed when jsonify-ing data", self)


cdef class ScalarNode(Node):

    def __init__(self, object value, int file_index, int line, int column):
        if type(value) is str:
            value = value.strip()
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    cpdef ScalarNode copy(self):
        return self

    cpdef bint is_none(self):
        return self.value is None

    cpdef bint as_bool(self) except *:
        if type(self.value) is bool:
            return self.value

        # Don't coerce booleans to string, this makes "False" strings evaluate to True
        if self.value in ('True', 'true'):
            return True
        elif self.value in ('False', 'false'):
            return False
        else:
            provenance = node_get_provenance(self)
            path = provenance.toplevel._find(self)[-1]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, bool.__name__))

    cpdef int as_int(self) except *:
        try:
            return int(self.value)
        except ValueError:
            provenance = node_get_provenance(self)
            path = provenance.toplevel._find(self)[-1]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, int.__name__))

    cpdef str as_str(self):
        # We keep 'None' as 'None' to simplify the API's usage and allow chaining for users
        if self.value is None:
            return None
        return str(self.value)

    cpdef object strip_node_info(self):
        return self.value

    cpdef void _assert_fully_composited(self) except *:
        pass

    cdef void _compose_on(self, str key, MappingNode target, list path) except *:
        cdef Node target_value = target.value.get(key)

        if target_value is not None and type(target_value) is not ScalarNode:
            raise CompositeError(path,
                                 "{}: Cannot compose scalar on non-scalar at {}".format(
                                     node_get_provenance(self),
                                     node_get_provenance(target_value)))

        target.value[key] = self

    cdef bint _is_composite_list(self) except *:
        return False

    cdef bint _walk_find(self, Node target, list path) except *:
        return self._shares_position_with(target)


cdef class MappingNode(Node):

    def __init__(self, dict value, int file_index, int line, int column):
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    cpdef MappingNode copy(self):
        cdef dict copy = {}
        cdef str key
        cdef Node value

        for key, value in self.value.items():
            copy[key] = value.copy()

        return MappingNode(copy, self.file_index, self.line, self.column)
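    # Example of scalar coercion (an illustrative sketch; the key below is
    # hypothetical):
    #
    #   node = Node.from_dict({'strict': 'False'})
    #   node.get_scalar('strict').as_bool()   # -> False, not bool('False')
    #   node.get_scalar('strict').as_str()    # -> 'False'
    #
    # Only the literal strings 'True'/'true' and 'False'/'false' (or real
    # booleans) are accepted by as_bool(); anything else raises LoadError.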
    # _find()
    #
    # Searches the given node tree for the given target node.
    #
    # This is typically used when trying to walk a path to a given node
    # for the purpose of then modifying a similar tree of objects elsewhere
    #
    # Args:
    #    target (Node): The node you are looking for in that tree
    #
    # Returns:
    #    (list): A path from `node` to `target` or None if `target` is not in the subtree
    #
    cpdef list _find(self, Node target):
        cdef list path = []
        if self._walk_find(target, path):
            return path
        return None

    # composite()
    #
    # Compose one mapping node onto another
    #
    # Args:
    #    target (Node): The target to compose into
    #
    # Raises: LoadError
    #
    cpdef void composite(self, MappingNode target) except *:
        try:
            self._composite(target, [])
        except CompositeError as e:
            source_provenance = node_get_provenance(self)
            error_prefix = ""
            if source_provenance:
                error_prefix = "{}: ".format(source_provenance)
            raise LoadError(LoadErrorReason.ILLEGAL_COMPOSITE,
                            "{}Failure composing {}: {}"
                            .format(error_prefix, e.path, e.message)) from e

    # Like composite(target, source), but where target overrides source instead.
    #
    cpdef void composite_under(self, MappingNode target) except *:
        target.composite(self)

        cdef str key
        cdef Node value
        cdef list to_delete = [key for key in target.value.keys() if key not in self.value]

        for key, value in self.value.items():
            target.value[key] = value

        for key in to_delete:
            del target.value[key]

    cdef Node get(self, str key, object default, object default_constructor):
        value = self.value.get(key, _sentinel)

        if value is _sentinel:
            if default is _sentinel:
                provenance = node_get_provenance(self)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Dictionary did not contain expected key '{}'".format(provenance, key))

            if default is None:
                value = None
            else:
                value = default_constructor(default, _SYNTHETIC_FILE_INDEX, 0, next_synthetic_counter())

        return value

    cpdef MappingNode get_mapping(self, str key, object default=_sentinel):
        value = self.get(key, default, MappingNode)

        if type(value) is not MappingNode and value is not None:
            provenance = node_get_provenance(value)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type 'Mapping'"
                            .format(provenance, key))

        return value

    cpdef Node get_node(self, str key, list allowed_types=None, bint allow_none=False):
        cdef value = self.value.get(key, _sentinel)

        if value is _sentinel:
            if allow_none:
                return None

            provenance = node_get_provenance(self)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Dictionary did not contain expected key '{}'".format(provenance, key))

        if allowed_types and type(value) not in allowed_types:
            provenance = node_get_provenance(self)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not one of the following: {}.".format(
                                provenance, key, ", ".join(t.__name__ for t in allowed_types)))

        return value

    cpdef ScalarNode get_scalar(self, str key, object default=_sentinel):
        value = self.get(key, default, ScalarNode)

        if type(value) is not ScalarNode:
            if value is None:
                value = ScalarNode(None, self.file_index, 0, next_synthetic_counter())
            else:
                provenance = node_get_provenance(value)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Value of '{}' is not of the expected type 'Scalar'"
                                .format(provenance, key))

        return value

    cpdef SequenceNode get_sequence(self, str key, object default=_sentinel):
        value = self.get(key, default, SequenceNode)

        if type(value) is not SequenceNode and value is not None:
            provenance = node_get_provenance(value)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type 'Sequence'"
                            .format(provenance, key))

        return value
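    # Example of the typed accessors (an illustrative sketch; the keys are
    # hypothetical):
    #
    #   node = Node.from_dict({'name': 'hello.bst', 'depends': ['base.bst']})
    #   node.get_str('name')                   # -> 'hello.bst'
    #   node.get_sequence('depends')           # -> SequenceNode
    #   node.get_str('variant', default=None)  # -> None, no error
    #   node.get_str('variant')                # -> raises LoadError (missing key)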
    cpdef bint get_bool(self, str key, object default=_sentinel) except *:
        cdef ScalarNode scalar = self.get_scalar(key, default)
        return scalar.as_bool()

    cpdef int get_int(self, str key, object default=_sentinel) except *:
        cdef ScalarNode scalar = self.get_scalar(key, default)
        return scalar.as_int()

    cpdef str get_str(self, str key, object default=_sentinel):
        cdef ScalarNode scalar = self.get_scalar(key, default)
        return scalar.as_str()

    cpdef object items(self):
        return self.value.items()

    cpdef list keys(self):
        return list(self.value.keys())

    cpdef void safe_del(self, str key):
        try:
            del self.value[key]
        except KeyError:
            pass

    # validate_keys()
    #
    # Validate the node so as to ensure the user has not specified
    # any keys which are unrecognized by buildstream (usually this
    # means a typo which would otherwise not trigger an error).
    #
    # Args:
    #    valid_keys (list): A list of valid keys for the specified node
    #
    # Raises:
    #    LoadError: In the case that the specified node contained
    #               one or more invalid keys
    #
    cpdef void validate_keys(self, list valid_keys) except *:
        # Probably the fastest way to do this: https://stackoverflow.com/a/23062482
        cdef set valid_keys_set = set(valid_keys)
        cdef str key

        for key in self.value:
            if key not in valid_keys_set:
                provenance = node_get_provenance(self, key=key)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Unexpected key: {}".format(provenance, key))

    cpdef object values(self):
        return self.value.values()

    cpdef object strip_node_info(self):
        cdef str key
        cdef Node value

        return {key: value.strip_node_info() for key, value in self.value.items()}

    cdef void _composite(self, MappingNode target, list path=None) except *:
        cdef str key
        cdef Node value

        for key, value in self.value.items():
            path.append(key)
            value._compose_on(key, target, path)
            path.pop()
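    # Example of mapping composition (an illustrative sketch; the dicts are
    # hypothetical):
    #
    #   base = Node.from_dict({'config': {'prefix': '/usr', 'debug': 'false'}})
    #   overlay = Node.from_dict({'config': {'debug': 'true'}})
    #   overlay.composite(base)
    #
    # Afterwards base holds {'config': {'prefix': '/usr', 'debug': 'true'}}:
    # scalars in the overlay replace scalars in the target, while nested
    # mappings are merged recursively by _compose_on() below.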
    cdef void _compose_on(self, str key, MappingNode target, list path) except *:
        cdef Node target_value

        if self._is_composite_list():
            if key not in target.value:
                # Composite list clobbers empty space
                target.value[key] = self
            else:
                target_value = target.value[key]

                if type(target_value) is SequenceNode:
                    # Composite list composes into a list
                    self._compose_on_list(target_value)
                elif target_value._is_composite_list():
                    # Composite list merges into composite list
                    self._compose_on_composite_dict(target_value)
                else:
                    # Else composing on top of normal dict or a scalar, so raise...
                    raise CompositeError(path,
                                         "{}: Cannot compose lists onto {}".format(
                                             node_get_provenance(self),
                                             node_get_provenance(target_value)))
        else:
            # We're composing a dict into target now
            if key not in target.value:
                # Target lacks a dict at that point, make a fresh one with
                # the same provenance as the incoming dict
                target.value[key] = MappingNode({}, self.file_index, self.line, self.column)

            self._composite(target.value[key], path)

    cdef void _compose_on_list(self, SequenceNode target):
        cdef SequenceNode clobber = self.value.get("(=)")
        cdef SequenceNode prefix = self.value.get("(<)")
        cdef SequenceNode suffix = self.value.get("(>)")

        if clobber is not None:
            target.value.clear()
            target.value.extend(clobber.value)
        if prefix is not None:
            for v in reversed(prefix.value):
                target.value.insert(0, v)
        if suffix is not None:
            target.value.extend(suffix.value)

    cdef void _compose_on_composite_dict(self, MappingNode target):
        cdef SequenceNode clobber = self.value.get("(=)")
        cdef SequenceNode prefix = self.value.get("(<)")
        cdef SequenceNode suffix = self.value.get("(>)")

        if clobber is not None:
            # We want to clobber the target list
            # which basically means replacing the target list
            # with ourselves
            target.value["(=)"] = clobber
            if prefix is not None:
                target.value["(<)"] = prefix
            elif "(<)" in target.value:
                target.value["(<)"].value.clear()
            if suffix is not None:
                target.value["(>)"] = suffix
            elif "(>)" in target.value:
                target.value["(>)"].value.clear()
        else:
            # Not clobbering, so prefix the prefix and suffix the suffix
            if prefix is not None:
                if "(<)" in target.value:
                    for v in reversed(prefix.value):
                        target.value["(<)"].value.insert(0, v)
                else:
                    target.value["(<)"] = prefix
            if suffix is not None:
                if "(>)" in target.value:
                    target.value["(>)"].value.extend(suffix.value)
                else:
                    target.value["(>)"] = suffix

    cdef bint _is_composite_list(self) except *:
        cdef bint has_directives = False
        cdef bint has_keys = False
        cdef str key

        for key in self.value.keys():
            if key in ['(>)', '(<)', '(=)']:
                has_directives = True
            else:
                has_keys = True

            if has_keys and has_directives:
                provenance = node_get_provenance(self)
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Dictionary contains array composition directives and arbitrary keys"
                                .format(provenance))

        return has_directives

    def __delitem__(self, str key):
        del self.value[key]
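    # Example of the list composition directives handled above (an
    # illustrative sketch; the element names are hypothetical).  Given a
    # target list:
    #
    #   depends: ['base.bst']
    #
    # composing this mapping onto the same key:
    #
    #   depends:
    #     (<): ['bootstrap.bst']   # prepended
    #     (>): ['app.bst']         # appended
    #
    # yields ['bootstrap.bst', 'base.bst', 'app.bst'], while a (=) directive
    # would have replaced the target list outright.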
    def __setitem__(self, str key, object value):
        if type(value) in [MappingNode, ScalarNode, SequenceNode]:
            self.value[key] = value
        else:
            node = _create_node_recursive(value, self)

            # FIXME: Do we really want to override provenance?
            #
            # Related to https://gitlab.com/BuildStream/buildstream/issues/1058
            #
            # There are only two cases where nodes are set in the code (hence without provenance):
            #   - When automatic variables are set by the core (e.g. max-jobs)
            #   - When plugins call Element.set_public_data
            #
            # The first case should never throw errors, so it is of limited interest.
            #
            # The second is more important.  What should probably be done here is to have
            # 'set_public_data' be able to create a fake provenance with the name of the
            # plugin, the project and probably the element name.
            #
            # We would therefore have much better error messages, and would be able to get
            # rid of most synthetic nodes.
            old_value = self.value.get(key)
            if old_value:
                node.file_index = old_value.file_index
                node.line = old_value.line
                node.column = old_value.column

            self.value[key] = node

    cpdef void _assert_fully_composited(self) except *:
        cdef str key
        cdef Node value

        for key, value in self.value.items():
            # Assert that list composition directives don't remain, this
            # indicates that the user intended to override a list which
            # never existed in the underlying data
            #
            if key in ('(>)', '(<)', '(=)'):
                provenance = node_get_provenance(value)
                raise LoadError(LoadErrorReason.TRAILING_LIST_DIRECTIVE,
                                "{}: Attempt to override non-existing list".format(provenance))

            value._assert_fully_composited()

    cdef bint _walk_find(self, Node target, list path) except *:
        cdef str k
        cdef Node v

        if self._shares_position_with(target):
            return True

        for k, v in self.value.items():
            path.append(k)
            if v._walk_find(target, path):
                return True
            del path[-1]

        return False


cdef class SequenceNode(Node):

    def __init__(self, list value, int file_index, int line, int column):
        self.value = value
        self.file_index = file_index
        self.line = line
        self.column = column

    cpdef void append(self, object value):
        if type(value) in [MappingNode, ScalarNode, SequenceNode]:
            self.value.append(value)
        else:
            node = _create_node_recursive(value, self)
            self.value.append(node)

    cpdef SequenceNode copy(self):
        cdef list copy = []
        cdef Node entry

        for entry in self.value:
            copy.append(entry.copy())

        return SequenceNode(copy, self.file_index, self.line, self.column)

    cpdef MappingNode mapping_at(self, int index):
        value = self.value[index]

        if type(value) is not MappingNode:
            provenance = node_get_provenance(self)
            path = ["[{}]".format(p) for p in provenance.toplevel._find(self)] + ["[{}]".format(index)]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, MappingNode.__name__))
        return value

    cpdef SequenceNode sequence_at(self, int index):
        value = self.value[index]

        if type(value) is not SequenceNode:
            provenance = node_get_provenance(self)
            path = ["[{}]".format(p) for p in provenance.toplevel._find(self)] + ["[{}]".format(index)]
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "{}: Value of '{}' is not of the expected type '{}'"
                            .format(provenance, path, SequenceNode.__name__))
        return value

    cpdef list as_str_list(self):
        return [node.as_str() for node in self.value]

    cpdef object strip_node_info(self):
        cdef Node value
        return [value.strip_node_info() for value in self.value]

    cpdef void _assert_fully_composited(self) except *:
        cdef Node value
        for value in self.value:
            value._assert_fully_composited()

    cdef void _compose_on(self, str key, MappingNode target, list path) except *:
        # List clobbers anything list-like
        cdef Node target_value = target.value.get(key)

        if not (target_value is None or
                type(target_value) is SequenceNode or
                target_value._is_composite_list()):
            raise CompositeError(path,
                                 "{}: List cannot overwrite {} at: {}"
                                 .format(node_get_provenance(self),
                                         key,
                                         node_get_provenance(target_value)))

        # Looks good, clobber it
        target.value[key] = self

    cdef bint _is_composite_list(self) except *:
        return False

    cdef bint _walk_find(self, Node target, list path) except *:
        cdef int i
        cdef Node v

        if self._shares_position_with(target):
            return True

        for i, v in enumerate(self.value):
            path.append(i)
            if v._walk_find(target, path):
                return True
            del path[-1]

        return False

    def __iter__(self):
        return iter(self.value)

    def __len__(self):
        return len(self.value)

    def __reversed__(self):
        return reversed(self.value)
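    # Example of sequence access (an illustrative sketch; the keys are
    # hypothetical):
    #
    #   node = Node.from_dict({'depends': ['base.bst', 'deps.bst']})
    #   seq = node.get_sequence('depends')
    #   len(seq)             # -> 2
    #   seq.as_str_list()    # -> ['base.bst', 'deps.bst']
    #   for dep in seq:      # iterates the ScalarNode entries
    #       ...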
    def __setitem__(self, int key, object value):
        if type(value) in [MappingNode, ScalarNode, SequenceNode]:
            self.value[key] = value
        else:
            node = _create_node_recursive(value, self)

            # FIXME: Do we really want to override provenance?
            # See __setitem__ on 'MappingNode' for more context
            old_value = self.value[key]
            if old_value:
                node.file_index = old_value.file_index
                node.line = old_value.line
                node.column = old_value.column

            self.value[key] = node


# Metadata container for a yaml toplevel node.
#
# This class contains metadata around a yaml node in order to be able
# to trace back the provenance of a node to the file.
#
cdef class FileInfo:

    cdef str filename, shortname, displayname
    cdef Node toplevel
    cdef object project

    def __init__(self, str filename, str shortname, str displayname, Node toplevel, object project):
        self.filename = filename
        self.shortname = shortname
        self.displayname = displayname
        self.toplevel = toplevel
        self.project = project


# File name handling
cdef _FILE_LIST = []


# Purely synthetic nodes will have _SYNTHETIC_FILE_INDEX for the file number, have line number
# zero, and a negative column number which comes from inverting the next value
# out of this counter.  Synthetic nodes created with a reference node will
# have a file number from the reference node, some unknown line number, and
# a negative column number from this counter.
cdef int _SYNTHETIC_FILE_INDEX = -1
cdef int __counter = 0

cdef int next_synthetic_counter():
    global __counter
    __counter -= 1
    return __counter


# Returned from node_get_provenance
cdef class ProvenanceInformation:

    def __init__(self, Node nodeish):
        cdef FileInfo fileinfo

        self.node = nodeish
        if (nodeish is None) or (nodeish.file_index == _SYNTHETIC_FILE_INDEX):
            self.filename = ""
            self.shortname = ""
            self.displayname = ""
            self.line = 1
            self.col = 0
            self.toplevel = None
            self.project = None
        else:
            fileinfo = _FILE_LIST[nodeish.file_index]
            self.filename = fileinfo.filename
            self.shortname = fileinfo.shortname
            self.displayname = fileinfo.displayname
            # We add 1 here to convert from computerish to humanish
            self.line = nodeish.line + 1
            self.col = nodeish.column
            self.toplevel = fileinfo.toplevel
            self.project = fileinfo.project
        self.is_synthetic = (self.filename == '') or (self.col < 0)

    # Convert a Provenance to a string for error reporting
    def __str__(self):
        if self.is_synthetic:
            return "{} [synthetic node]".format(self.displayname)
        else:
            return "{} [line {:d} column {:d}]".format(self.displayname, self.line, self.col)


# These exceptions are intended to be caught entirely within
# the BuildStream framework, hence they do not reside in the
# public exceptions.py
class CompositeError(Exception):
    def __init__(self, path, message):
        super().__init__(message)
        self.path = path
        self.message = message


class YAMLLoadError(Exception):
    pass


# Represents the various states in which the Representer can be
# while parsing yaml.
cdef enum RepresenterState:
    doc
    init
    stream
    wait_key
    wait_list_item
    wait_value


ctypedef RepresenterState (*representer_action)(Representer, object)
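# Example of how provenance reads in error messages (an illustrative sketch;
# the file name is hypothetical): a node whose key sits on the first line of
# element.bst formats as
#
#   element.bst [line 1 column 0]
#
# since the parser's 0-based line is converted to a 1-based human count,
# while nodes created in memory format as '... [synthetic node]'.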
# Representer for YAML events comprising input to the BuildStream format.
#
# All streams MUST represent a single document which must be a Mapping.
# Anything else is considered an error.
#
# Mappings must only have string keys, values are always represented as
# strings if they are scalar, or else as simple dictionaries and lists.
#
cdef class Representer:

    cdef int _file_index
    cdef RepresenterState state
    cdef list output, keys

    # Initialise a new representer
    #
    # The file index is used to store into the Node instances so that the
    # provenance of the YAML can be tracked.
    #
    # Args:
    #   file_index (int): The index of this YAML file
    def __init__(self, int file_index):
        self._file_index = file_index
        self.state = RepresenterState.init
        self.output = []
        self.keys = []

    # Handle a YAML parse event
    #
    # Args:
    #   event (YAML Event): The event to be handled
    #
    # Raises:
    #   YAMLLoadError: Something went wrong.
    cdef void handle_event(self, event) except *:
        if getattr(event, "anchor", None) is not None:
            raise YAMLLoadError("Anchors are disallowed in BuildStream at line {} column {}"
                                .format(event.start_mark.line, event.start_mark.column))

        cdef str event_name = event.__class__.__name__

        if event_name == "ScalarEvent":
            if event.tag is not None:
                if not event.tag.startswith("tag:yaml.org,2002:"):
                    raise YAMLLoadError(
                        "Non-core tag expressed in input.  " +
                        "This is disallowed in BuildStream. At line {} column {}"
                        .format(event.start_mark.line, event.start_mark.column))

        cdef representer_action handler = self._get_handler_for_event(event_name)
        if not handler:
            raise YAMLLoadError(
                "Invalid input detected. No handler for {} in state {} at line {} column {}"
                .format(event, self.state, event.start_mark.line, event.start_mark.column))

        # Cython weirdness here, we need to pass self to the function
        self.state = handler(self, event)  # pylint: disable=not-callable

    # Get the output of the YAML parse
    #
    # Returns:
    #   (Node or None): Return the Node instance of the top level mapping or
    #                   None if there wasn't one.
    cdef Node get_output(self):
        if len(self.output):
            return self.output[0]
        return None

    cdef representer_action _get_handler_for_event(self, str event_name):
        if self.state == RepresenterState.wait_list_item:
            if event_name == "ScalarEvent":
                return self._handle_wait_list_item_ScalarEvent
            elif event_name == "MappingStartEvent":
                return self._handle_wait_list_item_MappingStartEvent
            elif event_name == "SequenceStartEvent":
                return self._handle_wait_list_item_SequenceStartEvent
            elif event_name == "SequenceEndEvent":
                return self._handle_wait_list_item_SequenceEndEvent
        elif self.state == RepresenterState.wait_value:
            if event_name == "ScalarEvent":
                return self._handle_wait_value_ScalarEvent
            elif event_name == "MappingStartEvent":
                return self._handle_wait_value_MappingStartEvent
            elif event_name == "SequenceStartEvent":
                return self._handle_wait_value_SequenceStartEvent
        elif self.state == RepresenterState.wait_key:
            if event_name == "ScalarEvent":
                return self._handle_wait_key_ScalarEvent
            elif event_name == "MappingEndEvent":
                return self._handle_wait_key_MappingEndEvent
        elif self.state == RepresenterState.stream:
            if event_name == "DocumentStartEvent":
                return self._handle_stream_DocumentStartEvent
            elif event_name == "StreamEndEvent":
                return self._handle_stream_StreamEndEvent
        elif self.state == RepresenterState.doc:
            if event_name == "MappingStartEvent":
                return self._handle_doc_MappingStartEvent
            elif event_name == "DocumentEndEvent":
                return self._handle_doc_DocumentEndEvent
        elif self.state == RepresenterState.init and event_name == "StreamStartEvent":
            return self._handle_init_StreamStartEvent
        return NULL

    cdef RepresenterState _handle_init_StreamStartEvent(self, object ev):
        return RepresenterState.stream

    cdef RepresenterState _handle_stream_DocumentStartEvent(self, object ev):
        return RepresenterState.doc
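    # Example of the event stream driving this state machine (an
    # illustrative sketch; the document is hypothetical).  Parsing "a: 1"
    # emits, in order:
    #
    #   StreamStartEvent    init -> stream
    #   DocumentStartEvent  stream -> doc
    #   MappingStartEvent   doc -> wait_key
    #   ScalarEvent 'a'     wait_key -> wait_value
    #   ScalarEvent '1'     wait_value -> wait_key
    #   MappingEndEvent     wait_key -> doc
    #   DocumentEndEvent    doc -> stream
    #   StreamEndEvent      stream -> init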
    cdef RepresenterState _handle_doc_MappingStartEvent(self, object ev):
        newmap = MappingNode({}, self._file_index, ev.start_mark.line, ev.start_mark.column)
        self.output.append(newmap)
        return RepresenterState.wait_key

    cdef RepresenterState _handle_wait_key_ScalarEvent(self, object ev):
        self.keys.append(ev.value)
        return RepresenterState.wait_value

    cdef RepresenterState _handle_wait_value_ScalarEvent(self, object ev):
        key = self.keys.pop()
        (<dict> (<Node> self.output[-1]).value)[key] = \
            ScalarNode(ev.value, self._file_index, ev.start_mark.line, ev.start_mark.column)
        return RepresenterState.wait_key

    cdef RepresenterState _handle_wait_value_MappingStartEvent(self, object ev):
        cdef RepresenterState new_state = self._handle_doc_MappingStartEvent(ev)
        key = self.keys.pop()
        (<dict> (<Node> self.output[-2]).value)[key] = self.output[-1]
        return new_state

    cdef RepresenterState _handle_wait_key_MappingEndEvent(self, object ev):
        # We've finished a mapping, so pop it off the output stack
        # unless it's the last one in which case we leave it
        if len(self.output) > 1:
            self.output.pop()
            if type((<Node> self.output[-1]).value) is list:
                return RepresenterState.wait_list_item
            else:
                return RepresenterState.wait_key
        else:
            return RepresenterState.doc

    cdef RepresenterState _handle_wait_value_SequenceStartEvent(self, object ev):
        self.output.append(SequenceNode([], self._file_index, ev.start_mark.line, ev.start_mark.column))
        (<dict> (<Node> self.output[-2]).value)[self.keys[-1]] = self.output[-1]
        return RepresenterState.wait_list_item

    cdef RepresenterState _handle_wait_list_item_SequenceStartEvent(self, object ev):
        self.keys.append(len((<Node> self.output[-1]).value))
        self.output.append(SequenceNode([], self._file_index, ev.start_mark.line, ev.start_mark.column))
        (<list> (<Node> self.output[-2]).value).append(self.output[-1])
        return RepresenterState.wait_list_item
    cdef RepresenterState _handle_wait_list_item_SequenceEndEvent(self, object ev):
        # When ending a sequence, we need to pop a key because we retain the
        # key until the end so that if we need to mutate the underlying entry
        # we can.
        key = self.keys.pop()
        self.output.pop()
        if type(key) is int:
            return RepresenterState.wait_list_item
        else:
            return RepresenterState.wait_key

    cdef RepresenterState _handle_wait_list_item_ScalarEvent(self, object ev):
        (<Node> self.output[-1]).value.append(
            ScalarNode(ev.value, self._file_index, ev.start_mark.line, ev.start_mark.column))
        return RepresenterState.wait_list_item

    cdef RepresenterState _handle_wait_list_item_MappingStartEvent(self, object ev):
        cdef RepresenterState new_state = self._handle_doc_MappingStartEvent(ev)
        (<list> (<Node> self.output[-2]).value).append(self.output[-1])
        return new_state

    cdef RepresenterState _handle_doc_DocumentEndEvent(self, object ev):
        if len(self.output) != 1:
            raise YAMLLoadError("Zero, or more than one document found in YAML stream")
        return RepresenterState.stream

    cdef RepresenterState _handle_stream_StreamEndEvent(self, object ev):
        return RepresenterState.init


cdef Node _create_node_recursive(object value, Node ref_node):
    cdef value_type = type(value)

    if value_type is list:
        node = _new_node_from_list(value, ref_node)
    elif value_type is str:
        node = ScalarNode(value, ref_node.file_index, ref_node.line, next_synthetic_counter())
    elif value_type is dict:
        node = _new_node_from_dict(value, ref_node)
    else:
        raise ValueError(
            "Unable to assign a value of type {} to a Node.".format(value_type))

    return node


# Loads a dictionary from some YAML
#
# Args:
#    filename (str): The YAML file to load
#    shortname (str): The filename in shorthand for error reporting (or None)
#    copy_tree (bool): Whether to make a copy, preserving the original toplevels
#                      for later serialization
#    project (Project): The (optional) project to associate the parsed YAML with
#
# Returns (dict): A loaded copy of the YAML file with provenance information
#
# Raises: LoadError
#
cpdef Node load(str filename, str shortname=None, bint copy_tree=False, object project=None):
    if not shortname:
        shortname = filename

    cdef str displayname
    if (project is not None) and (project.junction is not None):
        displayname = "{}:{}".format(project.junction.name, shortname)
    else:
        displayname = shortname

    cdef Py_ssize_t file_number = len(_FILE_LIST)
    _FILE_LIST.append(FileInfo(filename, shortname, displayname, None, project))

    cdef Node data

    try:
        with open(filename) as f:
            contents = f.read()

        data = load_data(contents,
                         file_index=file_number,
                         file_name=filename,
                         copy_tree=copy_tree)

        return data
    except FileNotFoundError as e:
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at {}".format(filename)) from e
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory. bst command expects a .bst file."
                        .format(filename)) from e
    except LoadError as e:
        raise LoadError(e.reason, "{}: {}".format(displayname, e)) from e
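# Example usage (an illustrative sketch; the file name is hypothetical):
#
#   try:
#       node = load('project.conf', shortname='project.conf')
#       name = node.get_str('name')
#   except LoadError as e:
#       print(e)
#
# Any YAML problem surfaces as a LoadError carrying a LoadErrorReason, with
# the provenance embedded in the message.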
# Like load(), but doesn't require the data to be in a file
#
cpdef Node load_data(str data, int file_index=_SYNTHETIC_FILE_INDEX, str file_name=None, bint copy_tree=False):
    cdef Representer rep
    cdef FileInfo f_info

    try:
        rep = Representer(file_index)
        parser = yaml.CParser(data)

        try:
            while parser.check_event():
                rep.handle_event(parser.get_event())
        finally:
            parser.dispose()

        contents = rep.get_output()
    except YAMLLoadError as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n".format(e)) from e
    except Exception as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Severely malformed YAML:\n\n{}\n\n".format(e)) from e

    if type(contents) is not MappingNode:
        # Special case allowance for None, when the loaded file has only comments in it.
        if contents is None:
            contents = MappingNode({}, file_index, 0, 0)
        else:
            raise LoadError(LoadErrorReason.INVALID_YAML,
                            "YAML file has content of type '{}' instead of expected type 'dict': {}"
                            .format(type(contents).__name__, file_name))

    # Store this away because we'll use it later for "top level" provenance
    if file_index != _SYNTHETIC_FILE_INDEX:
        f_info = _FILE_LIST[file_index]
        _FILE_LIST[file_index] = FileInfo(
            f_info.filename,
            f_info.shortname,
            f_info.displayname,
            contents,
            f_info.project,
        )

    if copy_tree:
        contents = contents.copy()

    return contents


# node_get_provenance()
#
# Gets the provenance for a node
#
# Args:
#   node (Node): a dictionary
#   key (str): key in the dictionary
#   indices (list of indexes): Index path, in the case of list values
#
# Returns: The Provenance of the dict, member or list element
#
cpdef ProvenanceInformation node_get_provenance(Node node, str key=None, list indices=None):
    if key is None:
        # Retrieving the provenance for this node directly
        return ProvenanceInformation(node)

    if key and not indices:
        return ProvenanceInformation(node.value.get(key))

    cdef Node nodeish = node.value.get(key)
    for idx in indices:
        nodeish = nodeish.value[idx]

    return ProvenanceInformation(nodeish)


# is_node()
#
# A test method which returns whether or not the passed in value
# is a valid YAML node.  It is not valid to call this on a Node
# object which is not a Mapping.
#
# Args:
#   maybenode (any): The object to test for nodeness
#
# Returns:
#   (bool): Whether or not maybenode was a Node
#
def is_node(maybenode):
    # It's a programming error to give this a Node which isn't a mapping
    # so assert that.
    assert (type(maybenode) not in [ScalarNode, SequenceNode])

    # Now return the type check
    return type(maybenode) is MappingNode
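# Example (an illustrative sketch; the YAML content is hypothetical):
#
#   node = load_data('depends:\n- base.bst\n')
#   p = node_get_provenance(node, key='depends', indices=[0])
#
# p now describes where 'base.bst' appeared; data loaded without a real
# file (as here) reports itself as a synthetic node when formatted.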
# new_synthetic_file()
#
# Create a new synthetic mapping node, with an associated file entry
# (in _FILE_LIST) such that later tracking can correctly determine which
# file needs writing to in order to persist the changes.
#
# Args:
#    filename (str): The name of the synthetic file to create
#    project (Project): The optional project to associate this synthetic file with
#
# Returns:
#    (Node): An empty YAML mapping node, whose provenance is to this new
#            synthetic file
#
def new_synthetic_file(str filename, object project=None):
    cdef Py_ssize_t file_index = len(_FILE_LIST)
    cdef Node node = MappingNode({}, file_index, 0, 0)

    _FILE_LIST.append(FileInfo(filename,
                               filename,
                               "<synthetic {}>".format(filename),
                               node,
                               project))
    return node


# _new_node_from_dict()
#
# Args:
#   indict (dict): The input dictionary
#
# Returns:
#   (Node): A new synthetic YAML tree which represents this dictionary
#
cdef Node _new_node_from_dict(dict indict, Node ref_node):
    cdef MappingNode ret = MappingNode({}, ref_node.file_index, ref_node.line, next_synthetic_counter())
    cdef str k

    for k, v in indict.items():
        vtype = type(v)
        if vtype is dict:
            ret.value[k] = _new_node_from_dict(v, ref_node)
        elif vtype is list:
            ret.value[k] = _new_node_from_list(v, ref_node)
        else:
            ret.value[k] = ScalarNode(str(v), ref_node.file_index, ref_node.line, next_synthetic_counter())

    return ret


# Internal function to help _new_node_from_dict() to handle lists
cdef Node _new_node_from_list(list inlist, Node ref_node):
    cdef SequenceNode ret = SequenceNode([], ref_node.file_index, ref_node.line, next_synthetic_counter())

    for v in inlist:
        vtype = type(v)
        if vtype is dict:
            ret.value.append(_new_node_from_dict(v, ref_node))
        elif vtype is list:
            ret.value.append(_new_node_from_list(v, ref_node))
        else:
            ret.value.append(ScalarNode(str(v), ref_node.file_index, ref_node.line, next_synthetic_counter()))

    return ret
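# Example of the conversion performed above (an illustrative sketch; the
# keys are hypothetical):
#
#   Node.from_dict({'max-jobs': 4, 'flags': ['-O2']})
#
# becomes a MappingNode whose 'max-jobs' entry is ScalarNode('4') and whose
# 'flags' entry is a SequenceNode of ScalarNodes; note that non-string
# scalars are stringified, matching how YAML input is represented.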
# assert_symbol_name()
#
# A helper function to check if a loaded string is a valid symbol
# name and to raise a consistent LoadError if not.  For strings which
# are required to be symbols.
#
# Args:
#    provenance (Provenance): The provenance of the loaded symbol, or None
#    symbol_name (str): The loaded symbol name
#    purpose (str): The purpose of the string, for an error message
#    allow_dashes (bool): Whether dashes are allowed for this symbol
#
# Raises:
#    LoadError: If the symbol_name is invalid
#
# Note that dashes are generally preferred for variable names and
# usage in YAML, but things such as option names which will be
# evaluated with jinja2 cannot use dashes.
#
def assert_symbol_name(ProvenanceInformation provenance, str symbol_name, str purpose, *, bint allow_dashes=True):
    cdef str valid_chars = string.digits + string.ascii_letters + '_'
    if allow_dashes:
        valid_chars += '-'

    cdef bint valid = True

    if not symbol_name:
        valid = False
    elif any(x not in valid_chars for x in symbol_name):
        valid = False
    elif symbol_name[0] in string.digits:
        valid = False

    if not valid:
        detail = "Symbol names must contain only alphanumeric characters, " + \
                 "may not start with a digit, and may contain underscores"
        if allow_dashes:
            detail += " or dashes"

        message = "Invalid symbol name for {}: '{}'".format(purpose, symbol_name)
        if provenance is not None:
            message = "{}: {}".format(provenance, message)

        raise LoadError(LoadErrorReason.INVALID_SYMBOL_NAME,
                        message, detail=detail)


###############################################################################
# Roundtrip code

# Represent Nodes automatically

def represent_mapping(self, MappingNode mapping):
    return self.represent_dict(mapping.value)

def represent_scalar(self, ScalarNode scalar):
    return self.represent_str(scalar.value)

def represent_sequence(self, SequenceNode sequence):
    return self.represent_list(sequence.value)


yaml.RoundTripRepresenter.add_representer(MappingNode, represent_mapping)
yaml.RoundTripRepresenter.add_representer(ScalarNode, represent_scalar)
yaml.RoundTripRepresenter.add_representer(SequenceNode, represent_sequence)

# Represent simple types as strings

def represent_as_str(self, value):
    return self.represent_str(str(value))

yaml.RoundTripRepresenter.add_representer(type(None), represent_as_str)
yaml.RoundTripRepresenter.add_representer(int, represent_as_str)
yaml.RoundTripRepresenter.add_representer(float, represent_as_str)
yaml.RoundTripRepresenter.add_representer(bool, represent_as_str)
yaml.RoundTripRepresenter.add_representer(datetime.datetime, represent_as_str)
yaml.RoundTripRepresenter.add_representer(datetime.date, represent_as_str)

# Always represent things consistently:

yaml.RoundTripRepresenter.add_representer(OrderedDict,
                                          yaml.SafeRepresenter.represent_dict)

# Always parse things consistently

yaml.RoundTripConstructor.add_constructor(u'tag:yaml.org,2002:int',
                                          yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(u'tag:yaml.org,2002:float',
                                          yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(u'tag:yaml.org,2002:bool',
                                          yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(u'tag:yaml.org,2002:null',
                                          yaml.RoundTripConstructor.construct_yaml_str)
yaml.RoundTripConstructor.add_constructor(u'tag:yaml.org,2002:timestamp',
                                          yaml.RoundTripConstructor.construct_yaml_str)


# HardlineDumper
#
# This is a dumper used during roundtrip_dump which forces every scalar to be
# a plain string, in order to match the output format to the input format.
#
# If you discover something is broken, please add a test case to the roundtrip
# test in tests/internals/yaml/roundtrip-test.yaml
#
class HardlineDumper(yaml.RoundTripDumper):
    def __init__(self, *args, **kwargs):
        yaml.RoundTripDumper.__init__(self, *args, **kwargs)
        # For each of YAML 1.1 and 1.2, force everything to be a plain string
        for version in [(1, 1), (1, 2), None]:
            self.add_version_implicit_resolver(
                version,
                u'tag:yaml.org,2002:str',
                yaml.util.RegExp(r'.*'),
                None)
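# Example of the effect of the overrides above (an illustrative sketch):
#
#   roundtrip_load_data('jobs: 4\nstrict: true\n')
#   # -> mapping equivalent to {'jobs': '4', 'strict': 'true'}
#
# Ints, floats, booleans, nulls and timestamps are all kept as plain
# strings on load, and HardlineDumper writes them back out unquoted, so a
# load/dump cycle does not reinterpret or reformat scalar values.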
# roundtrip_load()
#
# Load a YAML file into memory in a form which allows roundtripping as best
# as ruamel permits.
#
# Note, the returned objects can be treated as Mappings and Lists and Strings
# but replacing content wholesale with plain dicts and lists may result
# in a loss of comments and formatting.
#
# Args:
#    filename (str): The file to load in
#    allow_missing (bool): Optionally set this to True to allow missing files
#
# Returns:
#    (Mapping): The loaded YAML mapping.
#
# Raises:
#    (LoadError): If the file is missing or is a directory, this is raised.
#                 Also if the YAML is malformed.
#
def roundtrip_load(filename, *, allow_missing=False):
    try:
        with open(filename, "r") as fh:
            data = fh.read()

        contents = roundtrip_load_data(data, filename=filename)
    except FileNotFoundError as e:
        if allow_missing:
            # Missing files are always empty dictionaries
            return {}
        else:
            raise LoadError(LoadErrorReason.MISSING_FILE,
                            "Could not find file at {}".format(filename)) from e
    except IsADirectoryError as e:
        raise LoadError(LoadErrorReason.LOADING_DIRECTORY,
                        "{} is a directory."
                        .format(filename)) from e
    return contents


# roundtrip_load_data()
#
# Parse the given contents as YAML, returning them as a roundtrippable data
# structure.
#
# A lack of content will be returned as an empty mapping.
#
# Args:
#    contents (str): The contents to be parsed as YAML
#    filename (str): Optional filename to be used in error reports
#
# Returns:
#    (Mapping): The loaded YAML mapping
#
# Raises:
#    (LoadError): Raised on invalid YAML, or YAML which parses to something other
#                 than a Mapping
#
def roundtrip_load_data(contents, *, filename=None):
    try:
        contents = yaml.load(contents, yaml.RoundTripLoader, preserve_quotes=True)
    except (yaml.scanner.ScannerError, yaml.composer.ComposerError, yaml.parser.ParserError) as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n{}\n\n{}\n".format(e.problem, e.problem_mark)) from e

    # Special case empty files at this point
    if contents is None:
        # We'll make them empty mappings like the main Node loader
        contents = {}

    if not isinstance(contents, Mapping):
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "YAML file has content of type '{}' instead of expected type 'dict': {}"
                        .format(type(contents).__name__, filename))

    return contents


# roundtrip_dump()
#
# Dumps the given contents as a YAML file.  Ideally the contents came from
# parsing with `roundtrip_load` or `roundtrip_load_data` so that they will be
# dumped in the same form as they came from.
#
# If `file` is a string, it is the filename to write to, if `file` has a
# `write` method, it's treated as a stream, otherwise output is to stdout.
#
# Args:
#    contents (Mapping or list): The content to write out as YAML.
#    file (any): The file to write to
#
def roundtrip_dump(contents, file=None):
    with ExitStack() as stack:
        if type(file) is str:
            from . import utils
            f = stack.enter_context(utils.save_file_atomic(file, 'w'))
        elif hasattr(file, 'write'):
            f = file
        else:
            f = sys.stdout
        yaml.round_trip_dump(contents, f, Dumper=HardlineDumper)
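# Example of a comment-preserving edit cycle (an illustrative sketch; the
# file name is hypothetical):
#
#   data = roundtrip_load('element.bst')
#   data['ref'] = 'deadbeef'
#   roundtrip_dump(data, 'element.bst')
#
# Because the contents come from roundtrip_load(), comments and formatting
# in untouched parts of the file survive the dump.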