summaryrefslogtreecommitdiff
path: root/kombu/utils/url.py
blob: f5f477019d0bdd3d6f7784542df827aa00263e56 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""URL Utilities."""
# flake8: noqa


from __future__ import annotations

from collections.abc import Mapping
from functools import partial
from typing import NamedTuple
from urllib.parse import parse_qsl, quote, unquote, urlparse

try:
    import ssl
    ssl_available = True
except ImportError:  # pragma: no cover
    ssl_available = False

from ..log import get_logger

safequote = partial(quote, safe='')
logger = get_logger(__name__)

class urlparts(NamedTuple):
    """Named tuple representing parts of the URL."""

    scheme: str
    hostname: str
    port: int
    username: str
    password: str
    path: str
    query: Mapping


def parse_url(url):
    # type: (str) -> Dict
    """Parse URL into mapping of components."""
    scheme, host, port, user, password, path, query = _parse_url(url)
    if query:
        keys = [key for key in query.keys() if key.startswith('ssl_')]
        for key in keys:
            if key == 'ssl_cert_reqs':
                query[key] = parse_ssl_cert_reqs(query[key])
                if query[key] is None:
                    logger.warning('Defaulting to insecure SSL behaviour.')

            if 'ssl' not in query:
                query['ssl'] = {}

            query['ssl'][key] = query[key]
            del query[key]

    return dict(transport=scheme, hostname=host,
                port=port, userid=user,
                password=password, virtual_host=path, **query)


def url_to_parts(url):
    # type: (str) -> urlparts
    """Parse URL into :class:`urlparts` tuple of components."""
    scheme = urlparse(url).scheme
    schemeless = url[len(scheme) + 3:]
    # parse with HTTP URL semantics
    parts = urlparse('http://' + schemeless)
    path = parts.path or ''
    path = path[1:] if path and path[0] == '/' else path
    return urlparts(
        scheme,
        unquote(parts.hostname or '') or None,
        parts.port,
        unquote(parts.username or '') or None,
        unquote(parts.password or '') or None,
        unquote(path or '') or None,
        dict(parse_qsl(parts.query)),
    )


_parse_url = url_to_parts


def as_url(scheme, host=None, port=None, user=None, password=None,
           path=None, query=None, sanitize=False, mask='**'):
    # type: (str, str, int, str, str, str, str, bool, str) -> str
    """Generate URL from component parts."""
    parts = [f'{scheme}://']
    if user or password:
        if user:
            parts.append(safequote(user))
        if password:
            if sanitize:
                parts.extend([':', mask] if mask else [':'])
            else:
                parts.extend([':', safequote(password)])
        parts.append('@')
    parts.append(safequote(host) if host else '')
    if port:
        parts.extend([':', port])
    parts.extend(['/', path])
    return ''.join(str(part) for part in parts if part)


def sanitize_url(url, mask='**'):
    # type: (str, str) -> str
    """Return copy of URL with password removed."""
    return as_url(*_parse_url(url), sanitize=True, mask=mask)


def maybe_sanitize_url(url, mask='**'):
    # type: (Any, str) -> Any
    """Sanitize url, or do nothing if url undefined."""
    if isinstance(url, str) and '://' in url:
        return sanitize_url(url, mask)
    return url


def parse_ssl_cert_reqs(query_value):
    # type: (str) -> Any
    """Given the query parameter for ssl_cert_reqs, return the SSL constant or None."""
    if ssl_available:
        query_value_to_constant = {
            'CERT_REQUIRED': ssl.CERT_REQUIRED,
            'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
            'CERT_NONE': ssl.CERT_NONE,
            'required': ssl.CERT_REQUIRED,
            'optional': ssl.CERT_OPTIONAL,
            'none': ssl.CERT_NONE,
        }
        return query_value_to_constant[query_value]
    else:
        return None