diff options
author | Hernan <hernan.grecco@gmail.com> | 2022-03-01 20:51:17 -0300 |
---|---|---|
committer | Hernan <hernan.grecco@gmail.com> | 2022-05-07 18:32:58 -0300 |
commit | fe48c3735ae53ef8d6b3ed59d8321f34f41dc46d (patch) | |
tree | 10bb9accd85e1a57b6fa812eeec814c985e0dd96 /pint/registry.py | |
parent | c2fef2e442ee0db0efed19295508eaba81d0a299 (diff) | |
download | pint-fe48c3735ae53ef8d6b3ed59d8321f34f41dc46d.tar.gz |
Create Context Facet
Diffstat (limited to 'pint/registry.py')
-rw-r--r-- | pint/registry.py | 602 |
1 files changed, 4 insertions, 598 deletions
diff --git a/pint/registry.py b/pint/registry.py index 331f910..414fc6a 100644 --- a/pint/registry.py +++ b/pint/registry.py @@ -41,8 +41,7 @@ import itertools import locale import pathlib import re -from collections import ChainMap, defaultdict -from contextlib import contextmanager +from collections import defaultdict from dataclasses import dataclass from decimal import Decimal from fractions import Fraction @@ -52,7 +51,6 @@ from typing import ( TYPE_CHECKING, Any, Callable, - ContextManager, Dict, FrozenSet, Iterable, @@ -66,11 +64,10 @@ from typing import ( Union, ) -from . import parser, registry_helpers -from ._typing import F, QuantityOrUnitLike +from . import Context, parser, registry_helpers +from ._typing import QuantityOrUnitLike from ._vendor import appdirs from .compat import HAS_BABEL, babel_parse, tokenizer -from .context import Context, ContextChain, ContextDefinition from .converters import ScaleConverter from .definitions import ( AliasDefinition, @@ -85,6 +82,7 @@ from .errors import ( RedefinitionError, UndefinedUnitError, ) +from .facets.context import ContextRegistry from .facets.system import SystemRegistry from .pint_eval import build_eval_tree from .util import ( @@ -92,8 +90,6 @@ from .util import ( SourceIterator, UnitsContainer, _is_dim, - find_connected_nodes, - find_shortest_path, getattr_maybe_raise, logger, pi_theorem, @@ -187,18 +183,6 @@ class RegistryCache: return all(getattr(self, attr) == getattr(other, attr) for attr in attrs) -class ContextCacheOverlay: - """Layer on top of the base UnitRegistry cache, specific to a combination of - active contexts which contain unit redefinitions. - """ - - def __init__(self, registry_cache: RegistryCache) -> None: - self.dimensional_equivalents = registry_cache.dimensional_equivalents - self.root_units = {} - self.dimensionality = registry_cache.dimensionality - self.parse_unit = registry_cache.parse_unit - - NON_INT_TYPE = Type[Union[float, Decimal, Fraction]] PreprocessorType = Callable[[str], str] @@ -1598,584 +1582,6 @@ class NonMultiplicativeRegistry(BaseRegistry): return value -class ContextRegistry(BaseRegistry): - """Handle of Contexts. - - Conversion between units with different dimensions according - to previously established relations (contexts). - (e.g. in the spectroscopy, conversion between frequency and energy is possible) - - Capabilities: - - - Register contexts. - - Enable and disable contexts. - - Parse @context directive. - """ - - def __init__(self, **kwargs: Any) -> None: - # Map context name (string) or abbreviation to context. - self._contexts: Dict[str, Context] = {} - # Stores active contexts. - self._active_ctx = ContextChain() - # Map context chain to cache - self._caches = {} - # Map context chain to units override - self._context_units = {} - - super().__init__(**kwargs) - - # Allow contexts to add override layers to the units - self._units = ChainMap(self._units) - - def _register_directives(self) -> None: - super()._register_directives() - self._register_directive("@context", self._load_context, ContextDefinition) - - def _load_context(self, cd: ContextDefinition) -> None: - try: - self.add_context(Context.from_definition(cd, self.get_dimensionality)) - except KeyError as e: - raise DefinitionSyntaxError(f"unknown dimension {e} in context") - - def add_context(self, context: Context) -> None: - """Add a context object to the registry. - - The context will be accessible by its name and aliases. - - Notice that this method will NOT enable the context; - see :meth:`enable_contexts`. - """ - if not context.name: - raise ValueError("Can't add unnamed context to registry") - if context.name in self._contexts: - logger.warning( - "The name %s was already registered for another context.", context.name - ) - self._contexts[context.name] = context - for alias in context.aliases: - if alias in self._contexts: - logger.warning( - "The name %s was already registered for another context", - context.name, - ) - self._contexts[alias] = context - - def remove_context(self, name_or_alias: str) -> Context: - """Remove a context from the registry and return it. - - Notice that this methods will not disable the context; - see :meth:`disable_contexts`. - """ - context = self._contexts[name_or_alias] - - del self._contexts[context.name] - for alias in context.aliases: - del self._contexts[alias] - - return context - - def _build_cache(self, loaded_files=None) -> None: - super()._build_cache(loaded_files) - self._caches[()] = self._cache - - def _switch_context_cache_and_units(self) -> None: - """If any of the active contexts redefine units, create variant self._cache - and self._units specific to the combination of active contexts. - The next time this method is invoked with the same combination of contexts, - reuse the same variant self._cache and self._units as in the previous time. - """ - del self._units.maps[:-1] - units_overlay = any(ctx.redefinitions for ctx in self._active_ctx.contexts) - if not units_overlay: - # Use the default _cache and _units - self._cache = self._caches[()] - return - - key = self._active_ctx.hashable() - try: - self._cache = self._caches[key] - self._units.maps.insert(0, self._context_units[key]) - except KeyError: - pass - - # First time using this specific combination of contexts and it contains - # unit redefinitions - base_cache = self._caches[()] - self._caches[key] = self._cache = ContextCacheOverlay(base_cache) - - self._context_units[key] = units_overlay = {} - self._units.maps.insert(0, units_overlay) - - on_redefinition_backup = self._on_redefinition - self._on_redefinition = "ignore" - try: - for ctx in reversed(self._active_ctx.contexts): - for definition in ctx.redefinitions: - self._redefine(definition) - finally: - self._on_redefinition = on_redefinition_backup - - def _redefine(self, definition: UnitDefinition) -> None: - """Redefine a unit from a context""" - # Find original definition in the UnitRegistry - candidates = self.parse_unit_name(definition.name) - if not candidates: - raise UndefinedUnitError(definition.name) - candidates_no_prefix = [c for c in candidates if not c[0]] - if not candidates_no_prefix: - raise ValueError(f"Can't redefine a unit with a prefix: {definition.name}") - assert len(candidates_no_prefix) == 1 - _, name, _ = candidates_no_prefix[0] - try: - basedef = self._units[name] - except KeyError: - raise UndefinedUnitError(name) - - # Rebuild definition as a variant of the base - if basedef.is_base: - raise ValueError("Can't redefine a base unit to a derived one") - - dims_old = self._get_dimensionality(basedef.reference) - dims_new = self._get_dimensionality(definition.reference) - if dims_old != dims_new: - raise ValueError( - f"Can't change dimensionality of {basedef.name} " - f"from {dims_old} to {dims_new} in a context" - ) - - # Do not modify in place the original definition, as (1) the context may - # be shared by other registries, and (2) it would alter the cache key - definition = UnitDefinition( - name=basedef.name, - defined_symbol=basedef.symbol, - aliases=basedef.aliases, - is_base=False, - reference=definition.reference, - converter=definition.converter, - ) - - # Write into the context-specific self._units.maps[0] and self._cache.root_units - self.define(definition) - - def enable_contexts( - self, *names_or_contexts: Union[str, Context], **kwargs - ) -> None: - """Enable contexts provided by name or by object. - - Parameters - ---------- - *names_or_contexts : - one or more contexts or context names/aliases - **kwargs : - keyword arguments for the context(s) - - Examples - -------- - See :meth:`context` - """ - - # If present, copy the defaults from the containing contexts - if self._active_ctx.defaults: - kwargs = dict(self._active_ctx.defaults, **kwargs) - - # For each name, we first find the corresponding context - ctxs = [ - self._contexts[name] if isinstance(name, str) else name - for name in names_or_contexts - ] - - # Check if the contexts have been checked first, if not we make sure - # that dimensions are expressed in terms of base dimensions. - for ctx in ctxs: - if ctx.checked: - continue - funcs_copy = dict(ctx.funcs) - for (src, dst), func in funcs_copy.items(): - src_ = self._get_dimensionality(src) - dst_ = self._get_dimensionality(dst) - if src != src_ or dst != dst_: - ctx.remove_transformation(src, dst) - ctx.add_transformation(src_, dst_, func) - ctx.checked = True - - # and create a new one with the new defaults. - contexts = tuple(Context.from_context(ctx, **kwargs) for ctx in ctxs) - - # Finally we add them to the active context. - self._active_ctx.insert_contexts(*contexts) - self._switch_context_cache_and_units() - - def disable_contexts(self, n: int = None) -> None: - """Disable the last n enabled contexts. - - Parameters - ---------- - n : int - Number of contexts to disable. Default: disable all contexts. - """ - self._active_ctx.remove_contexts(n) - self._switch_context_cache_and_units() - - @contextmanager - def context(self, *names, **kwargs) -> ContextManager[Context]: - """Used as a context manager, this function enables to activate a context - which is removed after usage. - - Parameters - ---------- - *names : - name(s) of the context(s). - **kwargs : - keyword arguments for the contexts. - - Examples - -------- - Context can be called by their name: - - >>> import pint - >>> ureg = pint.UnitRegistry() - >>> ureg.add_context(pint.Context('one')) - >>> ureg.add_context(pint.Context('two')) - >>> with ureg.context('one'): - ... pass - - If a context has an argument, you can specify its value as a keyword argument: - - >>> with ureg.context('one', n=1): - ... pass - - Multiple contexts can be entered in single call: - - >>> with ureg.context('one', 'two', n=1): - ... pass - - Or nested allowing you to give different values to the same keyword argument: - - >>> with ureg.context('one', n=1): - ... with ureg.context('two', n=2): - ... pass - - A nested context inherits the defaults from the containing context: - - >>> with ureg.context('one', n=1): - ... # Here n takes the value of the outer context - ... with ureg.context('two'): - ... pass - """ - # Enable the contexts. - self.enable_contexts(*names, **kwargs) - - try: - # After adding the context and rebuilding the graph, the registry - # is ready to use. - yield self - finally: - # Upon leaving the with statement, - # the added contexts are removed from the active one. - self.disable_contexts(len(names)) - - def with_context(self, name, **kwargs) -> Callable[[F], F]: - """Decorator to wrap a function call in a Pint context. - - Use it to ensure that a certain context is active when - calling a function:: - - Parameters - ---------- - name : - name of the context. - **kwargs : - keyword arguments for the context - - - Returns - ------- - callable - the wrapped function. - - Example - ------- - >>> @ureg.with_context('sp') - ... def my_cool_fun(wavelength): - ... print('This wavelength is equivalent to: %s', wavelength.to('terahertz')) - """ - - def decorator(func): - assigned = tuple( - attr for attr in functools.WRAPPER_ASSIGNMENTS if hasattr(func, attr) - ) - updated = tuple( - attr for attr in functools.WRAPPER_UPDATES if hasattr(func, attr) - ) - - @functools.wraps(func, assigned=assigned, updated=updated) - def wrapper(*values, **wrapper_kwargs): - with self.context(name, **kwargs): - return func(*values, **wrapper_kwargs) - - return wrapper - - return decorator - - def _convert(self, value, src, dst, inplace=False): - """Convert value from some source to destination units. - - In addition to what is done by the BaseRegistry, - converts between units with different dimensions by following - transformation rules defined in the context. - - Parameters - ---------- - value : - value - src : UnitsContainer - source units. - dst : UnitsContainer - destination units. - inplace : - (Default value = False) - - Returns - ------- - callable - converted value - """ - # If there is an active context, we look for a path connecting source and - # destination dimensionality. If it exists, we transform the source value - # by applying sequentially each transformation of the path. - if self._active_ctx: - - src_dim = self._get_dimensionality(src) - dst_dim = self._get_dimensionality(dst) - - path = find_shortest_path(self._active_ctx.graph, src_dim, dst_dim) - if path: - src = self.Quantity(value, src) - for a, b in zip(path[:-1], path[1:]): - src = self._active_ctx.transform(a, b, self, src) - - value, src = src._magnitude, src._units - - return super()._convert(value, src, dst, inplace) - - def _get_compatible_units(self, input_units, group_or_system): - src_dim = self._get_dimensionality(input_units) - - ret = super()._get_compatible_units(input_units, group_or_system) - - if self._active_ctx: - ret = ret.copy() # Do not alter self._cache - nodes = find_connected_nodes(self._active_ctx.graph, src_dim) - if nodes: - for node in nodes: - ret |= self._cache.dimensional_equivalents[node] - - return ret - - -from .facets.group import GroupRegistry - - -class SystemRegistry(GroupRegistry): - """Handle of Systems. - - Conversion between units with different dimensions according - to previously established relations (contexts). - (e.g. in the spectroscopy, conversion between frequency and energy is possible) - - Capabilities: - - - Register systems. - - List systems - - Get or get the default system. - - Parse @group directive. - """ - - def __init__(self, system=None, **kwargs): - super().__init__(**kwargs) - - #: Map system name to system. - #: :type: dict[ str | System] - self._systems: Dict[str, System] = {} - - #: Maps dimensionality (UnitsContainer) to Dimensionality (UnitsContainer) - self._base_units_cache = dict() - - self._default_system = system - - def _init_dynamic_classes(self) -> None: - super()._init_dynamic_classes() - self.System = systems.build_system_class(self) - - def _after_init(self) -> None: - """Invoked at the end of ``__init__``. - - - Create default group and add all orphan units to it - - Set default system - """ - super()._after_init() - - #: System name to be used by default. - self._default_system = self._default_system or self._defaults.get( - "system", None - ) - - def _register_directives(self) -> None: - super()._register_directives() - self._register_directive( - "@system", - lambda gd: self.System.from_definition(gd, self.get_root_units), - SystemDefinition, - ) - - @property - def sys(self): - return systems.Lister(self._systems) - - @property - def default_system(self) -> System: - return self._default_system - - @default_system.setter - def default_system(self, name): - if name: - if name not in self._systems: - raise ValueError("Unknown system %s" % name) - - self._base_units_cache = {} - - self._default_system = name - - def get_system(self, name: str, create_if_needed: bool = True) -> System: - """Return a Group. - - Parameters - ---------- - name : str - Name of the group to be - create_if_needed : bool - If True, create a group if not found. If False, raise an Exception. - (Default value = True) - - Returns - ------- - type - System - - """ - if name in self._systems: - return self._systems[name] - - if not create_if_needed: - raise ValueError("Unknown system %s" % name) - - return self.System(name) - - def get_base_units( - self, - input_units: Union[UnitLike, Quantity], - check_nonmult: bool = True, - system: Union[str, System, None] = None, - ) -> Tuple[Number, Unit]: - """Convert unit or dict of units to the base units. - - If any unit is non multiplicative and check_converter is True, - then None is returned as the multiplicative factor. - - Unlike BaseRegistry, in this registry root_units might be different - from base_units - - Parameters - ---------- - input_units : UnitsContainer or str - units - check_nonmult : bool - if True, None will be returned as the - multiplicative factor if a non-multiplicative - units is found in the final Units. (Default value = True) - system : - (Default value = None) - - Returns - ------- - type - multiplicative factor, base units - - """ - - input_units = to_units_container(input_units) - - f, units = self._get_base_units(input_units, check_nonmult, system) - - return f, self.Unit(units) - - def _get_base_units( - self, - input_units: UnitsContainerT, - check_nonmult: bool = True, - system: Union[str, System, None] = None, - ): - - if system is None: - system = self._default_system - - # The cache is only done for check_nonmult=True and the current system. - if ( - check_nonmult - and system == self._default_system - and input_units in self._base_units_cache - ): - return self._base_units_cache[input_units] - - factor, units = self.get_root_units(input_units, check_nonmult) - - if not system: - return factor, units - - # This will not be necessary after integration with the registry - # as it has a UnitsContainer intermediate - units = to_units_container(units, self) - - destination_units = self.UnitsContainer() - - bu = self.get_system(system, False).base_units - - for unit, value in units.items(): - if unit in bu: - new_unit = bu[unit] - new_unit = to_units_container(new_unit, self) - destination_units *= new_unit ** value - else: - destination_units *= self.UnitsContainer({unit: value}) - - base_factor = self.convert(factor, units, destination_units) - - if check_nonmult: - self._base_units_cache[input_units] = base_factor, destination_units - - return base_factor, destination_units - - def _get_compatible_units(self, input_units, group_or_system) -> FrozenSet[Unit]: - - if group_or_system is None: - group_or_system = self._default_system - - if group_or_system and group_or_system in self._systems: - members = self._systems[group_or_system].members - # group_or_system has been handled by System - return frozenset(members & super()._get_compatible_units(input_units)) - - try: - return super()._get_compatible_units(input_units, group_or_system) - except ValueError as ex: - # It might be also a system - if "Unknown Group" in str(ex): - raise ValueError( - "Unknown Group o System with name '%s'" % group_or_system - ) from ex - raise ex - - class UnitRegistry(SystemRegistry, ContextRegistry, NonMultiplicativeRegistry): """The unit registry stores the definitions and relationships between units. |