#!/usr/bin/env python3
#
#  Copyright (C) 2016 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see .
#
#  Authors:
#        Tristan Van Berkom

import sys
import collections
import collections.abc
import copy
from enum import Enum
from contextlib import ExitStack

from ruamel import yaml
from ruamel.yaml.representer import SafeRepresenter, RoundTripRepresenter

from . import ImplError, LoadError, LoadErrorReason

# We store information in the loaded yaml on a DictProvenance
# stored in all dictionaries under this key
PROVENANCE_KEY = '__bst_provenance_info'


# Provenance tracks the origin of a given node in the parsed dictionary.
#
# Args:
#   filename (string): The filename the node was loaded from
#   node (dict, list, value): A binding to the originally parsed value
#   toplevel (dict): The toplevel of the loaded file, suitable for later dumps
#   line (int): The line number where node was parsed
#   col (int): The column number where node was parsed
#
class Provenance():
    def __init__(self, filename, node, toplevel, line=0, col=0):
        self.filename = filename
        self.node = node
        self.toplevel = toplevel
        self.line = line
        self.col = col

    # Convert a Provenance to a string for error reporting
    def __str__(self):
        return "%s [line %d column %d]" % (self.filename, self.line, self.col)

    # Abstract method, every subclass below provides its own clone()
    def clone(self):
        raise ImplError("Unimplemented clone() in Provenance")


# A Provenance for dictionaries, these are stored in the copy of the
# loaded YAML tree and track the provenance of all members
#
# If either line or col is omitted, the position is read from the
# node's 'lc' attribute when it has one; a node without 'lc' (such
# as a plain empty dict) is reported at line 1, column 0.
#
class DictProvenance(Provenance):
    def __init__(self, filename, node, toplevel, line=None, col=None):

        if line is None or col is None:
            # Special case for loading an empty dict
            if hasattr(node, 'lc'):
                line = node.lc.line + 1
                col = node.lc.col
            else:
                line = 1
                col = 0

        super().__init__(filename, node, toplevel, line=line, col=col)

        # Maps member names to their MemberProvenance
        self.members = {}

    # Returns a new DictProvenance with every member provenance cloned
    def clone(self):
        provenance = DictProvenance(self.filename, self.node, self.toplevel,
                                    line=self.line, col=self.col)

        provenance.members = {
            member_name: member.clone()
            for member_name, member in self.members.items()
        }
        return provenance


# A Provenance for dict members
#
# When parent_dict is given, the node and its position are looked up
# from parent_dict (which must carry 'lc' position data) and the
# node, line and col arguments are ignored; pass parent_dict=None to
# supply them explicitly, as clone() does.
#
class MemberProvenance(Provenance):
    def __init__(self, filename, parent_dict, member_name, toplevel,
                 node=None, line=None, col=None):

        if parent_dict is not None:
            node = parent_dict[member_name]
            line, col = parent_dict.lc.value(member_name)
            line += 1

        super().__init__(filename, node, toplevel, line=line, col=col)

        # Only used if member is a list
        self.elements = []

    # Returns a new MemberProvenance with every element provenance cloned
    def clone(self):
        provenance = MemberProvenance(self.filename, None, None, self.toplevel,
                                      node=self.node, line=self.line, col=self.col)
        provenance.elements = [e.clone() for e in self.elements]
        return provenance


# A Provenance for list elements
#
# When parent_list is given, the node and its position are looked up
# from parent_list (which must carry 'lc' position data) and the
# node, line and col arguments are ignored; pass parent_list=None to
# supply them explicitly, as clone() does.
#
class ElementProvenance(Provenance):
    def __init__(self, filename, parent_list, index, toplevel,
                 node=None, line=None, col=None):

        if parent_list is not None:
            node = parent_list[index]
            line, col = parent_list.lc.item(index)
            line += 1

        super().__init__(filename, node, toplevel, line=line, col=col)

        # Only used if element is a list
        self.elements = []

    # Returns a new ElementProvenance with every nested element cloned
    def clone(self):
        provenance = ElementProvenance(self.filename, None, None, self.toplevel,
                                       node=self.node, line=self.line, col=self.col)

        # Each nested element must actually be cloned; storing the bound
        # clone method itself would leave callables in place of provenances.
        provenance.elements = [e.clone() for e in self.elements]
        return provenance


