1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
"""
Run tox environments in parallel.
"""
import inspect
import logging
import os
import sys
from argparse import ArgumentTypeError
from collections import OrderedDict, deque
from pathlib import Path
from threading import Event, Semaphore, Thread
import tox
from tox.config.cli.parser import ToxParser
from tox.plugin.impl import impl
from tox.session.common import env_list_flag
from tox.session.state import State
from tox.util.cpu import auto_detect_cpus
from tox.util.spinner import Spinner
from .common import env_run_create_flags
# Module-level logger; used below to echo captured child output and interrupt notices.
logger = logging.getLogger(__name__)
# Environment variable set in each child process' environment to the name of
# the tox environment that child is running.
ENV_VAR_KEY = "TOX_PARALLEL_ENV"
# Value of ``--parallel`` that the option help describes as "turn off".
OFF_VALUE = 0
# Default for ``--parallel`` when the flag is not given.
DEFAULT_PARALLEL = OFF_VALUE
# ``__main__.py`` located next to the imported ``tox`` package source; it is
# the script handed to ``sys.executable`` when spawning child processes.
MAIN_FILE = Path(inspect.getsourcefile(tox)).parent / "__main__.py"
@impl
def tox_add_option(parser: ToxParser):
    """Register the ``run-parallel`` (alias ``p``) sub-command and its flags.

    The command is wired to :func:`run_parallel` and receives the shared
    environment-list and run/create flags, plus two options of its own:

    * ``-p/--parallel VAL`` -- concurrency limit (``all``, ``auto`` or a
      non-negative integer), stored as ``options.parallel``.
    * ``-o/--parallel-live`` -- stored as ``options.parallel_live``.
    """

    # NOTE: the function name is kept as-is because argparse uses the
    # converter's ``__name__`` when it reports an invalid value.
    def parse_num_processes(str_value):
        """Convert the raw ``--parallel`` argument into a process limit.

        Returns ``None`` for ``"all"`` (no limit), the detected CPU count for
        ``"auto"``, otherwise the integer value. Negative integers are
        rejected with ``ArgumentTypeError``.
        """
        if str_value == "all":
            return None
        if str_value == "auto":
            return auto_detect_cpus()
        number = int(str_value)
        if number < 0:
            raise ArgumentTypeError("value must be positive")
        return number

    command = parser.add_command("run-parallel", ["p"], "run environments in parallel", run_parallel)
    env_list_flag(command)
    env_run_create_flags(command)

    parallel_help = (
        "run tox environments in parallel, the argument controls limit: all,"
        " auto - cpu count, some positive number, zero is turn off"
    )
    command.add_argument(
        "-p",
        "--parallel",
        dest="parallel",
        help=parallel_help,
        action="store",
        type=parse_num_processes,
        default=DEFAULT_PARALLEL,
        metavar="VAL",
    )
    command.add_argument(
        "-o",
        "--parallel-live",
        action="store_true",
        dest="parallel_live",
        help="connect to stdout while running environments",
    )
