summaryrefslogtreecommitdiff
path: root/pint/parser.py
blob: e73e57889ac2dc30e76004bb31910bc3afa9f85e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
"""
    pint.parser
    ~~~~~~~~~~~

    Classes and methods to parse a definition text file into a DefinitionFile.

    :copyright: 2019 by Pint Authors, see AUTHORS for more details.
    :license: BSD, see LICENSE for more details.
"""

from __future__ import annotations

import pathlib
import re
from dataclasses import dataclass, field
from functools import cached_property
from importlib import resources
from io import StringIO
from typing import Any, Callable, Dict, Generator, Iterable, Optional, Tuple

from ._vendor import flexcache as fc
from .definitions import Definition
from .errors import DefinitionSyntaxError
from .util import SourceIterator, logger

# Splits a directive line at the first space or open parenthesis, so that
# e.g. "@context(ctx)" or "@group name" yields the bare "@context" / "@group"
# prefix used to look up the registered parser.
_BLOCK_RE = re.compile(r"[ (]")

# Signature shared by every registered directive parser:
# (source iterator, non-integer numeric type) -> parsed definition object.
ParserFuncT = Callable[[SourceIterator, type], Any]


@dataclass(frozen=True)
class DefinitionFile:
    """Represents a definition file after parsing.

    Immutable record of where the definitions came from and the
    (line number, parsed object) pairs produced by the parser.
    """

    # Fullpath of the original file, None if a text was provided
    filename: Optional[pathlib.Path]
    is_resource: bool

    # Modification time of the file or None.
    mtime: Optional[float]

    # SHA-1 hash
    content_hash: Optional[str]

    # collection of line number and corresponding definition.
    parsed_lines: Tuple[Tuple[int, Any], ...]

    def filter_by(self, *klass):
        """Yield (lineno, definition) pairs whose definition is an
        instance of any of the given classes.
        """
        for lineno, parsed in self.parsed_lines:
            if isinstance(parsed, klass):
                yield lineno, parsed

    @cached_property
    def errors(self):
        """Tuple of (lineno, exception) entries recorded while parsing."""
        return tuple(self.filter_by(Exception))

    def has_errors(self):
        """Return True if at least one parsing error was recorded."""
        return len(self.errors) > 0


class DefinitionFiles(tuple):
    """Wrapper class that allows handling a tuple containing DefinitionFile."""

    @staticmethod
    def _iter_definitions(
        pending_files: list[DefinitionFile],
    ) -> Generator[Tuple[int, Definition], None, None]:
        """Internal method to iterate definitions.

        pending_files is a mutable list of definition files
        and elements are being removed as they are yielded.

        Raises
        ------
        ValueError
            if an @import directive is found but there is no pending file
            left to satisfy it, or the next pending file does not match
            the imported path.
        """
        if not pending_files:
            return
        current_file = pending_files.pop(0)
        for lineno, definition in current_file.parsed_lines:
            if isinstance(definition, ImportDefinition):
                if not pending_files:
                    raise ValueError(
                        f"No more files while trying to import {definition.path}."
                    )

                # The files in the tuple must appear in the exact order in
                # which the @import directives are encountered.
                if not str(pending_files[0].filename).endswith(str(definition.path)):
                    raise ValueError(
                        "The order of the files do not match. "
                        f"(expected: {definition.path}, "
                        f"found {pending_files[0].filename})"
                    )

                # Recurse: yield the imported file's definitions in place
                # of the @import line.
                yield from DefinitionFiles._iter_definitions(pending_files)
            else:
                yield lineno, definition

    def iter_definitions(self):
        """Iter all definitions in the order they appear,
        going into the included files.

        Important: This assumes that the order of the imported files
        is the one that they will appear in the definitions.
        """
        yield from self._iter_definitions(list(self))


def build_disk_cache_class(non_int_type: type):
    """Build disk cache class, taking into account the non_int_type.

    Returns a flexcache DiskCache subclass whose cache entry names
    incorporate the pint version and the non-integer numeric type, so
    cached parses are invalidated when either changes.
    """

    @dataclass(frozen=True)
    class PintHeader(fc.InvalidateByExist, fc.NameByFields, fc.BasicPythonHeader):
        # Base header mixed into every cache entry: named by its fields,
        # invalidated when the source it points to no longer exists.

        from . import __version__

        pint_version: str = __version__
        # Captured from the enclosing call; default_factory defers the
        # attribute lookup until an instance is created.
        non_int_type: str = field(default_factory=lambda: non_int_type.__qualname__)

    class PathHeader(fc.NameByFileContent, PintHeader):
        # Header for a single file on disk: named by its content.
        pass

    class DefinitionFilesHeader(fc.NameByHashIter, PintHeader):
        # Header for a DefinitionFiles tuple: named by the hashes of all
        # contained definition files.
        @classmethod
        def from_definition_files(cls, dfs: DefinitionFiles, reader_id):
            return cls(tuple(df.content_hash for df in dfs), reader_id)

    class PintDiskCache(fc.DiskCache):
        # Maps the type of the object being cached to the header class
        # (or factory) used to derive its cache entry name.
        _header_classes = {
            pathlib.Path: PathHeader,
            str: PathHeader.from_string,
            DefinitionFiles: DefinitionFilesHeader.from_definition_files,
        }

    return PintDiskCache