# These exceptions are intended to be caught entirely within
# the BuildStream framework, hence they do not reside in the
# public exceptions.py
class CompositeError(Exception):
    def __init__(self, path, message):
        super().__init__(message)

        # Dotted path of the key which failed to composite
        self.path = path


class CompositeOverrideError(CompositeError):
    def __init__(self, path):
        super().__init__(
            path,
            "Error compositing dictionary, not allowed to override key '%s'" %
            path)


class CompositeTypeError(CompositeError):
    def __init__(self, path, expected_type, actual_type):
        super().__init__(
            path,
            "Error compositing dictionary key '%s', expected source type '%s' "
            "but received type '%s'" %
            (path, expected_type.__name__, actual_type.__name__))
        self.expected_type = expected_type
        self.actual_type = actual_type


# CompositePolicy
#
# An enumeration defining the behavior of the composite_dict()
# and composite() functions.
#
class CompositePolicy(Enum):

    # Every dict member overwrites members in the target dict
    OVERWRITE = 1

    # Arrays from the overriding dict are appended to arrays in the target dict
    ARRAY_APPEND = 2


# Loads a dictionary from some YAML
#
# Args:
#    filename (str): The YAML file to load
#    shortname (str): The filename in shorthand for error reporting (or None)
#    copy_tree (bool): Whether to make a copy, preserving the original toplevels
#                      for later serialization
#
# Returns (dict): A loaded copy of the YAML file with provenance information
#
# Raises: LoadError
#
def load(filename, shortname=None, copy_tree=False):
    if not shortname:
        shortname = filename

    try:
        with open(filename) as f:
            return load_data(f, shortname=shortname, copy_tree=copy_tree)
    except FileNotFoundError as e:
        raise LoadError(LoadErrorReason.MISSING_FILE,
                        "Could not find file at %s" % filename) from e


# Like load(), but doesn't require the data to be in a file
#
# Args:
#    data: The YAML data, handed as is to the ruamel loader
#    shortname (str): A name for the data, used for error reporting
#    copy_tree (bool): Whether to make a copy, preserving the original toplevels
#                      for later serialization
#
# Returns (dict): The loaded data with provenance information
#
# Raises: LoadError
#
def load_data(data, shortname=None, copy_tree=False):

    try:
        contents = yaml.load(data, yaml.loader.RoundTripLoader)
    except (yaml.scanner.ScannerError, yaml.composer.ComposerError, yaml.parser.ParserError) as e:
        raise LoadError(LoadErrorReason.INVALID_YAML,
                        "Malformed YAML:\n\n%s\n\n%s\n" % (e.problem, e.problem_mark)) from e

    if not isinstance(contents, dict):
        # Special case allowance for None, when the loaded file has only comments in it.
        if contents is None:
            contents = {}
        else:
            raise LoadError(LoadErrorReason.INVALID_YAML,
                            "YAML file has content of type '%s' instead of expected type 'dict': %s" %
                            (type(contents).__name__, shortname))

    return node_decorated_copy(shortname, contents, copy_tree=copy_tree)


# Dumps a previously loaded YAML node to a file
#
# Args:
#    node (dict): A node previously loaded with _yaml.load() above
#    filename (str): The YAML file to write, stdout is used if omitted
#
def dump(node, filename=None):
    with ExitStack() as stack:
        if filename:
            # Only a file we opened ourselves is closed on exit,
            # stdout is never entered into the stack
            f = stack.enter_context(open(filename, 'w'))
        else:
            f = sys.stdout
        yaml.round_trip_dump(node, f)


# node_decorated_copy()
#
# Create a copy of a loaded dict tree decorated with Provenance
# information, used directly after loading yaml
#
# Args:
#    filename (str): The filename
#    toplevel (node): The toplevel dictionary node
#    copy_tree (bool): Whether to load a copy and preserve the original
#
# Returns: A copy of the toplevel decorated with Provenance, or the
#          decorated toplevel itself when copy_tree is False
#
def node_decorated_copy(filename, toplevel, copy_tree=False):
    if copy_tree:
        result = copy.deepcopy(toplevel)
    else:
        result = toplevel

    node_decorate_dict(filename, result, toplevel, toplevel)

    return result


