summaryrefslogtreecommitdiff
path: root/kombu/utils/json.py
blob: ec6269e2be2ee7990228c0bef1eda4df96568644 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""JSON Serialization Utilities."""

from __future__ import annotations

import base64
import json
import uuid
from datetime import date, datetime, time
from decimal import Decimal
from typing import Any, Callable, TypeVar

textual_types = ()

try:
    from django.utils.functional import Promise

    textual_types += (Promise,)
except ImportError:
    pass


class JSONEncoder(json.JSONEncoder):
    """Kombu custom json encoder."""

    def default(self, o):
        reducer = getattr(o, "__json__", None)
        if reducer is not None:
            return reducer()

        if isinstance(o, textual_types):
            return str(o)

        for t, (marker, encoder) in _encoders.items():
            if isinstance(o, t):
                return _as(marker, encoder(o))

        # Bytes is slightly trickier, so we cannot put them directly
        # into _encoders, because we use two formats: bytes, and base64.
        if isinstance(o, bytes):
            try:
                return _as("bytes", o.decode("utf-8"))
            except UnicodeDecodeError:
                return _as("base64", base64.b64encode(o).decode("utf-8"))

        return super().default(o)


def _as(t: str, v: Any):
    return {"__type__": t, "__value__": v}


def dumps(
    s, _dumps=json.dumps, cls=JSONEncoder, default_kwargs=None, **kwargs
):
    """Serialize object to json string."""
    default_kwargs = default_kwargs or {}
    return _dumps(s, cls=cls, **dict(default_kwargs, **kwargs))


def object_hook(o: dict):
    """Hook function to perform custom deserialization."""
    if o.keys() == {"__type__", "__value__"}:
        decoder = _decoders.get(o["__type__"])
        if decoder:
            return decoder(o["__value__"])
        else:
            raise ValueError("Unsupported type", type, o)
    else:
        return o


def loads(s, _loads=json.loads, decode_bytes=True, object_hook=object_hook):
    """Deserialize json from string."""
    # None of the json implementations supports decoding from
    # a buffer/memoryview, or even reading from a stream
    #    (load is just loads(fp.read()))
    # but this is Python, we love copying strings, preferably many times
    # over.  Note that pickle does support buffer/memoryview
    # </rant>
    if isinstance(s, memoryview):
        s = s.tobytes().decode("utf-8")
    elif isinstance(s, bytearray):
        s = s.decode("utf-8")
    elif decode_bytes and isinstance(s, bytes):
        s = s.decode("utf-8")

    return _loads(s, object_hook=object_hook)


DecoderT = EncoderT = Callable[[Any], Any]
T = TypeVar("T")
EncodedT = TypeVar("EncodedT")


def register_type(
    t: type[T],
    marker: str,
    encoder: Callable[[T], EncodedT],
    decoder: Callable[[EncodedT], T],
):
    """Add support for serializing/deserializing native python type."""
    _encoders[t] = (marker, encoder)
    _decoders[marker] = decoder


_encoders: dict[type, tuple[str, EncoderT]] = {}
_decoders: dict[str, DecoderT] = {
    "bytes": lambda o: o.encode("utf-8"),
    "base64": lambda o: base64.b64decode(o.encode("utf-8")),
}

# NOTE: datetime should be registered before date,
# because datetime is also instance of date.
register_type(datetime, "datetime", datetime.isoformat, datetime.fromisoformat)
register_type(
    date,
    "date",
    lambda o: o.isoformat(),
    lambda o: datetime.fromisoformat(o).date(),
)
register_type(time, "time", lambda o: o.isoformat(), time.fromisoformat)
register_type(Decimal, "decimal", str, Decimal)
register_type(
    uuid.UUID,
    "uuid",
    lambda o: {"hex": o.hex, "version": o.version},
    lambda o: uuid.UUID(**o),
)