summaryrefslogtreecommitdiff
path: root/src/tox/report.py
blob: f8a57a789825f5cff012cacdbe8f2b6e29a4e3b6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Handle reporting from within tox"""
from __future__ import annotations

import logging
import os
import sys
from contextlib import contextmanager
from io import BytesIO, TextIOWrapper
from threading import Thread, current_thread, enumerate, local
from typing import IO, Iterator, Tuple

from colorama import Fore, Style, deinit, init

LEVELS = {
    0: logging.CRITICAL,
    1: logging.ERROR,
    2: logging.WARNING,
    3: logging.INFO,
    4: logging.DEBUG,
    5: logging.NOTSET,
}

MAX_LEVEL = max(LEVELS.keys())
LOGGER = logging.getLogger()
OutErr = Tuple[TextIOWrapper, TextIOWrapper]


class _LogThreadLocal(local):
    """A thread local variable that inherits values from its parent"""

    _ident_to_data: dict[int | None, str] = {}

    def __init__(self, out_err: OutErr) -> None:
        self.name = self._ident_to_data.get(getattr(current_thread(), "parent_ident", None), "ROOT")
        self.out_err = out_err

    @staticmethod
    @contextmanager
    def patch_thread() -> Iterator[None]:
        def new_start(self: Thread) -> None:  # need to patch this
            self.parent_ident = current_thread().ident  # type: ignore[attr-defined]
            old_start(self)

        old_start, Thread.start = Thread.start, new_start  # type: ignore[assignment]
        try:
            yield
        finally:
            Thread.start = old_start  # type: ignore[assignment]

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        self._name = value

        for ident in self._ident_to_data.keys() - {t.ident for t in enumerate()}:
            self._ident_to_data.pop(ident)
        self._ident_to_data[current_thread().ident] = value

    @contextmanager
    def with_name(self, name: str) -> Iterator[None]:
        previous, self.name = self.name, name
        try:
            yield
        finally:
            self.name = previous

    @contextmanager
    def suspend_out_err(self, yes: bool, out_err: OutErr | None = None) -> Iterator[OutErr]:
        previous_out, previous_err = self.out_err
        try:
            if yes:
                if out_err is None:  # pragma: no branch
                    out = self._make(f"out-{self.name}", previous_out)
                    err = self._make(f"err-{self.name}", previous_err)
                else:
                    out, err = out_err  # pragma: no cover
                self.out_err = out, err
            yield self.out_err
        finally:
            if yes:
                self.out_err = previous_out, previous_err

    @staticmethod
    def _make(prefix: str, based_of: TextIOWrapper) -> TextIOWrapper:
        return TextIOWrapper(NamedBytesIO(f"{prefix}-{based_of.name}"))


class NamedBytesIO(BytesIO):
    def __init__(self, name: str) -> None:
        super().__init__()
        self.name: str = name


class ToxHandler(logging.StreamHandler):  # type: ignore[type-arg] # is generic but at runtime doesn't take a type arg
    # """Controls tox output."""

    def __init__(self, level: int, is_colored: bool, out_err: OutErr) -> None:
        self._local = _LogThreadLocal(out_err)
        super().__init__(stream=self.stdout)
        if is_colored:
            deinit()
            init()
        self._is_colored = is_colored
        self._setup_level(is_colored, level)

    def _setup_level(self, is_colored: bool, level: int) -> None:
        self.setLevel(level)
        self._error_formatter = self._get_formatter(logging.ERROR, level, is_colored)
        self._warning_formatter = self._get_formatter(logging.WARNING, level, is_colored)
        self._remaining_formatter = self._get_formatter(logging.INFO, level, is_colored)

    @contextmanager
    def with_context(self, name: str) -> Iterator[None]:
        """
        Set a new tox environment context

        :param name: the name of the tox environment
        """
        with self._local.with_name(name):
            yield

    @property
    def name(self) -> str:  # type: ignore[override]
        """:return: the current tox environment name"""
        return self._local.name  # pragma: no cover

    @property
    def stdout(self) -> TextIOWrapper:
        """:return: the current standard output"""
        return self._local.out_err[0]

    @property
    def stderr(self) -> TextIOWrapper:
        """:return: the current standard error"""
        return self._local.out_err[1]

    @property  # type: ignore[override]
    def stream(self) -> IO[str]:
        """:return: the current stream to write to (alias for the current standard output)"""
        return self.stdout

    @stream.setter
    def stream(self, value: IO[str]) -> None:  # noqa: U100
        """ignore anyone changing this"""

    @contextmanager
    def suspend_out_err(self, yes: bool, out_err: OutErr | None = None) -> Iterator[OutErr]:
        with self._local.suspend_out_err(yes, out_err) as out_err_res:
            yield out_err_res

    def write_out_err(self, out_err: tuple[bytes, bytes]) -> None:
        # read/write through the buffer as we collect bytes to print bytes (no transcoding needed)
        self.stdout.buffer.write(out_err[0])
        self.stderr.buffer.write(out_err[1])

    @staticmethod
    def _get_formatter(level: int, enabled_level: int, is_colored: bool) -> logging.Formatter:
        color: int | str = ""
        if is_colored:
            if level >= logging.ERROR:
                color = Fore.RED
            elif level >= logging.WARNING:
                color = Fore.CYAN
            else:
                color = Fore.WHITE

        def _c(val: int) -> str:
            return str(val) if color else ""

        fmt = f"{color} %(message)s{_c(Style.RESET_ALL)}"
        if enabled_level <= logging.DEBUG:
            fmt = (
                f"{_c(Fore.GREEN)} %(relativeCreated)d %(levelname).1s{_c(Style.RESET_ALL)}{fmt}{_c(Style.DIM)}"
                f" [%(pathname)s:%(lineno)d]{_c(Style.RESET_ALL)}"
            )
        fmt = f"{_c(Style.BRIGHT)}{_c(Fore.MAGENTA)}%(env_name)s:{_c(Style.RESET_ALL)}" + fmt
        formatter = logging.Formatter(fmt)
        return formatter

    def format(self, record: logging.LogRecord) -> str:
        # shorten the pathname to start from within the site-packages folder
        record.env_name = "root" if self._local.name is None else self._local.name
        basename = os.path.dirname(record.pathname)
        len_sys_path_match = max((len(p) for p in sys.path if basename.startswith(p)), default=-1)
        record.pathname = record.pathname[len_sys_path_match + 1 :]

        if record.levelno >= logging.ERROR:
            return self._error_formatter.format(record)
        if record.levelno >= logging.WARNING:
            if self._is_colored and record.msg == "%s%s> %s" and record.args:
                record.msg = f"%s{Style.NORMAL}%s{Style.DIM}>{Style.RESET_ALL} %s"
            return self._warning_formatter.format(record)
        return self._remaining_formatter.format(record)

    @staticmethod
    @contextmanager
    def patch_thread() -> Iterator[None]:
        with _LogThreadLocal.patch_thread():
            yield

    def update_verbosity(self, verbosity: int) -> None:
        level = _get_level(verbosity)
        LOGGER.setLevel(level)
        self._setup_level(self._is_colored, level)


def setup_report(verbosity: int, is_colored: bool) -> ToxHandler:
    _clean_handlers(LOGGER)
    level = _get_level(verbosity)
    LOGGER.setLevel(level)
    for name in ("distlib.util", "filelock"):
        logger = logging.getLogger(name)
        logger.disabled = True
    out_err: OutErr = (sys.stdout, sys.stderr)  # type: ignore[assignment]
    handler = ToxHandler(level, is_colored, out_err)
    LOGGER.addHandler(handler)

    logging.debug("setup logging to %s on pid %s", logging.getLevelName(level), os.getpid())
    return handler


def _get_level(verbosity: int) -> int:
    if verbosity > MAX_LEVEL:
        verbosity = MAX_LEVEL
    level = LEVELS[verbosity]
    return level


def _clean_handlers(log: logging.Logger) -> None:
    for log_handler in list(log.handlers):  # remove handlers of libraries
        log.removeHandler(log_handler)


class HandledError(RuntimeError):
    """Error that has been handled so no need for stack trace"""