# Decorates the target dict, and recursively every dict and list
# below it, with provenance read from the matching source nodes
#
def node_decorate_dict(filename, target, source, toplevel):
    provenance = DictProvenance(filename, source, toplevel)

    # This is assigned before iterating, because target and source
    # are the same object when no copy of the tree was requested
    target[PROVENANCE_KEY] = provenance

    for key, value in source.items():
        if key == PROVENANCE_KEY:
            continue

        member = MemberProvenance(filename, source, key, toplevel)
        provenance.members[key] = member

        target_value = target.get(key)
        if isinstance(value, collections.abc.Mapping):
            node_decorate_dict(filename, target_value, value, toplevel)
        elif isinstance(value, list):
            member.elements = node_decorate_list(filename, target_value, value, toplevel)


# Builds the list of ElementProvenance for the source list, decorating
# any dicts and lists found in it along the way
#
def node_decorate_list(filename, target, source, toplevel):

    elements = []

    # Walk by position: looking the index up by value would report the
    # first equal item for every duplicate entry in the list
    for idx, item in enumerate(source):
        target_item = target[idx]
        element = ElementProvenance(filename, source, idx, toplevel)

        if isinstance(item, collections.abc.Mapping):
            node_decorate_dict(filename, target_item, item, toplevel)
        elif isinstance(item, list):
            element.elements = node_decorate_list(filename, target_item, item, toplevel)

        elements.append(element)

    return elements


# node_get_provenance()
#
# Gets the provenance for a node
#
# Args:
#   node (dict): a dictionary
#   key (str): key in the dictionary
#   indices (list of indexes): Index path, in the case of list values
#
# Returns: The Provenance of the dict, member or list element, or None
#          if the node (or the requested member) has no provenance
#
# Note: indices is only read, never modified
#
def node_get_provenance(node, key=None, indices=[]):

    provenance = node.get(PROVENANCE_KEY)
    if provenance and key:
        provenance = provenance.members.get(key)
        for index in indices:
            provenance = provenance.elements[index]

    return provenance


# node_get()
#
# Fetches a value from a dictionary node and checks it for
# an expected value. Use default_value when parsing a value
# which is only optionally supplied.
#
# Args:
#    node (dict): The dictionary node
#    expected_type (type): The expected type for the value being searched
#    key (str): The key to get a value for in node
#    indices (list of ints): Optionally descend into lists of lists
#    default_value: Value to use when key is not found in node; the
#                   default of None makes the key mandatory
#
# Returns:
#    The value if found in node, otherwise default_value is returned
#
# Raises:
#    LoadError, when the value found is not of the expected type, or
#    when the key is missing and no default_value was given
#
# Note:
#    Returned strings are stripped of leading and trailing whitespace
#
def node_get(node, expected_type, key, indices=[], default_value=None):
    value = node.get(key, default_value)
    if value is None:
        provenance = node_get_provenance(node)
        raise LoadError(LoadErrorReason.INVALID_DATA,
                        "%s: Dictionary did not contain expected key '%s'" % (str(provenance), key))

    path = key
    if indices:
        # Implied type check of the element itself
        value = node_get(node, list, key)
        for index in indices:
            value = value[index]
            path += '[%d]' % index

    if not isinstance(value, expected_type):
        # Attempt basic conversions if possible, typically we want to
        # be able to specify numeric values and convert them to strings,
        # but we don't want to try converting dicts/lists
        try:
            if not (expected_type == list or
                    expected_type == dict or
                    isinstance(value, list) or
                    isinstance(value, dict)):
                value = expected_type(value)
            else:
                raise ValueError()
        except (ValueError, TypeError):
            provenance = node_get_provenance(node, key=key, indices=indices)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "%s: Value of '%s' is not of the expected type '%s'" %
                            (str(provenance), path, expected_type.__name__))

    # Trim it at the bud, let all loaded strings from yaml be stripped of whitespace
    if isinstance(value, str):
        value = value.strip()

    return value


# Gives a node a dummy provenance, in case of compositing dictionaries
# where the target is an empty {}
#
# Returns: The existing provenance of the node, or the newly stored one
#
def ensure_provenance(node):
    provenance = node.get(PROVENANCE_KEY)
    if not provenance:
        provenance = DictProvenance('', node, node)
        node[PROVENANCE_KEY] = provenance

    return provenance


