summaryrefslogtreecommitdiff
path: root/src/tox/session/cmd/run/parallel.py
blob: affe239d9c74bf6717c10e1622a726639e45da75 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
Run tox environments in parallel.
"""
import inspect
import logging
import os
import sys
from argparse import ArgumentTypeError
from collections import OrderedDict, deque
from pathlib import Path
from threading import Event, Semaphore, Thread

import tox
from tox.config.cli.parser import ToxParser
from tox.plugin.impl import impl
from tox.session.common import env_list_flag
from tox.session.state import State
from tox.util.cpu import auto_detect_cpus
from tox.util.spinner import Spinner

from .common import env_run_create_flags

# Module-level logger; used for the interrupt notice and for replaying captured child output.
logger = logging.getLogger(__name__)

# Environment variable set in every child process to the name of the tox
# environment that child is responsible for.
ENV_VAR_KEY = "TOX_PARALLEL_ENV"
# Value of ``--parallel`` that its help text documents as "turn off".
OFF_VALUE = 0
# Default for ``--parallel`` when the flag is not given on the command line.
DEFAULT_PARALLEL = OFF_VALUE
# ``__main__.py`` located next to the imported ``tox`` package source; passed to
# ``sys.executable`` as the script to run when spawning a child tox process.
MAIN_FILE = Path(inspect.getsourcefile(tox)).parent / "__main__.py"


@impl
def tox_add_option(parser: ToxParser):
    """Register the ``run-parallel`` command (alias ``p``) and its command line flags.

    The command is bound to :func:`run_parallel`. On top of the shared
    environment-list and run/create flags it gains ``-p/--parallel`` (the
    concurrency limit) and ``-o/--parallel-live`` (stream child output live).
    """

    # NOTE: the function name is kept as-is on purpose; argparse uses the type
    # callable's ``__name__`` when it reports an invalid value.
    def parse_num_processes(str_value):
        """Convert the raw ``--parallel`` value into a limit.

        ``"all"`` maps to ``None``, ``"auto"`` to the detected CPU count and
        anything else is parsed as an integer that must not be negative.
        """
        if str_value == "all":
            return None
        if str_value == "auto":
            return auto_detect_cpus()
        count = int(str_value)
        if count < 0:
            raise ArgumentTypeError("value must be positive")
        return count

    command = parser.add_command("run-parallel", ["p"], "run environments in parallel", run_parallel)
    env_list_flag(command)
    env_run_create_flags(command)

    parallel_help = (
        "run tox environments in parallel, the argument controls limit: all,"
        " auto - cpu count, some positive number, zero is turn off"
    )
    command.add_argument(
        "-p",
        "--parallel",
        action="store",
        dest="parallel",
        type=parse_num_processes,
        default=DEFAULT_PARALLEL,
        metavar="VAL",
        help=parallel_help,
    )
    command.add_argument(
        "-o",
        "--parallel-live",
        action="store_true",
        dest="parallel_live",
        help="connect to stdout while running environments",
    )


def run_parallel(state: State):
    """Run the tox environments of ``state`` as parallel child tox processes.

    Every environment is executed by re-invoking tox (``sys.executable`` plus
    ``MAIN_FILE`` and the original arguments) from a dedicated daemon thread.
    An environment is only started once all of its ``depends`` entries that are
    part of this run have finished, and at most ``state.options.parallel``
    children run at the same time (``None`` means one slot per environment).

    :param state: the session state; this function reads ``options``, ``args``,
        ``env_list`` and ``tox_envs`` from it and writes ``status`` on every
        tox environment it ran.
    :raises KeyboardInterrupt: re-raised after all children have been stopped
        when the user interrupts the run.
    """
    live_out = state.options.parallel_live
    disable_spinner = bool(os.environ.get("TOX_PARALLEL_NO_SPINNER") == "1")
    args = [sys.executable, MAIN_FILE] + state.args
    # extra per-environment flags must go before a "--" separator when there is one
    try:
        position = args.index("--")
    except ValueError:
        position = len(args)

    max_parallel = state.options.parallel
    if max_parallel is None:
        max_parallel = len(state.tox_envs)
    elif max_parallel < 1:
        # A zero-slot semaphore would block the very first acquire() forever
        # (zero is the default of --parallel), so fall back to one child at a time.
        max_parallel = 1
    semaphore = Semaphore(max_parallel)
    finished = Event()

    show_progress = not disable_spinner and not live_out and state.options.verbosity > 2

    with Spinner(enabled=show_progress) as spinner:

        def run_in_thread(tox_env, os_env, process_dict):
            """Run one environment in a child process and record its outcome.

            ``os_env`` is the environment mapping handed to the child and
            ``process_dict`` collects ``tox_env -> (action, process)`` so that
            the children can be stopped on interrupt.
            """
            output = None
            # Bound before the try block: the finally clause reads it even when
            # the failure happens before the child process is launched.
            print_out = False
            env_name = tox_env.envconfig.envname
            # Read once and reuse below, so both the status and the spinner
            # outcome are derived from the same option.
            skip_tests = state.options.no_test
            status = "skipped tests" if skip_tests else None
            try:
                os_env[str(ENV_VAR_KEY)] = str(env_name)
                args_sub = list(args)
                if hasattr(tox_env, "package"):
                    # inserted in reverse so the result reads "--installpkg <value>"
                    # NOTE(review): perform_packaging is used as an attribute, not
                    # called - confirm it is a property yielding the package path.
                    args_sub.insert(position, str(tox_env.perform_packaging))
                    args_sub.insert(position, "--installpkg")
                if tox_env.get_result_json_path():
                    # point the child at this environment's own result file
                    result_json_index = args_sub.index("--result-json")
                    args_sub[result_json_index + 1] = f"{tox_env.get_result_json_path()}"
                with tox_env.new_action(f"parallel {tox_env.name}") as action:

                    def collect_process(process):
                        process_dict[tox_env] = (action, process)

                    print_out = not live_out and tox_env.envconfig.parallel_show_output
                    output = action.popen(
                        args=args_sub,
                        env=os_env,
                        redirect=not live_out,
                        capture_err=print_out,
                        callback=collect_process,
                        returnout=print_out,
                    )

            except Exception as err:
                status = f"parallel child exit {err!r}"
            finally:
                # Publish the result *before* waking the scheduler: it re-checks
                # ``done`` as soon as ``finished`` is set, and would otherwise be
                # able to miss this environment and wait forever.
                tox_env.status = status
                done.add(env_name)
                semaphore.release()
                finished.set()
                outcome = spinner.succeed
                if skip_tests:
                    outcome = spinner.skip
                elif status is not None:
                    outcome = spinner.fail
                outcome(env_name)
                if print_out and output is not None:
                    logger.warning(output)

        threads = deque()
        processes = {}
        todo_keys = set(state.env_list)
        # environment name -> the dependencies that are part of this run
        todo = OrderedDict((n, todo_keys & set(v.conf["depends"])) for n, v in state.tox_envs.items())
        done = set()
        try:
            while todo:
                for name, depends in list(todo.items()):
                    if depends - done:
                        # skip if has unfinished dependencies
                        continue
                    del todo[name]
                    venv = state.tox_envs[name]
                    semaphore.acquire()
                    spinner.add(name)
                    thread = Thread(target=run_in_thread, args=(venv, os.environ.copy(), processes))
                    thread.daemon = True
                    thread.start()
                    threads.append(thread)
                if todo:
                    # wait until someone finishes and retry queuing jobs
                    finished.wait()
                    finished.clear()
            # join with a short timeout so the main thread stays interruptible
            while threads:
                threads = [thread for thread in threads if not thread.join(0.1) and thread.is_alive()]
        except KeyboardInterrupt:
            logger.error(f"[{os.getpid()}] KeyboardInterrupt parallel - stopping children")
            while True:
                # do not allow interrupting until children interrupt
                try:
                    # putting it inside a thread to guarantee it's not interrupted
                    stopper = Thread(target=_stop_child_processes, args=(processes, threads))
                    stopper.start()
                    stopper.join()
                except KeyboardInterrupt:
                    continue
                raise KeyboardInterrupt


def _stop_child_processes(processes, main_threads):
    """Interrupt all child processes, then wait for the threads that ran them.

    :param processes: mapping of tox environment to an ``(action, process)``
        pair; ``action.handle_interrupt(process)`` is invoked for every entry,
        each on its own thread so the children are stopped concurrently. How
        the stop escalates (the original author describes it as
        INT -> TERM -> KILL) is decided by ``handle_interrupt``, not here.
    :param main_threads: the worker threads to join once every child has been
        handled.
    """
    # stage one: ask every child to stop, all at once
    stoppers = []
    for action, process in processes.values():
        stoppers.append(Thread(target=action.handle_interrupt, args=(process,)))
    for stopper in stoppers:
        stopper.start()
    for stopper in stoppers:
        stopper.join()

    # stage two: the threads that were driving those children can now finish
    for worker in main_threads:
        worker.join()