diff options
Diffstat (limited to 'pint/_vendor')
-rw-r--r-- | pint/_vendor/flexparser.py | 1455 |
1 files changed, 1455 insertions, 0 deletions
@dataclass(frozen=True)
class Statement:
    """Base class for parsed elements within a source file.

    A statement records where it was found (start/end line and column)
    and the raw text it was built from. None of these fields is accepted
    by the constructor; they all start as ``None`` and are filled in
    afterwards with ``set_position`` and ``set_raw``.
    """

    # Position of the statement; None until ``set_position`` is called.
    start_line: ty.Optional[int] = dataclasses.field(init=False, default=None)
    start_col: ty.Optional[int] = dataclasses.field(init=False, default=None)

    end_line: ty.Optional[int] = dataclasses.field(init=False, default=None)
    end_col: ty.Optional[int] = dataclasses.field(init=False, default=None)

    # Raw text of the statement; None until ``set_raw`` is called.
    raw: ty.Optional[str] = dataclasses.field(init=False, default=None)

    @classmethod
    def from_statement(cls, statement: "Statement"):
        """Build an instance of ``cls`` copying position and raw text
        from another statement.
        """
        out = cls()
        out.set_position(*statement.get_position())
        out.set_raw(statement.raw)
        return out

    @classmethod
    def from_statement_iterator_element(
        cls, values: ty.Tuple[int, int, int, int, str]
    ):
        """Build an instance of ``cls`` from a
        ``(start_line, start_col, end_line, end_col, raw)`` tuple.
        """
        *position, raw = values
        out = cls()
        out.set_position(*position)
        out.set_raw(raw)
        return out

    @property
    def format_position(self) -> str:
        """Position formatted as ``start_line,start_col-end_line,end_col``
        or ``"N/A"`` if the position was not set.
        """
        if self.start_line is None:
            return "N/A"
        return "%d,%d-%d,%d" % self.get_position()

    @property
    def raw_strip(self) -> ty.Optional[str]:
        """Raw text without leading and trailing whitespace.

        Returns ``None`` when the raw text was never set, instead of
        failing with ``AttributeError`` (mirrors how ``format_position``
        deals with a position that was never set).
        """
        if self.raw is None:
            return None
        return self.raw.strip()

    def get_position(self):
        """Return ``(start_line, start_col, end_line, end_col)``."""
        return self.start_line, self.start_col, self.end_line, self.end_col

    def set_position(self, start_line, start_col, end_line, end_col):
        """Store the position and return ``self`` to allow chaining."""
        # The dataclass is frozen, so the regular attribute assignment is
        # blocked; object.__setattr__ bypasses that check.
        object.__setattr__(self, "start_line", start_line)
        object.__setattr__(self, "start_col", start_col)
        object.__setattr__(self, "end_line", end_line)
        object.__setattr__(self, "end_col", end_col)
        return self

    def set_raw(self, raw):
        """Store the raw text and return ``self`` to allow chaining."""
        object.__setattr__(self, "raw", raw)
        return self

    def set_simple_position(self, line, col, width):
        """Set a single-line position that starts at ``col`` and spans
        ``width`` columns; return ``self``.
        """
        return self.set_position(line, col, line, col + width)
hmac.compare_digest(self.hexdigest, other.hexdigest) + ) + + @classmethod + def from_bytes(cls, algorithm, b: bytes): + hasher = algorithm(b) + return cls(hasher.name, hasher.hexdigest()) + + @classmethod + def from_file_pointer(cls, algorithm, fp: ty.BinaryIO): + return cls.from_bytes(algorithm, fp.read()) + + @classmethod + def nullhash(cls): + return cls("", "") + + +def _yield_types( + obj, valid_subclasses=(object,), recurse_origin=(tuple, list, ty.Union) +): + """Recursively transverse type annotation if the + origin is any of the types in `recurse_origin` + and yield those type which are subclasses of `valid_subclasses`. + + """ + if ty.get_origin(obj) in recurse_origin: + for el in ty.get_args(obj): + yield from _yield_types(el, valid_subclasses, recurse_origin) + else: + if inspect.isclass(obj) and issubclass(obj, valid_subclasses): + yield obj + + +class classproperty: # noqa N801 + """Decorator for a class property + + In Python 3.9+ can be replaced by + + @classmethod + @property + def myprop(self): + return 42 + + """ + + def __init__(self, fget): + self.fget = fget + + def __get__(self, owner_self, owner_cls): + return self.fget(owner_cls) + + +def is_relative_to(self, *other): + """Return True if the path is relative to another path or False. + + In Python 3.9+ can be replaced by + + path.is_relative_to(other) + """ + try: + self.relative_to(*other) + return True + except ValueError: + return False + + +class DelimiterInclude(enum.IntEnum): + """Specifies how to deal with delimiters while parsing.""" + + #: Split at delimiter, not including in any string + SPLIT = enum.auto() + + #: Split after, keeping the delimiter with previous string. + SPLIT_AFTER = enum.auto() + + #: Split before, keeping the delimiter with next string. + SPLIT_BEFORE = enum.auto() + + #: Do not split at delimiter. + DO_NOT_SPLIT = enum.auto() + + +class DelimiterAction(enum.IntEnum): + """Specifies how to deal with delimiters while parsing.""" + + #: Continue parsing normally. 
class Spliter:
    """Content iterator splitting according to given delimiters.

    The delimiters can be changed at any time with ``set_delimiters``,
    see DelimiterInclude and DelimiterAction for more information.

    The current scanning position can be changed at any time with
    ``set_position``.

    Parameters
    ----------
    content : str
    delimiters : ty.Dict[str, ty.Tuple[DelimiterInclude, DelimiterAction]]

    Yields
    ------
    start_line : int
        line number of the start of the content (zero-based numbering).
    start_col : int
        column number of the start of the content (zero-based numbering).
    end_line : int
        line number of the end of the content (zero-based numbering).
    end_col : int
        column number of the end of the content (zero-based numbering).
    part : str
        part of the text between delimiters.
    """

    _pattern: ty.Optional[re.Pattern]
    _delimiters: DelimiterDictT

    # True when the rest of the current line must be taken as a whole,
    # without looking for delimiters (see CAPTURE_NEXT_TIL_EOL).
    __stop_searching_in_line = False

    # Text already scanned that belongs to the next yielded part.
    __pending = ""
    # (line, col) where the text accumulated by DO_NOT_SPLIT started.
    __first_line_col = None

    __lines = ()
    __lineno = 0
    __colno = 0

    def __init__(self, content: str, delimiters: DelimiterDictT):
        self.set_delimiters(delimiters)
        self.__lines = content.splitlines(keepends=True)

    def set_position(self, lineno: int, colno: int):
        """Move the scanning position (zero-based line and column)."""
        self.__lineno, self.__colno = lineno, colno

    def set_delimiters(self, delimiters: DelimiterDictT):
        """Replace the delimiters used from now on.

        Raises
        ------
        ValueError
            if any delimiter is mapped to (DO_NOT_SPLIT, STOP_PARSING).
        """
        for k, v in delimiters.items():
            if v == (DelimiterInclude.DO_NOT_SPLIT, DelimiterAction.STOP_PARSING):
                raise ValueError(
                    f"The delimiter action for {k} is not a valid combination ({v})"
                )
        # Build a pattern but removing eols.
        # Longest delimiters go first: regex alternation takes the first
        # alternative that matches, so "==" must be tried before "=".
        # Sorting also makes the result independent of set ordering.
        _pat_dlm = tuple(
            sorted(set(delimiters) - _EOLs_set, key=lambda dlm: (-len(dlm), dlm))
        )
        if _pat_dlm:
            self._pattern = _build_delimiter_pattern(_pat_dlm)
        else:
            self._pattern = None
        # We add the end of line as delimiters if not present.
        self._delimiters = {**DO_NOT_SPLIT_EOL, **delimiters}

    def __iter__(self):
        return self

    def __flush_pending(self, pending_end):
        """Return the text accumulated by DO_NOT_SPLIT delimiters as the
        last element, or raise StopIteration if there is none.

        ``pending_end`` is the ``(line, col)`` just past that text.
        """
        if not self.__pending or pending_end is None:
            raise StopIteration
        out = (*self.__first_line_col, *pending_end, self.__pending)
        self.__first_line_col = None
        self.__pending = ""
        self.__stop_searching_in_line = False
        return out

    def __next__(self):
        if self.__lineno >= len(self.__lines):
            raise StopIteration

        # (line, col) just past the text accumulated by DO_NOT_SPLIT
        # delimiters during this call; needed only if the content ends
        # before a splitting delimiter is found.
        pending_end = None

        while True:
            if self.__lineno >= len(self.__lines):
                # The content ran out while joining parts (e.g. the last
                # line ends with a DO_NOT_SPLIT end of line). Yield what
                # was accumulated instead of indexing past the last line.
                return self.__flush_pending(pending_end)

            line = self.__lines[self.__lineno]
            if self.__stop_searching_in_line:
                # There must be part of a line pending to parse
                # due to stop
                mo = None
                self.__stop_searching_in_line = False
            elif self._pattern is None:
                mo = None
            else:
                # Find the first delimiter from the current column.
                mo = self._pattern.search(line, self.__colno)

            if mo is None:
                # No delimiter was found,
                # which should happen at end of the content or end of line
                for k in DO_NOT_SPLIT_EOL.keys():
                    if line.endswith(k):
                        dlm = line[-len(k) :]
                        end_col, next_col = len(line) - len(k), 0
                        break
                else:
                    # No EOL found, this is end of content
                    dlm = None
                    end_col, next_col = len(line), 0

                next_line = self.__lineno + 1

            else:
                next_line = self.__lineno
                end_col, next_col = mo.span()
                dlm = mo.group()

            part = line[self.__colno : end_col]

            # dlm is None only at the end of the content, where parsing stops.
            include, action = self._delimiters.get(
                dlm, (DelimiterInclude.SPLIT, DelimiterAction.STOP_PARSING)
            )

            if include == DelimiterInclude.SPLIT:
                next_pending = ""
            elif include == DelimiterInclude.SPLIT_AFTER:
                end_col += len(dlm)
                part = part + dlm
                next_pending = ""
            elif include == DelimiterInclude.SPLIT_BEFORE:
                next_pending = dlm
            elif include == DelimiterInclude.DO_NOT_SPLIT:
                self.__pending += line[self.__colno : end_col] + dlm
                next_pending = ""
            else:
                raise ValueError(f"Unknown action {include}.")

            if action == DelimiterAction.STOP_PARSING:
                # this will raise a StopIteration in the next call.
                next_line = len(self.__lines)
            elif action == DelimiterAction.STOP_PARSING_LINE:
                next_line = self.__lineno + 1
                next_col = 0

            start_line = self.__lineno
            start_col = self.__colno
            end_line = self.__lineno

            self.__lineno = next_line
            self.__colno = next_col

            if action == DelimiterAction.CAPTURE_NEXT_TIL_EOL:
                self.__stop_searching_in_line = True

            if include == DelimiterInclude.DO_NOT_SPLIT:
                # Keep the position of the first joined part only; later
                # DO_NOT_SPLIT delimiters must not move the start.
                if self.__first_line_col is None:
                    self.__first_line_col = start_line, start_col
                pending_end = end_line, end_col + len(dlm)
            else:
                if self.__first_line_col is None:
                    out = (
                        start_line,
                        start_col - len(self.__pending),
                        end_line,
                        end_col,
                        self.__pending + part,
                    )
                else:
                    out = (
                        *self.__first_line_col,
                        end_line,
                        end_col,
                        self.__pending + part,
                    )
                    self.__first_line_col = None
                self.__pending = next_pending
                return out
