summaryrefslogtreecommitdiff
path: root/src/setuptools_scm/_run_cmd.py
blob: 5fa5f22e591700f5e5a7e09c075345fe598622a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from __future__ import annotations

import os
import shlex
import subprocess
import textwrap
import warnings
from typing import Callable
from typing import Mapping
from typing import overload
from typing import Sequence
from typing import TypeVar

from . import _log
from . import _types as _t

log = _log.log.getChild("run_cmd")

PARSE_RESULT = TypeVar("PARSE_RESULT")
T = TypeVar("T")


def no_git_env(env: Mapping[str, str]) -> dict[str, str]:
    # adapted from pre-commit
    # Too many bugs dealing with environment variables and GIT:
    # https://github.com/pre-commit/pre-commit/issues/300
    # In git 2.6.3 (maybe others), git exports GIT_WORK_TREE while running
    # pre-commit hooks
    # In git 1.9.1 (maybe others), git exports GIT_DIR and GIT_INDEX_FILE
    # while running pre-commit hooks in submodules.
    # GIT_DIR: Causes git clone to clone wrong thing
    # GIT_INDEX_FILE: Causes 'error invalid object ...' during commit
    for k, v in env.items():
        if k.startswith("GIT_"):
            log.debug("%s: %s", k, v)
    return {
        k: v
        for k, v in env.items()
        if not k.startswith("GIT_")
        or k in ("GIT_EXEC_PATH", "GIT_SSH", "GIT_SSH_COMMAND")
    }


def avoid_pip_isolation(env: Mapping[str, str]) -> dict[str, str]:
    """
    pip build isolation can break Mercurial
    (see https://github.com/pypa/pip/issues/10635)

    pip uses PYTHONNOUSERSITE and a path in PYTHONPATH containing "pip-build-env-".
    """
    new_env = {k: v for k, v in env.items() if k != "PYTHONNOUSERSITE"}
    if "PYTHONPATH" not in new_env:
        return new_env

    new_env["PYTHONPATH"] = os.pathsep.join(
        [
            path
            for path in new_env["PYTHONPATH"].split(os.pathsep)
            if "pip-build-env-" not in path
        ]
    )
    return new_env


def ensure_stripped_str(str_or_bytes: str | bytes) -> str:
    if isinstance(str_or_bytes, str):
        return str_or_bytes.strip()
    else:
        return str_or_bytes.decode("utf-8", "surrogateescape").strip()


def run(
    cmd: _t.CMD_TYPE,
    cwd: _t.PathT,
    *,
    strip: bool = True,
    trace: bool = True,
    timeout: int = 20,
    check: bool = False,
) -> subprocess.CompletedProcess[str]:
    if isinstance(cmd, str):
        cmd = shlex.split(cmd)
    else:
        cmd = [os.fspath(x) for x in cmd]
    cmd_4_trace = " ".join(map(_unsafe_quote_for_display, cmd))
    log.debug("at %s\n    $ %s ", cwd, cmd_4_trace)
    res = subprocess.run(
        cmd,
        capture_output=True,
        cwd=os.fspath(cwd),
        env=dict(
            avoid_pip_isolation(no_git_env(os.environ)),
            # os.environ,
            # try to disable i18n
            LC_ALL="C",
            LANGUAGE="",
            HGPLAIN="1",
        ),
        text=True,
        timeout=timeout,
    )
    if strip:
        if res.stdout:
            res.stdout = ensure_stripped_str(res.stdout)
            res.stderr = ensure_stripped_str(res.stderr)
    if trace:
        if res.stdout:
            log.debug("out:\n%s", textwrap.indent(res.stdout, "    "))
        if res.stderr:
            log.debug("err:\n%s", textwrap.indent(res.stderr, "    "))
        if res.returncode:
            log.debug("ret: %s", res.returncode)
    if check:
        res.check_returncode()
    return res


@overload
def parse_success(
    res: subprocess.CompletedProcess[str],
    *,
    parse: Callable[[str], PARSE_RESULT],
    default: None = None,
    error_msg: str | None = None,
) -> PARSE_RESULT | None:
    ...


@overload
def parse_success(
    res: subprocess.CompletedProcess[str],
    *,
    parse: Callable[[str], PARSE_RESULT],
    default: T,
    error_msg: str | None = None,
) -> PARSE_RESULT | T:
    ...


def parse_success(
    res: subprocess.CompletedProcess[str],
    *,
    parse: Callable[[str], PARSE_RESULT],
    default: T | None = None,
    error_msg: str | None = None,
) -> PARSE_RESULT | T | None:
    if res.returncode:
        if error_msg:
            log.warning("%s %s", error_msg, res)
        return default
    else:
        return parse(res.stdout)


def _unsafe_quote_for_display(item: _t.PathT) -> str:
    # give better results than shlex.join in our cases
    text = os.fspath(item)
    return text if all(c not in text for c in " {[:") else f'"{text}"'


def has_command(name: str, args: Sequence[str] = ["help"], warn: bool = True) -> bool:
    try:
        p = run([name, *args], cwd=".", timeout=5)
    except OSError as e:
        log.warning("command %s missing: %s", name, e)
        res = False
    except subprocess.TimeoutExpired as e:
        log.warning("command %s timed out %s", name, e)
        res = False

    else:
        res = not p.returncode
    if not res and warn:
        warnings.warn("%r was not found" % name, category=RuntimeWarning)
    return res


def require_command(name: str) -> None:
    if not has_command(name, warn=False):
        raise OSError(f"{name!r} was not found")