# is_ruamel_str():
#
# Args:
#    value: A value loaded from ruamel
#
# This returns if the value is "stringish", since ruamel
# has some complex types to represent strings, this is needed
# to avoid compositing exceptions in order to allow various
# string types to be interchangeable and acceptable
#
def is_ruamel_str(value):

    if isinstance(value, str):
        return True

    return isinstance(value, yaml.scalarstring.ScalarString)


# composite_dict():
#
# Composites values in target with values from source
#
# Args:
#    target (dict): A simple dictionary
#    source (dict): Another simple dictionary
#    policy (CompositePolicy): Defines compositing behavior
#    typesafe (bool): If True, then raise errors when overriding members
#                     with differing types
#
# Raises: CompositeError
#
# Unlike the dictionary update() method, nested values in source
# will not obsolete entire subdictionaries in target, instead both
# dictionaries will be recursed and a composition of both will result
#
# This is useful for overriding configuration files and element
# configurations.
#
def composite_dict(target, source, policy=CompositePolicy.OVERWRITE, typesafe=False, path=None):
    # path is the dotted key path leading to target, it is only used
    # for reporting errors and is extended on each recursion
    target_provenance = ensure_provenance(target)
    source_provenance = ensure_provenance(source)

    for key, source_value in source.items():

        # Handle the provenance keys specially
        if key == PROVENANCE_KEY:
            continue

        # Track the full path of keys, only for raising CompositeError
        if path:
            thispath = path + '.' + key
        else:
            thispath = key

        target_value = target.get(key)

        if isinstance(source_value, collections.abc.Mapping):

            # Handle creating new dicts on target side
            if target_value is None:
                target_value = {}
                target[key] = target_value

                # Give the new dict provenance
                value_provenance = source_value.get(PROVENANCE_KEY)
                if value_provenance:
                    target_value[PROVENANCE_KEY] = value_provenance.clone()

                # Add a new provenance member element to the containing dict
                target_provenance.members[key] = source_provenance.members[key]

            if not isinstance(target_value, collections.abc.Mapping):
                raise CompositeTypeError(thispath, type(target_value), type(source_value))

            # Recurse into matching dictionary
            composite_dict(target_value, source_value, policy=policy, typesafe=typesafe, path=thispath)

        else:

            # Optionally enforce typesafe copy
            if typesafe and target_value is not None:

                # Exception here: depending on how strings were declared ruamel may
                # use a different type, but for our purposes, any stringish type will do.
                if not (is_ruamel_str(source_value) and is_ruamel_str(target_value)) \
                   and not isinstance(source_value, type(target_value)):
                    raise CompositeTypeError(thispath, type(target_value), type(source_value))

            both_lists = isinstance(target_value, list) and isinstance(source_value, list)

            if policy == CompositePolicy.OVERWRITE:

                # Provenance and value is overwritten
                target_provenance.members[key] = source_provenance.members[key]

                # Ensure target has only copies of mutable source values
                if both_lists:
                    target[key] = list_chain_copy(source_value)
                else:
                    target[key] = source_value

            elif policy == CompositePolicy.ARRAY_APPEND:

                if both_lists:

                    # Ensure target has only copies of mutable source values
                    target[key] += list_chain_copy(source_value)

                    # Append element provenances from source list to target
                    target_list_provenance = target_provenance.members[key]
                    source_list_provenance = source_provenance.members[key]
                    target_list_provenance.elements.extend(
                        item.clone() for item in source_list_provenance.elements)
                else:
                    # Provenance is overwritten
                    target[key] = source_value
                    target_provenance.members[key] = source_provenance.members[key].clone()

            else:  # pragma: no cover
                raise ValueError("Unhandled CompositePolicy in switch case")


