summaryrefslogtreecommitdiff
path: root/kombu/utils
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/utils')
-rw-r--r--kombu/utils/__init__.py2
-rw-r--r--kombu/utils/amq_manager.py3
-rw-r--r--kombu/utils/collections.py3
-rw-r--r--kombu/utils/compat.py13
-rw-r--r--kombu/utils/debug.py2
-rw-r--r--kombu/utils/div.py2
-rw-r--r--kombu/utils/encoding.py2
-rw-r--r--kombu/utils/eventio.py2
-rw-r--r--kombu/utils/functional.py2
-rw-r--r--kombu/utils/imports.py2
-rw-r--r--kombu/utils/json.py159
-rw-r--r--kombu/utils/limits.py2
-rw-r--r--kombu/utils/objects.py2
-rw-r--r--kombu/utils/scheduling.py2
-rw-r--r--kombu/utils/text.py26
-rw-r--r--kombu/utils/time.py6
-rw-r--r--kombu/utils/url.py2
-rw-r--r--kombu/utils/uuid.py6
18 files changed, 159 insertions, 79 deletions
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 304e2dfa..94bb3cdf 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -1,5 +1,7 @@
"""DEPRECATED - Import from modules below."""
+from __future__ import annotations
+
from .collections import EqualityDict
from .compat import fileno, maybe_fileno, nested, register_after_fork
from .div import emergency_dump_state
diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py
index 7491bb25..f3e429fd 100644
--- a/kombu/utils/amq_manager.py
+++ b/kombu/utils/amq_manager.py
@@ -1,6 +1,9 @@
"""AMQP Management API utilities."""
+from __future__ import annotations
+
+
def get_manager(client, hostname=None, port=None, userid=None,
password=None):
"""Get pyrabbit manager."""
diff --git a/kombu/utils/collections.py b/kombu/utils/collections.py
index 77781047..1a0a6d0d 100644
--- a/kombu/utils/collections.py
+++ b/kombu/utils/collections.py
@@ -1,6 +1,9 @@
"""Custom maps, sequences, etc."""
+from __future__ import annotations
+
+
class HashedSeq(list):
"""Hashed Sequence.
diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py
index ffc224c1..e1b22f66 100644
--- a/kombu/utils/compat.py
+++ b/kombu/utils/compat.py
@@ -1,5 +1,7 @@
"""Python Compatibility Utilities."""
+from __future__ import annotations
+
import numbers
import sys
from contextlib import contextmanager
@@ -77,9 +79,18 @@ def detect_environment():
def entrypoints(namespace):
"""Return setuptools entrypoints for namespace."""
+ if sys.version_info >= (3,10):
+ entry_points = importlib_metadata.entry_points(group=namespace)
+ else:
+ entry_points = importlib_metadata.entry_points()
+ try:
+ entry_points = entry_points.get(namespace, [])
+ except AttributeError:
+ entry_points = entry_points.select(group=namespace)
+
return (
(ep, ep.load())
- for ep in importlib_metadata.entry_points().get(namespace, [])
+ for ep in entry_points
)
diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py
index acc2d60b..bd20948f 100644
--- a/kombu/utils/debug.py
+++ b/kombu/utils/debug.py
@@ -1,5 +1,7 @@
"""Debugging support."""
+from __future__ import annotations
+
import logging
from vine.utils import wraps
diff --git a/kombu/utils/div.py b/kombu/utils/div.py
index 45be7f94..439b6639 100644
--- a/kombu/utils/div.py
+++ b/kombu/utils/div.py
@@ -1,5 +1,7 @@
"""Div. Utilities."""
+from __future__ import annotations
+
import sys
from .encoding import default_encode
diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py
index 5f58f0fa..42bf2ce9 100644
--- a/kombu/utils/encoding.py
+++ b/kombu/utils/encoding.py
@@ -5,6 +5,8 @@ applications without crashing from the infamous
:exc:`UnicodeDecodeError` exception.
"""
+from __future__ import annotations
+
import sys
import traceback
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 48260a48..f8d89d45 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -1,5 +1,7 @@
"""Selector Utilities."""
+from __future__ import annotations
+
import errno
import math
import select as __select__
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 366a0b99..6beb17d7 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -1,5 +1,7 @@
"""Functional Utilities."""
+from __future__ import annotations
+
import inspect
import random
import threading
diff --git a/kombu/utils/imports.py b/kombu/utils/imports.py
index fd4482a8..8752fa1a 100644
--- a/kombu/utils/imports.py
+++ b/kombu/utils/imports.py
@@ -1,5 +1,7 @@
"""Import related utilities."""
+from __future__ import annotations
+
import importlib
import sys
diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index cedaa793..ec6269e2 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -1,75 +1,75 @@
"""JSON Serialization Utilities."""
-import datetime
-import decimal
-import json as stdjson
+from __future__ import annotations
+
+import base64
+import json
import uuid
+from datetime import date, datetime, time
+from decimal import Decimal
+from typing import Any, Callable, TypeVar
-try:
- from django.utils.functional import Promise as DjangoPromise
-except ImportError: # pragma: no cover
- class DjangoPromise:
- """Dummy object."""
+textual_types = ()
try:
- import json
- _json_extra_kwargs = {}
-
- class _DecodeError(Exception):
- pass
-except ImportError: # pragma: no cover
- import simplejson as json
- from simplejson.decoder import JSONDecodeError as _DecodeError
- _json_extra_kwargs = {
- 'use_decimal': False,
- 'namedtuple_as_object': False,
- }
-
+ from django.utils.functional import Promise
-_encoder_cls = type(json._default_encoder)
-_default_encoder = None # ... set to JSONEncoder below.
+ textual_types += (Promise,)
+except ImportError:
+ pass
-class JSONEncoder(_encoder_cls):
+class JSONEncoder(json.JSONEncoder):
"""Kombu custom json encoder."""
- def default(self, o,
- dates=(datetime.datetime, datetime.date),
- times=(datetime.time,),
- textual=(decimal.Decimal, uuid.UUID, DjangoPromise),
- isinstance=isinstance,
- datetime=datetime.datetime,
- text_t=str):
- reducer = getattr(o, '__json__', None)
+ def default(self, o):
+ reducer = getattr(o, "__json__", None)
if reducer is not None:
return reducer()
- else:
- if isinstance(o, dates):
- if not isinstance(o, datetime):
- o = datetime(o.year, o.month, o.day, 0, 0, 0, 0)
- r = o.isoformat()
- if r.endswith("+00:00"):
- r = r[:-6] + "Z"
- return r
- elif isinstance(o, times):
- return o.isoformat()
- elif isinstance(o, textual):
- return text_t(o)
- return super().default(o)
+ if isinstance(o, textual_types):
+ return str(o)
+
+ for t, (marker, encoder) in _encoders.items():
+ if isinstance(o, t):
+ return _as(marker, encoder(o))
+
+ # Bytes is slightly trickier, so we cannot put them directly
+ # into _encoders, because we use two formats: bytes, and base64.
+ if isinstance(o, bytes):
+ try:
+ return _as("bytes", o.decode("utf-8"))
+ except UnicodeDecodeError:
+ return _as("base64", base64.b64encode(o).decode("utf-8"))
-_default_encoder = JSONEncoder
+ return super().default(o)
-def dumps(s, _dumps=json.dumps, cls=None, default_kwargs=None, **kwargs):
+def _as(t: str, v: Any):
+ return {"__type__": t, "__value__": v}
+
+
+def dumps(
+ s, _dumps=json.dumps, cls=JSONEncoder, default_kwargs=None, **kwargs
+):
"""Serialize object to json string."""
- if not default_kwargs:
- default_kwargs = _json_extra_kwargs
- return _dumps(s, cls=cls or _default_encoder,
- **dict(default_kwargs, **kwargs))
+ default_kwargs = default_kwargs or {}
+ return _dumps(s, cls=cls, **dict(default_kwargs, **kwargs))
+
+
+def object_hook(o: dict):
+ """Hook function to perform custom deserialization."""
+ if o.keys() == {"__type__", "__value__"}:
+ decoder = _decoders.get(o["__type__"])
+ if decoder:
+ return decoder(o["__value__"])
+ else:
+ raise ValueError("Unsupported type", type, o)
+ else:
+ return o
-def loads(s, _loads=json.loads, decode_bytes=True):
+def loads(s, _loads=json.loads, decode_bytes=True, object_hook=object_hook):
"""Deserialize json from string."""
# None of the json implementations supports decoding from
# a buffer/memoryview, or even reading from a stream
@@ -78,14 +78,51 @@ def loads(s, _loads=json.loads, decode_bytes=True):
# over. Note that pickle does support buffer/memoryview
# </rant>
if isinstance(s, memoryview):
- s = s.tobytes().decode('utf-8')
+ s = s.tobytes().decode("utf-8")
elif isinstance(s, bytearray):
- s = s.decode('utf-8')
+ s = s.decode("utf-8")
elif decode_bytes and isinstance(s, bytes):
- s = s.decode('utf-8')
-
- try:
- return _loads(s)
- except _DecodeError:
- # catch "Unpaired high surrogate" error
- return stdjson.loads(s)
+ s = s.decode("utf-8")
+
+ return _loads(s, object_hook=object_hook)
+
+
+DecoderT = EncoderT = Callable[[Any], Any]
+T = TypeVar("T")
+EncodedT = TypeVar("EncodedT")
+
+
+def register_type(
+ t: type[T],
+ marker: str,
+ encoder: Callable[[T], EncodedT],
+ decoder: Callable[[EncodedT], T],
+):
+ """Add support for serializing/deserializing native python type."""
+ _encoders[t] = (marker, encoder)
+ _decoders[marker] = decoder
+
+
+_encoders: dict[type, tuple[str, EncoderT]] = {}
+_decoders: dict[str, DecoderT] = {
+ "bytes": lambda o: o.encode("utf-8"),
+ "base64": lambda o: base64.b64decode(o.encode("utf-8")),
+}
+
+# NOTE: datetime should be registered before date,
+# because datetime is also instance of date.
+register_type(datetime, "datetime", datetime.isoformat, datetime.fromisoformat)
+register_type(
+ date,
+ "date",
+ lambda o: o.isoformat(),
+ lambda o: datetime.fromisoformat(o).date(),
+)
+register_type(time, "time", lambda o: o.isoformat(), time.fromisoformat)
+register_type(Decimal, "decimal", str, Decimal)
+register_type(
+ uuid.UUID,
+ "uuid",
+ lambda o: {"hex": o.hex, "version": o.version},
+ lambda o: uuid.UUID(**o),
+)
diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py
index d82884f5..36d11f1f 100644
--- a/kombu/utils/limits.py
+++ b/kombu/utils/limits.py
@@ -1,5 +1,7 @@
"""Token bucket implementation for rate limiting."""
+from __future__ import annotations
+
from collections import deque
from time import monotonic
diff --git a/kombu/utils/objects.py b/kombu/utils/objects.py
index 7fef4a2f..eb4dfc2a 100644
--- a/kombu/utils/objects.py
+++ b/kombu/utils/objects.py
@@ -1,5 +1,7 @@
"""Object Utilities."""
+from __future__ import annotations
+
__all__ = ('cached_property',)
try:
diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py
index 1875fce4..94286be8 100644
--- a/kombu/utils/scheduling.py
+++ b/kombu/utils/scheduling.py
@@ -1,5 +1,7 @@
"""Scheduling Utilities."""
+from __future__ import annotations
+
from itertools import count
from .imports import symbol_by_name
diff --git a/kombu/utils/text.py b/kombu/utils/text.py
index 1d5fb9de..fea53347 100644
--- a/kombu/utils/text.py
+++ b/kombu/utils/text.py
@@ -2,7 +2,10 @@
# flake8: noqa
+from __future__ import annotations
+
from difflib import SequenceMatcher
+from typing import Iterable, Iterator
from kombu import version_info_t
@@ -16,8 +19,7 @@ def escape_regex(p, white=''):
for c in p)
-def fmatch_iter(needle, haystack, min_ratio=0.6):
- # type: (str, Sequence[str], float) -> Iterator[Tuple[float, str]]
+def fmatch_iter(needle: str, haystack: Iterable[str], min_ratio: float = 0.6) -> Iterator[tuple[float, str]]:
"""Fuzzy match: iteratively.
Yields:
@@ -29,19 +31,17 @@ def fmatch_iter(needle, haystack, min_ratio=0.6):
yield ratio, key
-def fmatch_best(needle, haystack, min_ratio=0.6):
- # type: (str, Sequence[str], float) -> str
+def fmatch_best(needle: str, haystack: Iterable[str], min_ratio: float = 0.6) -> str | None:
"""Fuzzy match - Find best match (scalar)."""
try:
return sorted(
fmatch_iter(needle, haystack, min_ratio), reverse=True,
)[0][1]
except IndexError:
- pass
+ return None
-def version_string_as_tuple(s):
- # type: (str) -> version_info_t
+def version_string_as_tuple(s: str) -> version_info_t:
"""Convert version string to version info tuple."""
v = _unpack_version(*s.split('.'))
# X.Y.3a1 -> (X, Y, 3, 'a1')
@@ -53,13 +53,17 @@ def version_string_as_tuple(s):
return v
-def _unpack_version(major, minor=0, micro=0, releaselevel='', serial=''):
- # type: (int, int, int, str, str) -> version_info_t
+def _unpack_version(
+ major: str,
+ minor: str | int = 0,
+ micro: str | int = 0,
+ releaselevel: str = '',
+ serial: str = ''
+) -> version_info_t:
return version_info_t(int(major), int(minor), micro, releaselevel, serial)
-def _splitmicro(micro, releaselevel='', serial=''):
- # type: (int, str, str) -> Tuple[int, str, str]
+def _splitmicro(micro: str, releaselevel: str = '', serial: str = '') -> tuple[int, str, str]:
for index, char in enumerate(micro):
if not char.isdigit():
break
diff --git a/kombu/utils/time.py b/kombu/utils/time.py
index 863f4017..8228d2be 100644
--- a/kombu/utils/time.py
+++ b/kombu/utils/time.py
@@ -1,11 +1,9 @@
"""Time Utilities."""
-# flake8: noqa
-
+from __future__ import annotations
__all__ = ('maybe_s_to_ms',)
-def maybe_s_to_ms(v):
- # type: (Optional[Union[int, float]]) -> int
+def maybe_s_to_ms(v: int | float | None) -> int | None:
"""Convert seconds to milliseconds, but return None for None."""
return int(float(v) * 1000.0) if v is not None else v
diff --git a/kombu/utils/url.py b/kombu/utils/url.py
index de3a9139..f5f47701 100644
--- a/kombu/utils/url.py
+++ b/kombu/utils/url.py
@@ -2,6 +2,8 @@
# flake8: noqa
+from __future__ import annotations
+
from collections.abc import Mapping
from functools import partial
from typing import NamedTuple
diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py
index 010b3440..9f77dad9 100644
--- a/kombu/utils/uuid.py
+++ b/kombu/utils/uuid.py
@@ -1,9 +1,11 @@
"""UUID utilities."""
+from __future__ import annotations
-from uuid import uuid4
+from typing import Callable
+from uuid import UUID, uuid4
-def uuid(_uuid=uuid4):
+def uuid(_uuid: Callable[[], UUID] = uuid4) -> str:
"""Generate unique id in UUID4 format.
See Also: