summaryrefslogtreecommitdiff
path: root/kombu/compression.py
blob: b52426724a2ac1e8becbf22e97309aa3fbd597b5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Compression utilities."""

from kombu.utils.encoding import ensure_bytes

import zlib

try:
    import lzma
except ImportError:  # pragma: no cover
    # TODO: Drop fallback to backports once we drop Python 2.7 support
    try:
        from backports import lzma
    except ImportError:  # pragma: no cover
        lzma = None

_aliases = {}
_encoders = {}
_decoders = {}

__all__ = ('register', 'encoders', 'get_encoder',
           'get_decoder', 'compress', 'decompress')


def register(encoder, decoder, content_type, aliases=None):
    """Register new compression method.

    Arguments:
        encoder (Callable): Function used to compress text.
        decoder (Callable): Function used to decompress previously
            compressed text.
        content_type (str): The mime type this compression method
            identifies as.
        aliases (Sequence[str]): A list of names to associate with
            this compression method.
    """
    _encoders[content_type] = encoder
    _decoders[content_type] = decoder
    if aliases:
        _aliases.update((alias, content_type) for alias in aliases)


def encoders():
    """Return a list of available compression methods."""
    return list(_encoders)


def get_encoder(t):
    """Get encoder by alias name."""
    t = _aliases.get(t, t)
    return _encoders[t], t


def get_decoder(t):
    """Get decoder by alias name."""
    return _decoders[_aliases.get(t, t)]


def compress(body, content_type):
    """Compress text.

    Arguments:
        body (AnyStr): The text to compress.
        content_type (str): mime-type of compression method to use.
    """
    encoder, content_type = get_encoder(content_type)
    return encoder(ensure_bytes(body)), content_type


def decompress(body, content_type):
    """Decompress compressed text.

    Arguments:
        body (AnyStr): Previously compressed text to uncompress.
        content_type (str): mime-type of compression method used.
    """
    return get_decoder(content_type)(body)


register(zlib.compress,
         zlib.decompress,
         'application/x-gzip', aliases=['gzip', 'zlib'])

try:
    import bz2
except ImportError:
    pass  # No bz2 support
else:
    register(bz2.compress,
             bz2.decompress,
             'application/x-bz2', aliases=['bzip2', 'bzip'])

try:
    import brotli
except ImportError:  # pragma: no cover
    pass
else:
    register(brotli.compress,
             brotli.decompress,
             'application/x-brotli', aliases=['brotli'])

# TODO: Drop condition once we drop Python 2.7 support
if lzma:  # pragma: no cover
    register(lzma.compress,
             lzma.decompress,
             'application/x-lzma', aliases=['lzma', 'xz'])

try:
    import zstandard as zstd
except ImportError:  # pragma: no cover
    pass
else:
    def zstd_compress(body):
        c = zstd.ZstdCompressor()
        return c.compress(body)

    def zstd_decompress(body):
        d = zstd.ZstdDecompressor()
        return d.decompress(body)

    register(zstd_compress,
             zstd_decompress,
             'application/zstd', aliases=['zstd', 'zstandard'])