+ self._spliter.set_position(value.start_line - 1, value.start_col) + self._cache.clear() + + def _get_next_strip(self) -> Statement: + part = "" + while not part: + start_line, start_col, end_line, end_col, part = next(self._spliter) + lo = len(part) + part = part.lstrip() + start_col += lo - len(part) + + lo = len(part) + part = part.rstrip() + end_col -= lo - len(part) + + return Statement.from_statement_iterator_element( + (start_line + 1, start_col, end_line + 1, end_col, part) + ) + + def _get_next(self) -> Statement: + if self._strip_spaces: + return self._get_next_strip() + + part = "" + while not part: + start_line, start_col, end_line, end_col, part = next(self._spliter) + + return Statement.from_statement_iterator_element( + (start_line + 1, start_col, end_line + 1, end_col, part) + ) + + def peek(self, default=_SENTINEL) -> Statement: + """Return the item that will be next returned from ``next()``. + + Return ``default`` if there are no items left. If ``default`` is not + provided, raise ``StopIteration``. + + """ + if not self._cache: + try: + self._cache.append(self._get_next()) + except StopIteration: + if default is _SENTINEL: + raise + return default + return self._cache[0] + + def __next__(self) -> Statement: + if self._cache: + return self._cache.popleft() + else: + return self._get_next() + + +########### +# Parsing +########### + +# Configuration type +CT = ty.TypeVar("CT") +PST = ty.TypeVar("PST", bound="ParsedStatement") +LineColStr = Tuple[int, int, str] +FromString = ty.Union[None, PST, ParsingError] +Consume = ty.Union[PST, ParsingError] +NullableConsume = ty.Union[None, PST, ParsingError] + +Single = ty.Union[PST, ParsingError] +Multi = ty.Tuple[ty.Union[PST, ParsingError], ...] + + +@dataclass(frozen=True) +class ParsedStatement(ty.Generic[CT], Statement): + """A single parsed statement. 
+ + In order to write your own, you need to subclass it as a + frozen dataclass and implement the parsing logic by overriding + `from_string` classmethod. + + Takes two arguments: the string to parse and an object given + by the parser which can be used to store configuration information. + + It should return an instance of this class if parsing + was successful or None otherwise + """ + + @classmethod + def from_string(cls: Type[PST], s: str) -> FromString[PST]: + """Parse a string into a ParsedStatement. + + Return files and their meaning: + 1. None: the string cannot be parsed with this class. + 2. A subclass of ParsedStatement: the string was parsed successfully + 3. A subclass of ParsingError the string could be parsed with this class but there is + an error. + """ + raise NotImplementedError( + "ParsedStatement subclasses must implement " + "'from_string' or 'from_string_and_config'" + ) + + @classmethod + def from_string_and_config(cls: Type[PST], s: str, config: CT) -> FromString[PST]: + """Parse a string into a ParsedStatement. + + Return files and their meaning: + 1. None: the string cannot be parsed with this class. + 2. A subclass of ParsedStatement: the string was parsed successfully + 3. A subclass of ParsingError the string could be parsed with this class but there is + an error. + """ + return cls.from_string(s) + + @classmethod + def from_statement_and_config( + cls: Type[PST], statement: Statement, config: CT + ) -> FromString[PST]: + try: + out = cls.from_string_and_config(statement.raw, config) + except Exception as ex: + out = UnhandledParsingError(ex) + + if out is None: + return None + + out.set_position(*statement.get_position()) + out.set_raw(statement.raw) + return out + + @classmethod + def consume( + cls: Type[PST], statement_iterator: StatementIterator, config: CT + ) -> NullableConsume[PST]: + """Peek into the iterator and try to parse. + + Return files and their meaning: + 1. 
None: the string cannot be parsed with this class, the iterator is kept an the current place. + 2. a subclass of ParsedStatement: the string was parsed successfully, advance the iterator. + 3. a subclass of ParsingError: the string could be parsed with this class but there is + an error, advance the iterator. + """ + statement = statement_iterator.peek() + parsed_statement = cls.from_statement_and_config(statement, config) + if parsed_statement is None: + return None + next(statement_iterator) + return parsed_statement + + +OPST = ty.TypeVar("OPST", bound="ParsedStatement") +IPST = ty.TypeVar("IPST", bound="ParsedStatement") +CPST = ty.TypeVar("CPST", bound="ParsedStatement") +BT = ty.TypeVar("BT", bound="Block") +RBT = ty.TypeVar("RBT", bound="RootBlock") + + +@dataclass(frozen=True) +class Block(ty.Generic[OPST, IPST, CPST, CT]): + """A sequence of statements with an opening, body and closing.""" + + opening: Consume[OPST] + body: Tuple[Consume[IPST], ...] + closing: Consume[CPST] + + delimiters = {} + + @property + def start_line(self): + return self.opening.start_line + + @property + def start_col(self): + return self.opening.start_col + + @property + def end_line(self): + return self.closing.end_line + + @property + def end_col(self): + return self.closing.end_col + + def get_position(self): + return self.start_line, self.start_col, self.end_line, self.end_col + + @property + def format_position(self): + if self.start_line is None: + return "N/A" + return "%d,%d-%d,%d" % self.get_position() + + @classmethod + def subclass_with(cls, *, opening=None, body=None, closing=None): + @dataclass(frozen=True) + class CustomBlock(Block): + pass + + if opening: + CustomBlock.__annotations__["opening"] = Single[ty.Union[opening]] + if body: + CustomBlock.__annotations__["body"] = Multi[ty.Union[body]] + if closing: + CustomBlock.__annotations__["closing"] = Single[ty.Union[closing]] + + return CustomBlock + + def __iter__(self) -> Iterator[Statement]: + yield self.opening 
+ for el in self.body: + if isinstance(el, Block): + yield from el + else: + yield el + yield self.closing + + def iter_blocks(self) -> Iterator[ty.Union[Block, Statement]]: + yield self.opening + yield from self.body + yield self.closing + + ################################################### + # Convenience methods to iterate parsed statements + ################################################### + + _ElementT = ty.TypeVar("_ElementT", bound=Statement) + + def filter_by(self, *klass: Type[_ElementT]) -> Iterator[_ElementT]: + """Yield elements of a given class or classes.""" + yield from (el for el in self if isinstance(el, klass)) # noqa Bug in pycharm. + + @cached_property + def errors(self) -> ty.Tuple[ParsingError, ...]: + """Tuple of errors found.""" + return tuple(self.filter_by(ParsingError)) + + @property + def has_errors(self) -> bool: + """True if errors were found during parsing.""" + return bool(self.errors) + + #################### + # Statement classes + #################### + + @classproperty + def opening_classes(cls) -> Iterator[Type[OPST]]: + """Classes representing any of the parsed statement that can open this block.""" + opening = ty.get_type_hints(cls)["opening"] + yield from _yield_types(opening, ParsedStatement) + + @classproperty + def body_classes(cls) -> Iterator[Type[IPST]]: + """Classes representing any of the parsed statement that can be in the body.""" + body = ty.get_type_hints(cls)["body"] + yield from _yield_types(body, (ParsedStatement, Block)) + + @classproperty + def closing_classes(cls) -> Iterator[Type[CPST]]: + """Classes representing any of the parsed statement that can close this block.""" + closing = ty.get_type_hints(cls)["closing"] + yield from _yield_types(closing, ParsedStatement) + + ########## + # Consume + ########## + + @classmethod + def consume_opening( + cls: Type[BT], statement_iterator: StatementIterator, config: CT + ) -> NullableConsume[OPST]: + """Peek into the iterator and try to parse with any of the 
opening classes. + + See `ParsedStatement.consume` for more details. + """ + for c in cls.opening_classes: + el = c.consume(statement_iterator, config) + if el is not None: + return el + return None + + @classmethod + def consume_body( + cls, statement_iterator: StatementIterator, config: CT + ) -> Consume[IPST]: + """Peek into the iterator and try to parse with any of the body classes. + + If the statement cannot be parsed, a UnknownStatement is returned. + """ + for c in cls.body_classes: + el = c.consume(statement_iterator, config) + if el is not None: + return el + el = next(statement_iterator) + return UnknownStatement.from_statement(el) + + @classmethod + def consume_closing( + cls: Type[BT], statement_iterator: StatementIterator, config: CT + ) -> NullableConsume[CPST]: + """Peek into the iterator and try to parse with any of the opening classes. + + See `ParsedStatement.consume` for more details. + """ + for c in cls.closing_classes: + el = c.consume(statement_iterator, config) + if el is not None: + return el + return None + + @classmethod + def consume_body_closing( + cls: Type[BT], opening: OPST, statement_iterator: StatementIterator, config: CT + ) -> BT: + body = [] + closing = None + last_line = opening.end_line + while closing is None: + try: + closing = cls.consume_closing(statement_iterator, config) + if closing is not None: + continue + el = cls.consume_body(statement_iterator, config) + body.append(el) + last_line = el.end_line + except StopIteration: + closing = cls.on_stop_iteration(config) + closing.set_position(last_line + 1, 0, last_line + 1, 0) + + return cls(opening, tuple(body), closing) + + @classmethod + def consume( + cls: Type[BT], statement_iterator: StatementIterator, config: CT + ) -> Optional[BT]: + """Try consume the block. + + Possible outcomes: + 1. The opening was not matched, return None. + 2. A subclass of Block, where body and closing migh contain errors. 
class RootBlock(ty.Generic[IPST, CT], Block[BOS, IPST, EOS, CT]):
    """A sequence of statement flanked by the beginning and ending of stream.

    The opening is never parsed from the content: ``consume_opening``
    raises if called, and ``consume_closing`` never matches, so the block
    is closed by ``on_stop_iteration`` when the content is exhausted.
    """

    opening: Single[BOS]
    closing: Single[EOS]

    @classmethod
    def subclass_with(cls, *, body=None):
        """Create a RootBlock subclass accepting ``body`` in its body.

        Parameters
        ----------
        body
            a class, or a tuple or list of classes, allowed in the body.
            If falsy, the body annotation is inherited unchanged.
        """

        @dataclass(frozen=True)
        class CustomRootBlock(RootBlock):
            pass

        if body:
            if isinstance(body, list):
                # ty.Union[...] only unpacks tuples.
                body = tuple(body)
            # Assign a fresh dict instead of writing into
            # ``CustomRootBlock.__annotations__``: in Python 3.9 and older
            # that lookup resolves to the dict of the base class, which
            # would then be modified for every root block.
            CustomRootBlock.__annotations__ = {"body": Multi[ty.Union[body]]}

        return CustomRootBlock

    @classmethod
    def consume_opening(
        cls: Type[RBT], statement_iterator: StatementIterator, config: CT
    ) -> NullableConsume[BOS]:
        """Always raise RuntimeError: the opening of a root block is
        not parsed from the content.
        """
        raise RuntimeError(
            "Implementation error, 'RootBlock.consume_opening' should never be called"
        )

    @classmethod
    def consume(
        cls: Type[RBT], statement_iterator: StatementIterator, config: CT
    ) -> RBT:
        """Delegate to the parent ``consume`` and raise RuntimeError
        if it returns None.
        """
        block = super().consume(statement_iterator, config)
        if block is None:
            raise RuntimeError(
                "Implementation error, 'RootBlock.consume' should never return None"
            )
        return block

    @classmethod
    def consume_closing(
        cls: Type[RBT], statement_iterator: StatementIterator, config: CT
    ) -> NullableConsume[EOS]:
        """Never match a closing statement; always return None."""
        return None

    @classmethod
    def on_stop_iteration(cls, config):
        """Return the EOS statement used to close the block when the
        statement iterator is exhausted.
        """
        return EOS()