@dataclass(frozen=True)
class ImportDefinition:
    """Definition for the @import directive"""

    # Path of the file to import, as written in the directive.
    path: pathlib.Path

    @classmethod
    def from_string(
        cls, definition: str, non_int_type: type = float
    ) -> ImportDefinition:
        """Build an ImportDefinition from an "@import <path>" line."""
        # "@import" is 7 characters; whatever follows (stripped) is the path.
        target = definition[7:].strip()
        return ImportDefinition(pathlib.Path(target))


class Parser:
    """Class to parse a definition file into an intermediate object representation.

    non_int_type
        numerical type used for non integer values. (Default: float)
    raise_on_error
        if True, an exception will be raised as soon as a Definition Error is found.
        if False, the exception will be added to the parsed DefinitionFile.
    cache_folder
        str or Path in which to build a disk cache; an already-built
        DiskCache instance (or None, disabling caching) is also accepted.
    """

    #: Map context prefix to function
    _directives: Dict[str, ParserFuncT]

    _diskcache: fc.DiskCache

    #: Definition classes registered by default in __init__.
    handled_classes = (ImportDefinition,)

    def __init__(self, non_int_type=float, raise_on_error=True, cache_folder=None):
        self._directives = {}
        self._non_int_type = non_int_type
        self._raise_on_error = raise_on_error
        self.register_class("@import", ImportDefinition)

        if isinstance(cache_folder, (str, pathlib.Path)):
            self._diskcache = build_disk_cache_class(non_int_type)(cache_folder)
        else:
            # Assumed to be an already-built DiskCache, or None to disable caching.
            self._diskcache = cache_folder

    def register_directive(
        self, prefix: str, parserfunc: ParserFuncT, single_line: bool
    ):
        """Register a parser for a given @ directive.

        Parameters
        ----------
        prefix
            string identifying the section (e.g. @context)
        parserfunc
            function that is able to parse a definition into a DefinitionObject
        single_line
            indicates that the directive spans a single line, i.e. an @end is not required.

        Raises
        ------
        ValueError
            if the prefix does not start with '@'.
        """
        if prefix and prefix[0] == "@":
            if single_line:
                # Single-line directive: the parser receives only the
                # current line (si.last[1]).
                self._directives[prefix] = lambda si, non_int_type: parserfunc(
                    si.last[1], non_int_type
                )
            else:
                # Block directive: the parser receives an iterator over the
                # lines of the block.
                self._directives[prefix] = lambda si, non_int_type: parserfunc(
                    si.block_iter(), non_int_type
                )
        else:
            raise ValueError("Prefix directives must start with '@'")

    def register_class(self, prefix: str, klass):
        """Register a definition class for a directive and try to guess
        if it is a line or block directive from the signature.

        A class exposing `from_string` is treated as a single-line
        directive; one exposing `from_lines` as a block directive.

        Raises
        ------
        ValueError
            if klass provides neither `from_string` nor `from_lines`.
        """
        if hasattr(klass, "from_string"):
            self.register_directive(prefix, klass.from_string, True)
        elif hasattr(klass, "from_lines"):
            self.register_directive(prefix, klass.from_lines, False)
        else:
            raise ValueError(
                f"While registering {prefix}, {klass} does not have `from_string` or `from_lines` method"
            )

    def parse(self, file, is_resource: bool = False) -> DefinitionFiles:
        """Parse a file or resource into a collection of DefinitionFile that will
        include all other files imported.

        Parameters
        ----------
        file
            definitions or file containing definition.
        is_resource
            indicates that the file is a resource file
            and therefore should be loaded from the package.
            (Default value = False)
        """

        if is_resource:
            parsed = self.parse_single_resource(file)
        else:
            path = pathlib.Path(file)
            if self._diskcache is None:
                parsed = self.parse_single(path, None)
            else:
                parsed, content_hash = self._diskcache.load(
                    path, self.parse_single, True
                )

        out = [parsed]
        # Follow every @import found in this file, recursing so that the
        # returned tuple lists files in the order they are imported.
        for lineno, content in parsed.filter_by(ImportDefinition):
            if parsed.is_resource:
                # Resources are imported by name, relative to the package.
                path = content.path
            else:
                try:
                    basedir = parsed.filename.parent
                except AttributeError:
                    # filename is None (raw text was parsed): resolve
                    # imports relative to the current working directory.
                    basedir = pathlib.Path.cwd()
                path = basedir.joinpath(content.path)
            out.extend(self.parse(path, parsed.is_resource))
        return DefinitionFiles(out)

    def parse_single_resource(self, resource_name: str) -> DefinitionFile:
        """Parse a resource in the package into a DefinitionFile.

        Imported files will appear as ImportDefinition objects and
        will not be followed.

        This method will try to load it first as a regular file
        (with a path and mtime) to allow caching.
        If this fails (i.e. the resource is not a filesystem file)
        it will use python importlib.resources.read_binary
        """

        with resources.path(__package__, resource_name) as p:
            filepath = p.resolve()

        if filepath.exists():
            if self._diskcache is None:
                return self.parse_single(filepath, None)
            else:
                definition_file, content_hash = self._diskcache.load(
                    filepath, self.parse_single, True
                )
                return definition_file

        logger.debug("Cannot use_cache resource (yet) without a real path")
        return self._parse_single_resource(resource_name)

    def _parse_single_resource(self, resource_name: str) -> DefinitionFile:
        """Parse a non-filesystem package resource into a DefinitionFile.

        Reads the resource bytes directly; the content hash (when a disk
        cache is configured) is derived from those bytes.
        """
        rbytes = resources.read_binary(__package__, resource_name)
        if self._diskcache:
            hdr = self._diskcache.PathHeader(rbytes)
            content_hash = self._diskcache.cache_stem_for(hdr)
        else:
            content_hash = None

        si = SourceIterator(
            StringIO(rbytes.decode("utf-8")), resource_name, is_resource=True
        )
        parsed_lines = tuple(self.yield_from_source_iterator(si))
        return DefinitionFile(
            filename=pathlib.Path(resource_name),
            is_resource=True,
            mtime=None,
            content_hash=content_hash,
            parsed_lines=parsed_lines,
        )

    def parse_single(
        self, filepath: pathlib.Path, content_hash: Optional[str]
    ) -> DefinitionFile:
        """Parse a filepath without nesting into dependent files.

        Imported files will appear as ImportDefinition objects and
        will not be followed.

        Parameters
        ----------
        filepath
            definitions or file containing definition.
        content_hash
            hash of the file content, or None when caching is disabled.
        """
        with filepath.open(encoding="utf-8") as fp:
            si = SourceIterator(fp, filepath, is_resource=False)
            parsed_lines = tuple(self.yield_from_source_iterator(si))

        filename = filepath.resolve()
        mtime = filepath.stat().st_mtime

        return DefinitionFile(
            filename=filename,
            is_resource=False,
            mtime=mtime,
            content_hash=content_hash,
            parsed_lines=parsed_lines,
        )

    def parse_lines(self, lines: Iterable[str]) -> DefinitionFile:
        """Parse an iterable of strings into a dependent file.

        Raises
        ------
        ValueError
            if any line contains an @import directive (there is no base
            path to resolve imports against).
        """
        si = SourceIterator(lines, None, False)
        parsed_lines = tuple(self.yield_from_source_iterator(si))
        df = DefinitionFile(None, False, None, "", parsed_lines=parsed_lines)
        if any(df.filter_by(ImportDefinition)):
            raise ValueError(
                "Cannot use the @import directive when parsing "
                "an iterable of strings."
            )
        return df

    def yield_from_source_iterator(
        self, source_iterator: SourceIterator
    ) -> Generator[Tuple[int, Any], None, None]:
        """Iterates through the source iterator, yields line numbers and
        the corresponding parsed definition object.

        When raise_on_error is False, DefinitionSyntaxError instances are
        yielded in place of the failed definition instead of being raised.

        Parameters
        ----------
        source_iterator
        """
        for lineno, line in source_iterator:
            try:
                if line.startswith("@"):
                    # Handle @ directives dispatching to the appropriate parsers
                    parts = _BLOCK_RE.split(line)

                    subparser = self._directives.get(parts[0], None)

                    if subparser is None:
                        raise DefinitionSyntaxError(
                            "Unknown directive %s" % line, lineno=lineno
                        )

                    d = subparser(source_iterator, self._non_int_type)
                    yield lineno, d
                else:
                    yield lineno, Definition.from_string(line, self._non_int_type)
            except DefinitionSyntaxError as ex:
                if ex.lineno is None:
                    ex.lineno = lineno
                if self._raise_on_error:
                    raise ex
                yield lineno, ex
            except Exception as ex:
                logger.error("In line {}, cannot add '{}' {}".format(lineno, line, ex))
                raise ex