summaryrefslogtreecommitdiff
path: root/pint/registry.py
diff options
context:
space:
mode:
authorHernan <hernan.grecco@gmail.com>2022-03-01 20:51:17 -0300
committerHernan <hernan.grecco@gmail.com>2022-05-07 18:32:58 -0300
commitfe48c3735ae53ef8d6b3ed59d8321f34f41dc46d (patch)
tree10bb9accd85e1a57b6fa812eeec814c985e0dd96 /pint/registry.py
parentc2fef2e442ee0db0efed19295508eaba81d0a299 (diff)
downloadpint-fe48c3735ae53ef8d6b3ed59d8321f34f41dc46d.tar.gz
Create Context Facet
Diffstat (limited to 'pint/registry.py')
-rw-r--r--pint/registry.py602
1 files changed, 4 insertions, 598 deletions
diff --git a/pint/registry.py b/pint/registry.py
index 331f910..414fc6a 100644
--- a/pint/registry.py
+++ b/pint/registry.py
@@ -41,8 +41,7 @@ import itertools
import locale
import pathlib
import re
-from collections import ChainMap, defaultdict
-from contextlib import contextmanager
+from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal
from fractions import Fraction
@@ -52,7 +51,6 @@ from typing import (
TYPE_CHECKING,
Any,
Callable,
- ContextManager,
Dict,
FrozenSet,
Iterable,
@@ -66,11 +64,10 @@ from typing import (
Union,
)
-from . import parser, registry_helpers
-from ._typing import F, QuantityOrUnitLike
+from . import Context, parser, registry_helpers
+from ._typing import QuantityOrUnitLike
from ._vendor import appdirs
from .compat import HAS_BABEL, babel_parse, tokenizer
-from .context import Context, ContextChain, ContextDefinition
from .converters import ScaleConverter
from .definitions import (
AliasDefinition,
@@ -85,6 +82,7 @@ from .errors import (
RedefinitionError,
UndefinedUnitError,
)
+from .facets.context import ContextRegistry
from .facets.system import SystemRegistry
from .pint_eval import build_eval_tree
from .util import (
@@ -92,8 +90,6 @@ from .util import (
SourceIterator,
UnitsContainer,
_is_dim,
- find_connected_nodes,
- find_shortest_path,
getattr_maybe_raise,
logger,
pi_theorem,
@@ -187,18 +183,6 @@ class RegistryCache:
return all(getattr(self, attr) == getattr(other, attr) for attr in attrs)
-class ContextCacheOverlay:
- """Layer on top of the base UnitRegistry cache, specific to a combination of
- active contexts which contain unit redefinitions.
- """
-
- def __init__(self, registry_cache: RegistryCache) -> None:
- self.dimensional_equivalents = registry_cache.dimensional_equivalents
- self.root_units = {}
- self.dimensionality = registry_cache.dimensionality
- self.parse_unit = registry_cache.parse_unit
-
-
NON_INT_TYPE = Type[Union[float, Decimal, Fraction]]
PreprocessorType = Callable[[str], str]
@@ -1598,584 +1582,6 @@ class NonMultiplicativeRegistry(BaseRegistry):
return value
-class ContextRegistry(BaseRegistry):
- """Handle of Contexts.
-
- Conversion between units with different dimensions according
- to previously established relations (contexts).
- (e.g. in the spectroscopy, conversion between frequency and energy is possible)
-
- Capabilities:
-
- - Register contexts.
- - Enable and disable contexts.
- - Parse @context directive.
- """
-
- def __init__(self, **kwargs: Any) -> None:
- # Map context name (string) or abbreviation to context.
- self._contexts: Dict[str, Context] = {}
- # Stores active contexts.
- self._active_ctx = ContextChain()
- # Map context chain to cache
- self._caches = {}
- # Map context chain to units override
- self._context_units = {}
-
- super().__init__(**kwargs)
-
- # Allow contexts to add override layers to the units
- self._units = ChainMap(self._units)
-
- def _register_directives(self) -> None:
- super()._register_directives()
- self._register_directive("@context", self._load_context, ContextDefinition)
-
- def _load_context(self, cd: ContextDefinition) -> None:
- try:
- self.add_context(Context.from_definition(cd, self.get_dimensionality))
- except KeyError as e:
- raise DefinitionSyntaxError(f"unknown dimension {e} in context")
-
- def add_context(self, context: Context) -> None:
- """Add a context object to the registry.
-
- The context will be accessible by its name and aliases.
-
- Notice that this method will NOT enable the context;
- see :meth:`enable_contexts`.
- """
- if not context.name:
- raise ValueError("Can't add unnamed context to registry")
- if context.name in self._contexts:
- logger.warning(
- "The name %s was already registered for another context.", context.name
- )
- self._contexts[context.name] = context
- for alias in context.aliases:
- if alias in self._contexts:
- logger.warning(
- "The name %s was already registered for another context",
- context.name,
- )
- self._contexts[alias] = context
-
- def remove_context(self, name_or_alias: str) -> Context:
- """Remove a context from the registry and return it.
-
- Notice that this methods will not disable the context;
- see :meth:`disable_contexts`.
- """
- context = self._contexts[name_or_alias]
-
- del self._contexts[context.name]
- for alias in context.aliases:
- del self._contexts[alias]
-
- return context
-
- def _build_cache(self, loaded_files=None) -> None:
- super()._build_cache(loaded_files)
- self._caches[()] = self._cache
-
- def _switch_context_cache_and_units(self) -> None:
- """If any of the active contexts redefine units, create variant self._cache
- and self._units specific to the combination of active contexts.
- The next time this method is invoked with the same combination of contexts,
- reuse the same variant self._cache and self._units as in the previous time.
- """
- del self._units.maps[:-1]
- units_overlay = any(ctx.redefinitions for ctx in self._active_ctx.contexts)
- if not units_overlay:
- # Use the default _cache and _units
- self._cache = self._caches[()]
- return
-
- key = self._active_ctx.hashable()
- try:
- self._cache = self._caches[key]
- self._units.maps.insert(0, self._context_units[key])
- except KeyError:
- pass
-
- # First time using this specific combination of contexts and it contains
- # unit redefinitions
- base_cache = self._caches[()]
- self._caches[key] = self._cache = ContextCacheOverlay(base_cache)
-
- self._context_units[key] = units_overlay = {}
- self._units.maps.insert(0, units_overlay)
-
- on_redefinition_backup = self._on_redefinition
- self._on_redefinition = "ignore"
- try:
- for ctx in reversed(self._active_ctx.contexts):
- for definition in ctx.redefinitions:
- self._redefine(definition)
- finally:
- self._on_redefinition = on_redefinition_backup
-
- def _redefine(self, definition: UnitDefinition) -> None:
- """Redefine a unit from a context"""
- # Find original definition in the UnitRegistry
- candidates = self.parse_unit_name(definition.name)
- if not candidates:
- raise UndefinedUnitError(definition.name)
- candidates_no_prefix = [c for c in candidates if not c[0]]
- if not candidates_no_prefix:
- raise ValueError(f"Can't redefine a unit with a prefix: {definition.name}")
- assert len(candidates_no_prefix) == 1
- _, name, _ = candidates_no_prefix[0]
- try:
- basedef = self._units[name]
- except KeyError:
- raise UndefinedUnitError(name)
-
- # Rebuild definition as a variant of the base
- if basedef.is_base:
- raise ValueError("Can't redefine a base unit to a derived one")
-
- dims_old = self._get_dimensionality(basedef.reference)
- dims_new = self._get_dimensionality(definition.reference)
- if dims_old != dims_new:
- raise ValueError(
- f"Can't change dimensionality of {basedef.name} "
- f"from {dims_old} to {dims_new} in a context"
- )
-
- # Do not modify in place the original definition, as (1) the context may
- # be shared by other registries, and (2) it would alter the cache key
- definition = UnitDefinition(
- name=basedef.name,
- defined_symbol=basedef.symbol,
- aliases=basedef.aliases,
- is_base=False,
- reference=definition.reference,
- converter=definition.converter,
- )
-
- # Write into the context-specific self._units.maps[0] and self._cache.root_units
- self.define(definition)
-
- def enable_contexts(
- self, *names_or_contexts: Union[str, Context], **kwargs
- ) -> None:
- """Enable contexts provided by name or by object.
-
- Parameters
- ----------
- *names_or_contexts :
- one or more contexts or context names/aliases
- **kwargs :
- keyword arguments for the context(s)
-
- Examples
- --------
- See :meth:`context`
- """
-
- # If present, copy the defaults from the containing contexts
- if self._active_ctx.defaults:
- kwargs = dict(self._active_ctx.defaults, **kwargs)
-
- # For each name, we first find the corresponding context
- ctxs = [
- self._contexts[name] if isinstance(name, str) else name
- for name in names_or_contexts
- ]
-
- # Check if the contexts have been checked first, if not we make sure
- # that dimensions are expressed in terms of base dimensions.
- for ctx in ctxs:
- if ctx.checked:
- continue
- funcs_copy = dict(ctx.funcs)
- for (src, dst), func in funcs_copy.items():
- src_ = self._get_dimensionality(src)
- dst_ = self._get_dimensionality(dst)
- if src != src_ or dst != dst_:
- ctx.remove_transformation(src, dst)
- ctx.add_transformation(src_, dst_, func)
- ctx.checked = True
-
- # and create a new one with the new defaults.
- contexts = tuple(Context.from_context(ctx, **kwargs) for ctx in ctxs)
-
- # Finally we add them to the active context.
- self._active_ctx.insert_contexts(*contexts)
- self._switch_context_cache_and_units()
-
- def disable_contexts(self, n: int = None) -> None:
- """Disable the last n enabled contexts.
-
- Parameters
- ----------
- n : int
- Number of contexts to disable. Default: disable all contexts.
- """
- self._active_ctx.remove_contexts(n)
- self._switch_context_cache_and_units()
-
- @contextmanager
- def context(self, *names, **kwargs) -> ContextManager[Context]:
- """Used as a context manager, this function enables to activate a context
- which is removed after usage.
-
- Parameters
- ----------
- *names :
- name(s) of the context(s).
- **kwargs :
- keyword arguments for the contexts.
-
- Examples
- --------
- Context can be called by their name:
-
- >>> import pint
- >>> ureg = pint.UnitRegistry()
- >>> ureg.add_context(pint.Context('one'))
- >>> ureg.add_context(pint.Context('two'))
- >>> with ureg.context('one'):
- ... pass
-
- If a context has an argument, you can specify its value as a keyword argument:
-
- >>> with ureg.context('one', n=1):
- ... pass
-
- Multiple contexts can be entered in single call:
-
- >>> with ureg.context('one', 'two', n=1):
- ... pass
-
- Or nested allowing you to give different values to the same keyword argument:
-
- >>> with ureg.context('one', n=1):
- ... with ureg.context('two', n=2):
- ... pass
-
- A nested context inherits the defaults from the containing context:
-
- >>> with ureg.context('one', n=1):
- ... # Here n takes the value of the outer context
- ... with ureg.context('two'):
- ... pass
- """
- # Enable the contexts.
- self.enable_contexts(*names, **kwargs)
-
- try:
- # After adding the context and rebuilding the graph, the registry
- # is ready to use.
- yield self
- finally:
- # Upon leaving the with statement,
- # the added contexts are removed from the active one.
- self.disable_contexts(len(names))
-
- def with_context(self, name, **kwargs) -> Callable[[F], F]:
- """Decorator to wrap a function call in a Pint context.
-
- Use it to ensure that a certain context is active when
- calling a function::
-
- Parameters
- ----------
- name :
- name of the context.
- **kwargs :
- keyword arguments for the context
-
-
- Returns
- -------
- callable
- the wrapped function.
-
- Example
- -------
- >>> @ureg.with_context('sp')
- ... def my_cool_fun(wavelength):
- ... print('This wavelength is equivalent to: %s', wavelength.to('terahertz'))
- """
-
- def decorator(func):
- assigned = tuple(
- attr for attr in functools.WRAPPER_ASSIGNMENTS if hasattr(func, attr)
- )
- updated = tuple(
- attr for attr in functools.WRAPPER_UPDATES if hasattr(func, attr)
- )
-
- @functools.wraps(func, assigned=assigned, updated=updated)
- def wrapper(*values, **wrapper_kwargs):
- with self.context(name, **kwargs):
- return func(*values, **wrapper_kwargs)
-
- return wrapper
-
- return decorator
-
- def _convert(self, value, src, dst, inplace=False):
- """Convert value from some source to destination units.
-
- In addition to what is done by the BaseRegistry,
- converts between units with different dimensions by following
- transformation rules defined in the context.
-
- Parameters
- ----------
- value :
- value
- src : UnitsContainer
- source units.
- dst : UnitsContainer
- destination units.
- inplace :
- (Default value = False)
-
- Returns
- -------
- callable
- converted value
- """
- # If there is an active context, we look for a path connecting source and
- # destination dimensionality. If it exists, we transform the source value
- # by applying sequentially each transformation of the path.
- if self._active_ctx:
-
- src_dim = self._get_dimensionality(src)
- dst_dim = self._get_dimensionality(dst)
-
- path = find_shortest_path(self._active_ctx.graph, src_dim, dst_dim)
- if path:
- src = self.Quantity(value, src)
- for a, b in zip(path[:-1], path[1:]):
- src = self._active_ctx.transform(a, b, self, src)
-
- value, src = src._magnitude, src._units
-
- return super()._convert(value, src, dst, inplace)
-
- def _get_compatible_units(self, input_units, group_or_system):
- src_dim = self._get_dimensionality(input_units)
-
- ret = super()._get_compatible_units(input_units, group_or_system)
-
- if self._active_ctx:
- ret = ret.copy() # Do not alter self._cache
- nodes = find_connected_nodes(self._active_ctx.graph, src_dim)
- if nodes:
- for node in nodes:
- ret |= self._cache.dimensional_equivalents[node]
-
- return ret
-
-
-from .facets.group import GroupRegistry
-
-
-class SystemRegistry(GroupRegistry):
- """Handle of Systems.
-
- Conversion between units with different dimensions according
- to previously established relations (contexts).
- (e.g. in the spectroscopy, conversion between frequency and energy is possible)
-
- Capabilities:
-
- - Register systems.
- - List systems
- - Get or get the default system.
- - Parse @group directive.
- """
-
- def __init__(self, system=None, **kwargs):
- super().__init__(**kwargs)
-
- #: Map system name to system.
- #: :type: dict[ str | System]
- self._systems: Dict[str, System] = {}
-
- #: Maps dimensionality (UnitsContainer) to Dimensionality (UnitsContainer)
- self._base_units_cache = dict()
-
- self._default_system = system
-
- def _init_dynamic_classes(self) -> None:
- super()._init_dynamic_classes()
- self.System = systems.build_system_class(self)
-
- def _after_init(self) -> None:
- """Invoked at the end of ``__init__``.
-
- - Create default group and add all orphan units to it
- - Set default system
- """
- super()._after_init()
-
- #: System name to be used by default.
- self._default_system = self._default_system or self._defaults.get(
- "system", None
- )
-
- def _register_directives(self) -> None:
- super()._register_directives()
- self._register_directive(
- "@system",
- lambda gd: self.System.from_definition(gd, self.get_root_units),
- SystemDefinition,
- )
-
- @property
- def sys(self):
- return systems.Lister(self._systems)
-
- @property
- def default_system(self) -> System:
- return self._default_system
-
- @default_system.setter
- def default_system(self, name):
- if name:
- if name not in self._systems:
- raise ValueError("Unknown system %s" % name)
-
- self._base_units_cache = {}
-
- self._default_system = name
-
- def get_system(self, name: str, create_if_needed: bool = True) -> System:
- """Return a Group.
-
- Parameters
- ----------
- name : str
- Name of the group to be
- create_if_needed : bool
- If True, create a group if not found. If False, raise an Exception.
- (Default value = True)
-
- Returns
- -------
- type
- System
-
- """
- if name in self._systems:
- return self._systems[name]
-
- if not create_if_needed:
- raise ValueError("Unknown system %s" % name)
-
- return self.System(name)
-
- def get_base_units(
- self,
- input_units: Union[UnitLike, Quantity],
- check_nonmult: bool = True,
- system: Union[str, System, None] = None,
- ) -> Tuple[Number, Unit]:
- """Convert unit or dict of units to the base units.
-
- If any unit is non multiplicative and check_converter is True,
- then None is returned as the multiplicative factor.
-
- Unlike BaseRegistry, in this registry root_units might be different
- from base_units
-
- Parameters
- ----------
- input_units : UnitsContainer or str
- units
- check_nonmult : bool
- if True, None will be returned as the
- multiplicative factor if a non-multiplicative
- units is found in the final Units. (Default value = True)
- system :
- (Default value = None)
-
- Returns
- -------
- type
- multiplicative factor, base units
-
- """
-
- input_units = to_units_container(input_units)
-
- f, units = self._get_base_units(input_units, check_nonmult, system)
-
- return f, self.Unit(units)
-
- def _get_base_units(
- self,
- input_units: UnitsContainerT,
- check_nonmult: bool = True,
- system: Union[str, System, None] = None,
- ):
-
- if system is None:
- system = self._default_system
-
- # The cache is only done for check_nonmult=True and the current system.
- if (
- check_nonmult
- and system == self._default_system
- and input_units in self._base_units_cache
- ):
- return self._base_units_cache[input_units]
-
- factor, units = self.get_root_units(input_units, check_nonmult)
-
- if not system:
- return factor, units
-
- # This will not be necessary after integration with the registry
- # as it has a UnitsContainer intermediate
- units = to_units_container(units, self)
-
- destination_units = self.UnitsContainer()
-
- bu = self.get_system(system, False).base_units
-
- for unit, value in units.items():
- if unit in bu:
- new_unit = bu[unit]
- new_unit = to_units_container(new_unit, self)
- destination_units *= new_unit ** value
- else:
- destination_units *= self.UnitsContainer({unit: value})
-
- base_factor = self.convert(factor, units, destination_units)
-
- if check_nonmult:
- self._base_units_cache[input_units] = base_factor, destination_units
-
- return base_factor, destination_units
-
- def _get_compatible_units(self, input_units, group_or_system) -> FrozenSet[Unit]:
-
- if group_or_system is None:
- group_or_system = self._default_system
-
- if group_or_system and group_or_system in self._systems:
- members = self._systems[group_or_system].members
- # group_or_system has been handled by System
- return frozenset(members & super()._get_compatible_units(input_units))
-
- try:
- return super()._get_compatible_units(input_units, group_or_system)
- except ValueError as ex:
- # It might be also a system
- if "Unknown Group" in str(ex):
- raise ValueError(
- "Unknown Group o System with name '%s'" % group_or_system
- ) from ex
- raise ex
-
-
class UnitRegistry(SystemRegistry, ContextRegistry, NonMultiplicativeRegistry):
"""The unit registry stores the definitions and relationships between units.