Must be a callable member of hashlib + _hasher = hashlib.blake2b + + def __init__(self, config: CT, prefer_resource_as_file=True): + self._config = config + self._prefer_resource_as_file = prefer_resource_as_file + + def parse(self, source_location: SourceLocationT) -> ParsedSource[RBT, CT]: + """Parse a file into a ParsedSourceFile or ParsedResource. + + Parameters + ---------- + source_location: + if str or pathlib.Path is interpreted as a file. + if (str, str) is interpreted as (package, resource) using the resource python api. + """ + if isinstance(source_location, tuple) and len(source_location) == 2: + if self._prefer_resource_as_file: + try: + return self.parse_resource_from_file(*source_location) + except CannotParseResourceAsFile: + pass + return self.parse_resource(*source_location) + + if isinstance(source_location, str): + return self.parse_file(pathlib.Path(source_location)) + + if isinstance(source_location, pathlib.Path): + return self.parse_file(source_location) + + raise TypeError( + f"Unknown type {type(source_location)}, " + "use str or pathlib.Path for files or " + "(package: str, resource_name: str) tuple " + "for a resource." + ) + + def parse_bytes(self, b: bytes, bos: BOS = None) -> ParsedSource[RBT, CT]: + if bos is None: + bos = BOS(Hash.from_bytes(self._hasher, b)).set_simple_position(0, 0, 0) + + sic = self._statement_iterator_class( + b.decode(self._encoding), self._delimiters, self._strip_spaces + ) + + parsed = self._root_block_class.consume_body_closing(bos, sic, self._config) + + return ParsedSource( + parsed, + self._config, + ) + + def parse_file(self, path: pathlib.Path) -> ParsedSource[RBT, CT]: + """Parse a file into a ParsedSourceFile. + + Parameters + ---------- + path + path of the file. 
+ """ + with path.open(mode="rb") as fi: + content = fi.read() + + bos = BOF( + Hash.from_bytes(self._hasher, content), path, path.stat().st_mtime + ).set_simple_position(0, 0, 0) + return self.parse_bytes(content, bos) + + def parse_resource_from_file( + self, package: str, resource_name: str + ) -> ParsedSource[RBT, CT]: + """Parse a resource into a ParsedSourceFile, opening as a file. + + Parameters + ---------- + package + package name where the resource is located. + resource_name + name of the resource + """ + if sys.version_info < (3, 9): + # Remove when Python 3.8 is dropped + with resources.path(package, resource_name) as p: + path = p.resolve() + else: + with resources.as_file( + resources.files(package).joinpath(resource_name) + ) as p: + path = p.resolve() + + if path.exists(): + return self.parse_file(path) + + raise CannotParseResourceAsFile(package, resource_name) + + def parse_resource(self, package: str, resource_name: str) -> ParsedSource[RBT, CT]: + """Parse a resource into a ParsedResource. + + Parameters + ---------- + package + package name where the resource is located. + resource_name + name of the resource + """ + if sys.version_info < (3, 9): + # Remove when Python 3.8 is dropped + with resources.open_binary(package, resource_name) as fi: + content = fi.read() + else: + with resources.files(package).joinpath(resource_name).open("rb") as fi: + content = fi.read() + + bos = BOR( + Hash.from_bytes(self._hasher, content), package, resource_name + ).set_simple_position(0, 0, 0) + + return self.parse_bytes(content, bos) + + +########## +# Project +########## + + +class IncludeStatement(ParsedStatement): + """ "Include statements allow to merge files.""" + + @property + def target(self) -> str: + raise NotImplementedError( + "IncludeStatement subclasses must implement target property." 
class ParsedProject(
    ty.Dict[
        ty.Optional[ty.Tuple[StrictLocationT, str]],
        ParsedSource,
    ]
):
    """Collection of files, independent or connected via IncludeStatement.

    Keys are either an absolute pathname or a tuple package name, resource name.

    None is the name of the root.

    """

    @property
    def has_errors(self) -> bool:
        """True if any of the parsed sources has errors.

        Computed on every access: the project is a mutable mapping, so a
        cached value would go stale when sources are added or removed.
        """
        return any(el.has_errors for el in self.values())

    def errors(self):
        """Yield the errors of every parsed source."""
        for el in self.values():
            yield from el.errors()

    def _iter_following_includes(self, items, seen, include_only_once, elements_of):
        """Yield the elements of each parsed source in ``items``, replacing
        every IncludeStatement by the elements of the source it refers to.

        Parameters
        ----------
        items
            iterable of (key, parsed source) pairs.
        seen
            set of keys already visited; it is updated in place.
        include_only_once
            if true, finding an include whose key is in ``seen`` raises
            ValueError.
        elements_of
            callable returning the iterable of elements of a root block.
        """
        for source_location, parsed in items:
            seen.add(source_location)
            for element in elements_of(parsed.parsed_source):
                if not isinstance(element, IncludeStatement):
                    yield element
                    continue

                # Included sources are stored under (includer, target).
                location = parsed.location, element.target
                if location in seen and include_only_once:
                    raise ValueError(f"{location} was already included.")
                yield from self._iter_following_includes(
                    ((location, self[location]),),
                    seen,
                    include_only_once,
                    elements_of,
                )

    def _iter_statements(self, items, seen, include_only_once):
        """Iter all definitions in the order they appear,
        going into the included files.
        """
        yield from self._iter_following_includes(items, seen, include_only_once, iter)

    def iter_statements(self, include_only_once=True):
        """Iter all definitions in the order they appear,
        going into the included files.

        Parameters
        ----------
        include_only_once
            if true, each file cannot be included more than once.
        """
        yield from self._iter_statements([(None, self[None])], set(), include_only_once)

    def _iter_blocks(self, items, seen, include_only_once):
        """Iter the opening, body elements and closing of each root block
        in the order they appear, going into the included files.
        """
        yield from self._iter_following_includes(
            items, seen, include_only_once, lambda root: root.iter_blocks()
        )

    def iter_blocks(self, include_only_once=True):
        """Iter the opening, body elements and closing of each root block
        in the order they appear, going into the included files.

        Parameters
        ----------
        include_only_once
            if true, each file cannot be included more than once.
        """
        yield from self._iter_blocks([(None, self[None])], set(), include_only_once)