# Like composite_dict(), but raises an all purpose LoadError for convenience
#
# Args:
#    target (dict): The dictionary to composite into
#    source (dict): The dictionary to composite from
#    policy (CompositePolicy): Defines compositing behavior
#    typesafe (bool): If True, then raise errors when overriding members
#                     with differing types
#
# Raises: LoadError, in place of the CompositeTypeError from composite_dict()
#
def composite(target, source, policy=CompositePolicy.OVERWRITE, typesafe=False):
    provenance = node_get_provenance(source)

    try:
        composite_dict(target, source, policy=policy, typesafe=typesafe)
    except CompositeTypeError as e:
        # Only prefix the message when source has provenance to report
        error_prefix = ""
        if provenance:
            error_prefix = "[%s]: " % str(provenance)
        raise LoadError(LoadErrorReason.ILLEGAL_COMPOSITE,
                        "%sExpected '%s' type for configuration '%s', instead received '%s'" %
                        (error_prefix,
                         e.expected_type.__name__,
                         e.path,
                         e.actual_type.__name__)) from e


# SanitizedDict is an OrderedDict that is dumped as unordered mapping.
# This provides deterministic output for unordered mappings.
#
class SanitizedDict(collections.OrderedDict):
    pass


# Have the round trip dumper represent a SanitizedDict with the
# plain dict representer
RoundTripRepresenter.add_representer(SanitizedDict,
                                     SafeRepresenter.represent_dict)


# node_sanitize()
#
# Returns an alphabetically ordered recursive copy
# of the source node with internal provenance information stripped.
#
# Only dicts are ordered, list elements are left in order.
#
def node_sanitize(node):

    if isinstance(node, collections.abc.Mapping):

        result = SanitizedDict()

        # The sort key places PROVENANCE_KEY last, it is then skipped
        for key in sorted(node, key=lambda s: (s == PROVENANCE_KEY, s)):
            if key == PROVENANCE_KEY:
                continue
            result[key] = node_sanitize(node[key])

        return result

    elif isinstance(node, list):
        return [node_sanitize(elt) for elt in node]

    return node


# validate_node()
#
# Checks that a node contains no keys other than the expected ones
#
# Args:
#    node (dict): The dictionary node to check
#    valid_keys (iterable): The keys which are allowed in node
#
# Raises: LoadError, reporting the first unexpected key found
#
def validate_node(node, valid_keys):

    # Work on a private set, the caller's valid_keys is left untouched
    valid_keys = set(valid_keys)
    valid_keys.add(PROVENANCE_KEY)

    for key in node:
        # Membership alone decides validity, so that a falsy key such
        # as 0 or '' is rejected like any other unexpected key
        if key not in valid_keys:
            provenance = node_get_provenance(node, key=key)
            raise LoadError(LoadErrorReason.INVALID_DATA,
                            "[{}]: Unexpected key: {}".format(provenance, key))


# Creates a ChainMap layered over source: nested dicts, lists and
# provenances are copied into the new top layer, any other value is
# left to be looked up in source through the chain
#
def node_chain_copy(source):
    chained = collections.ChainMap({}, source)
    for key, value in source.items():
        if isinstance(value, collections.abc.Mapping):
            chained[key] = node_chain_copy(value)
        elif isinstance(value, list):
            chained[key] = list_chain_copy(value)
        elif isinstance(value, Provenance):
            chained[key] = value.clone()

    return chained


# Creates a new list from source, chain copying any nested dicts
# and lists and cloning any provenances
#
def list_chain_copy(source):
    result = []
    for item in source:
        if isinstance(item, collections.abc.Mapping):
            result.append(node_chain_copy(item))
        elif isinstance(item, list):
            result.append(list_chain_copy(item))
        elif isinstance(item, Provenance):
            result.append(item.clone())
        else:
            result.append(item)

    return result


# Creates a plain dict copy of source: nested dicts and lists are
# copied recursively, provenances are cloned, and the copy is given
# a provenance if source did not carry one
#
def node_copy(source):
    result = {}
    for key, value in source.items():
        if isinstance(value, collections.abc.Mapping):
            result[key] = node_copy(value)
        elif isinstance(value, list):
            result[key] = list_copy(value)
        elif isinstance(value, Provenance):
            result[key] = value.clone()
        else:
            result[key] = value

    ensure_provenance(result)

    return result


# Creates a new list from source, recursively copying any nested
# dicts and lists and cloning any provenances
#
def list_copy(source):
    result = []
    for item in source:
        if isinstance(item, collections.abc.Mapping):
            result.append(node_copy(item))
        elif isinstance(item, list):
            result.append(list_copy(item))
        elif isinstance(item, Provenance):
            result.append(item.clone())
        else:
            result.append(item)

    return result