def run_parallel(state: State):
    """Run the selected tox environments as parallel child tox processes.

    Each environment gets its own worker thread, which re-invokes tox as
    ``sys.executable MAIN_FILE <state.args>`` with ``TOX_PARALLEL_ENV`` set to
    the environment name. An environment is only started once every entry of
    its ``depends`` that is part of this run has finished, and at most
    ``state.options.parallel`` children run at the same time (``None`` means
    one slot per environment).

    On ``KeyboardInterrupt`` the children are interrupted and the worker
    threads joined before the interrupt is re-raised.

    :param state: the tox session state (options, args, environments).
    :raises KeyboardInterrupt: re-raised after the children were stopped.
    """
    live_out = state.options.parallel_live
    disable_spinner = bool(os.environ.get("TOX_PARALLEL_NO_SPINNER") == "1")
    args = [sys.executable, MAIN_FILE] + state.args
    # extra per-env arguments must go before a "--" separator, if there is one
    try:
        position = args.index("--")
    except ValueError:
        position = len(args)

    max_parallel = state.options.parallel
    if max_parallel is None:
        max_parallel = len(state.tox_envs)
    # A semaphore created with 0 slots (the option's default value) would block
    # the very first acquire() forever; fall back to running one env at a time.
    semaphore = Semaphore(max(1, max_parallel))
    finished = Event()
    show_progress = not disable_spinner and not live_out and state.options.verbosity > 2

    with Spinner(enabled=show_progress) as spinner:

        def run_in_thread(tox_env, os_env, process_dict):
            """Run one environment in a child process and record its outcome."""
            output = None
            # Must be bound before the try block: the finally clause reads it
            # even when an exception happens before the popen call is reached.
            print_out = False
            env_name = tox_env.envconfig.envname
            # NOTE(review): this reads ``no_test`` while the finally clause reads
            # ``notest`` -- confirm which option name is actually declared.
            status = "skipped tests" if state.options.no_test else None
            try:
                os_env[str(ENV_VAR_KEY)] = str(env_name)
                args_sub = list(args)
                if hasattr(tox_env, "package"):
                    # two inserts at the same index => "--installpkg <value>"
                    args_sub.insert(position, str(tox_env.perform_packaging))
                    args_sub.insert(position, "--installpkg")
                if tox_env.get_result_json_path():
                    # point the child at this environment's own result file
                    result_json_index = args_sub.index("--result-json")
                    args_sub[result_json_index + 1] = f"{tox_env.get_result_json_path()}"
                with tox_env.new_action(f"parallel {tox_env.name}") as action:

                    def collect_process(process):
                        # remember the child so it can be interrupted later
                        process_dict[tox_env] = (action, process)

                    print_out = not live_out and tox_env.envconfig.parallel_show_output
                    output = action.popen(
                        args=args_sub,
                        env=os_env,
                        redirect=not live_out,
                        capture_err=print_out,
                        callback=collect_process,
                        returnout=print_out,
                    )
            except Exception as err:
                status = f"parallel child exit {err!r}"
            finally:
                # Record completion *before* waking the scheduler: it re-checks
                # dependencies against ``done`` right after the event fires and
                # could otherwise miss this env and wait forever.
                done.add(env_name)
                semaphore.release()
                finished.set()
                tox_env.status = status
                outcome = spinner.succeed
                if state.options.notest:
                    outcome = spinner.skip
                elif status is not None:
                    outcome = spinner.fail
                outcome(env_name)
                if print_out and output is not None:
                    logger.warning(output)

        threads = deque()
        processes = {}
        todo_keys = set(state.env_list)
        # env name -> dependencies that are part of this run
        todo = OrderedDict((n, todo_keys & set(v.conf["depends"])) for n, v in state.tox_envs.items())
        done = set()
        try:
            while todo:
                for name, depends in list(todo.items()):
                    if depends - done:
                        # skip if has unfinished dependencies
                        continue
                    del todo[name]
                    venv = state.tox_envs[name]
                    semaphore.acquire()
                    spinner.add(name)
                    thread = Thread(target=run_in_thread, args=(venv, os.environ.copy(), processes))
                    thread.daemon = True
                    thread.start()
                    threads.append(thread)
                if todo:
                    # wait until someone finishes and retry queuing jobs
                    finished.wait()
                    finished.clear()
            # poll with a short timeout so the main thread stays interruptible
            while threads:
                threads = [thread for thread in threads if not thread.join(0.1) and thread.is_alive()]
        except KeyboardInterrupt:
            logger.error(f"[{os.getpid()}] KeyboardInterrupt parallel - stopping children")
            while True:
                # do not allow interrupting until children interrupt
                try:
                    # putting it inside a thread to guarantee it's not interrupted
                    stopper = Thread(target=_stop_child_processes, args=(processes, threads))
                    stopper.start()
                    stopper.join()
                except KeyboardInterrupt:
                    continue
                raise KeyboardInterrupt
def _stop_child_processes(processes, main_threads):
    """Interrupt every recorded child process, then wait for the worker threads.

    Each child is stopped by calling ``action.handle_interrupt(process)`` in
    its own thread, so all children are signalled concurrently. The original
    author describes the stop as a three level mechanism,
    INT (250ms) -> TERM (100ms) -> KILL; that escalation presumably lives in
    ``handle_interrupt`` and is not visible here.

    :param processes: mapping of tox env -> ``(action, process)``.
    :param main_threads: the worker threads that launched the children.
    """

    def _interrupt(_env, act, proc):
        act.handle_interrupt(proc)

    # first stop children: build all stoppers, start them together, then wait
    stoppers = []
    for env, (act, proc) in processes.items():
        stoppers.append(Thread(target=_interrupt, args=(env, act, proc)))
    for stopper in stoppers:
        stopper.start()
    for stopper in stoppers:
        stopper.join()

    # then the threads that were supervising those children
    for worker in main_threads:
        worker.join()
|