+ ) + + +DefinitionT = ty.Union[ty.Type[Block], ty.Type[ParsedStatement]] + +SpecT = ty.Union[ + ty.Type[Parser], + DefinitionT, + ty.Iterable[DefinitionT], + ty.Type[RootBlock], +] + + +def build_parser_class(spec: SpecT, *, strip_spaces: bool = True, delimiters=None): + """Build a custom parser class. + + Parameters + ---------- + spec + specification of the content to parse. Can be one of the following things: + - Parser class. + - Block or ParsedStatement derived class. + - Iterable of Block or ParsedStatement derived class. + - RootBlock derived class. + strip_spaces : bool + if True, spaces will be stripped for each statement before calling + ``from_string_and_config``. + delimiters : dict + Specify how the source file is split into statements (See below). + + Delimiters dictionary + --------------------- + The delimiters are specified with the keys of the delimiters dict. + The dict files can be used to further customize the iterator. Each + consist of a tuple of two elements: + 1. A value of the DelimiterMode to indicate what to do with the + delimiter string: skip it, attach keep it with previous or next string + 2. A boolean indicating if parsing should stop after fiSBT + encountering this delimiter. 
+ """ + + if delimiters is None: + delimiters = SPLIT_EOL + + if isinstance(spec, type) and issubclass(spec, Parser): + CustomParser = spec + else: + if isinstance(spec, (tuple, list)): + + for el in spec: + if not issubclass(el, (Block, ParsedStatement)): + raise TypeError( + "Elements in root_block_class must be of type Block or ParsedStatement, " + f"not {el}" + ) + + @dataclass(frozen=True) + class CustomRootBlock(RootBlock): + pass + + CustomRootBlock.__annotations__["body"] = Multi[ty.Union[spec]] + + elif isinstance(spec, type) and issubclass(spec, RootBlock): + + CustomRootBlock = spec + + elif isinstance(spec, type) and issubclass(spec, (Block, ParsedStatement)): + + @dataclass(frozen=True) + class CustomRootBlock(RootBlock): + pass + + CustomRootBlock.__annotations__["body"] = Multi[spec] + + else: + raise TypeError( + "`spec` must be of type RootBlock or tuple of type Block or ParsedStatement, " + f"not {type(spec)}" + ) + + class CustomParser(Parser): + + _delimiters = delimiters + _root_block_class = CustomRootBlock + _strip_spaces = strip_spaces + + return CustomParser + + +def parse( + entry_point: SourceLocationT, + spec: SpecT, + config=None, + *, + strip_spaces: bool = True, + delimiters=None, + locator: ty.Callable[[StrictLocationT, str], StrictLocationT] = default_locator, + prefer_resource_as_file: bool = True, + **extra_parser_kwargs, +) -> ParsedProject: + """Parse sources into a ParsedProject dictionary. + + Parameters + ---------- + entry_point + file or resource, given as (package_name, resource_name). + spec + specification of the content to parse. Can be one of the following things: + - Parser class. + - Block or ParsedStatement derived class. + - Iterable of Block or ParsedStatement derived class. + - RootBlock derived class. + config + a configuration object that will be passed to `from_string_and_config` + classmethod. 
+ strip_spaces : bool + if True, spaces will be stripped for each statement before calling + ``from_string_and_config``. + delimiters : dict + Specify how the source file is split into statements (See below). + locator : Callable + function that takes the current location and a target of an IncludeStatement + and returns a new location. + prefer_resource_as_file : bool + if True, resources will try to be located in the filesystem if + available. + extra_parser_kwargs + extra keyword arguments to be given to the parser. + + Delimiters dictionary + --------------------- + The delimiters are specified with the keys of the delimiters dict. + The dict files can be used to further customize the iterator. Each + consist of a tuple of two elements: + 1. A value of the DelimiterMode to indicate what to do with the + delimiter string: skip it, attach keep it with previous or next string + 2. A boolean indicating if parsing should stop after fiSBT + encountering this delimiter. + """ + + CustomParser = build_parser_class( + spec, strip_spaces=strip_spaces, delimiters=delimiters + ) + parser = CustomParser( + config, prefer_resource_as_file=prefer_resource_as_file, **extra_parser_kwargs + ) + + pp = ParsedProject() + + # : ty.List[Optional[ty.Union[LocatorT, str]], ...] + pending: ty.List[ty.Tuple[StrictLocationT, str]] = [] + if isinstance(entry_point, (str, pathlib.Path)): + entry_point = pathlib.Path(entry_point) + if not entry_point.is_absolute(): + entry_point = pathlib.Path.cwd() / entry_point + + elif not (isinstance(entry_point, tuple) and len(entry_point) == 2): + raise TypeError( + f"Cannot handle type {type(entry_point)}, " + "use str or pathlib.Path for files or " + "(package: str, resource_name: str) tuple " + "for a resource." 
+ ) + + pp[None] = parsed = parser.parse(entry_point) + pending.extend( + (parsed.location, el.target) + for el in parsed.parsed_source.filter_by(IncludeStatement) + ) + + while pending: + source_location, target = pending.pop(0) + pp[(source_location, target)] = parsed = parser.parse( + locator(source_location, target) + ) + pending.extend( + (parsed.location, el.target) + for el in parsed.parsed_source.filter_by(IncludeStatement) + ) + + return pp + + +def parse_bytes( + content: bytes, + spec: SpecT, + config=None, + *, + strip_spaces: bool = True, + delimiters=None, + **extra_parser_kwargs, +) -> ParsedProject: + """Parse sources into a ParsedProject dictionary. + + Parameters + ---------- + content + bytes. + spec + specification of the content to parse. Can be one of the following things: + - Parser class. + - Block or ParsedStatement derived class. + - Iterable of Block or ParsedStatement derived class. + - RootBlock derived class. + config + a configuration object that will be passed to `from_string_and_config` + classmethod. + strip_spaces : bool + if True, spaces will be stripped for each statement before calling + ``from_string_and_config``. + delimiters : dict + Specify how the source file is split into statements (See below). + """ + + CustomParser = build_parser_class( + spec, strip_spaces=strip_spaces, delimiters=delimiters + ) + parser = CustomParser(config, prefer_resource_as_file=False, **extra_parser_kwargs) + + pp = ParsedProject() + + pp[None] = parsed = parser.parse_bytes(content) + + if any(parsed.parsed_source.filter_by(IncludeStatement)): + raise ValueError("parse_bytes does not support using an IncludeStatement") + + return pp |