summaryrefslogtreecommitdiff
path: root/src/tox/action.py
blob: e7f9b77bb2cf4b522fe8eaf1d6e80071ea57ce54 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
from __future__ import absolute_import, unicode_literals

import os
import pipes
import signal
import subprocess
import sys
import time
from contextlib import contextmanager
from threading import Thread

import py

from tox import reporter
from tox.constants import INFO
from tox.exception import InvocationError
from tox.reporter import Verbosity
from tox.util.lock import get_unique_file
from tox.util.stdlib import is_main_thread


class Action(object):
    """Action is an effort to group operations with the same goal (within reporting)"""

    def __init__(
        self,
        name,
        msg,
        args,
        log_dir,
        generate_tox_log,
        command_log,
        popen,
        python,
        suicide_timeout,
        interrupt_timeout,
        terminate_timeout,
    ):
        self.name = name
        self.args = args
        self.msg = msg
        self.activity = self.msg.split(" ", 1)[0]
        self.log_dir = log_dir
        self.generate_tox_log = generate_tox_log
        self.via_popen = popen
        self.command_log = command_log
        self._timed_report = None
        self.python = python
        self.suicide_timeout = suicide_timeout
        self.interrupt_timeout = interrupt_timeout
        self.terminate_timeout = terminate_timeout
        if is_main_thread():
            # python allows only main thread to install signal handlers
            # see https://docs.python.org/3/library/signal.html#signals-and-threads
            self._install_sigterm_handler()

    def __enter__(self):
        msg = "{} {}".format(self.msg, " ".join(map(str, self.args)))
        self._timed_report = reporter.timed_operation(self.name, msg)
        self._timed_report.__enter__()

        return self

    def __exit__(self, type, value, traceback):
        self._timed_report.__exit__(type, value, traceback)

    def setactivity(self, name, msg):
        self.activity = name
        if msg:
            reporter.verbosity0("{} {}: {}".format(self.name, name, msg), bold=True)
        else:
            reporter.verbosity1("{} {}: {}".format(self.name, name, msg), bold=True)

    def info(self, name, msg):
        reporter.verbosity1("{} {}: {}".format(self.name, name, msg), bold=True)

    def popen(
        self,
        args,
        cwd=None,
        env=None,
        redirect=True,
        returnout=False,
        ignore_ret=False,
        capture_err=True,
        callback=None,
        report_fail=True,
    ):
        """this drives an interaction with a subprocess"""
        cwd = py.path.local() if cwd is None else cwd
        cmd_args = [str(x) for x in self._rewrite_args(cwd, args)]
        cmd_args_shell = " ".join(pipes.quote(i) for i in cmd_args)
        stream_getter = self._get_standard_streams(
            capture_err,
            cmd_args_shell,
            redirect,
            returnout,
            cwd,
        )
        exit_code, output = None, None
        with stream_getter as (fin, out_path, stderr, stdout):
            try:
                process = self.via_popen(
                    cmd_args,
                    stdout=stdout,
                    stderr=stderr,
                    cwd=str(cwd),
                    env=os.environ.copy() if env is None else env,
                    universal_newlines=True,
                    shell=False,
                    creationflags=(
                        subprocess.CREATE_NEW_PROCESS_GROUP
                        if sys.platform == "win32"
                        else 0
                        # needed for Windows signal send ability (CTRL+C)
                    ),
                )
            except OSError as exception:
                exit_code = exception.errno
            else:
                if callback is not None:
                    callback(process)
                reporter.log_popen(cwd, out_path, cmd_args_shell, process.pid)
                output = self.evaluate_cmd(fin, process, redirect)
                exit_code = process.returncode
            finally:
                if out_path is not None and out_path.exists():
                    lines = out_path.read_text("UTF-8").split("\n")
                    # first three lines are the action, cwd, and cmd - remove it
                    output = "\n".join(lines[3:])
                try:
                    if exit_code and not ignore_ret:
                        if report_fail:
                            msg = "invocation failed (exit code {:d})".format(exit_code)
                            if out_path is not None:
                                msg += ", logfile: {}".format(out_path)
                                if not out_path.exists():
                                    msg += " warning log file missing"
                            reporter.error(msg)
                            if out_path is not None and out_path.exists():
                                reporter.separator("=", "log start", Verbosity.QUIET)
                                reporter.quiet(output)
                                reporter.separator("=", "log end", Verbosity.QUIET)
                        raise InvocationError(cmd_args_shell, exit_code, output)
                finally:
                    self.command_log.add_command(cmd_args, output, exit_code)
        return output

    def evaluate_cmd(self, input_file_handler, process, redirect):
        try:
            if self.generate_tox_log and not redirect:
                if process.stderr is not None:
                    # prevent deadlock
                    raise ValueError("stderr must not be piped here")
                # we read binary from the process and must write using a binary stream
                buf = getattr(sys.stdout, "buffer", sys.stdout)
                last_time = time.time()
                while True:
                    # we have to read one byte at a time, otherwise there
                    # might be no output for a long time with slow tests
                    data = input_file_handler.read(1)
                    if data:
                        buf.write(data)
                        if b"\n" in data or (time.time() - last_time) > 1:
                            # we flush on newlines or after 1 second to
                            # provide quick enough feedback to the user
                            # when printing a dot per test
                            buf.flush()
                            last_time = time.time()
                    elif process.poll() is not None:
                        if process.stdout is not None:
                            process.stdout.close()
                        break
                    else:
                        time.sleep(0.1)
                        # the seek updates internal read buffers
                        input_file_handler.seek(0, 1)
                input_file_handler.close()
            out, _ = process.communicate()  # wait to finish
        except KeyboardInterrupt as exception:
            reporter.error("got KeyboardInterrupt signal")
            main_thread = is_main_thread()
            while True:
                try:
                    if main_thread:
                        # spin up a new thread to disable further interrupt on main thread
                        stopper = Thread(target=self.handle_interrupt, args=(process,))
                        stopper.start()
                        stopper.join()
                    else:
                        self.handle_interrupt(process)
                except KeyboardInterrupt:
                    continue
                break
            raise exception
        return out

    def handle_interrupt(self, process):
        """A three level stop mechanism for children - INT -> TERM -> KILL"""
        msg = "from {} {{}} pid {}".format(os.getpid(), process.pid)
        if self._wait(process, self.suicide_timeout) is None:
            self.info("KeyboardInterrupt", msg.format("SIGINT"))
            process.send_signal(signal.CTRL_C_EVENT if sys.platform == "win32" else signal.SIGINT)
            if self._wait(process, self.interrupt_timeout) is None:
                self.info("KeyboardInterrupt", msg.format("SIGTERM"))
                process.terminate()
                if self._wait(process, self.terminate_timeout) is None:
                    self.info("KeyboardInterrupt", msg.format("SIGKILL"))
                    process.kill()
                    process.communicate()

    @staticmethod
    def _wait(process, timeout):
        if sys.version_info >= (3, 3):
            # python 3 has timeout feature built-in
            try:
                process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                pass
        else:
            # on Python 2 we need to simulate it
            delay = 0.01
            while process.poll() is None and timeout > 0:
                time.sleep(delay)
                timeout -= delay
        return process.poll()

    @contextmanager
    def _get_standard_streams(self, capture_err, cmd_args_shell, redirect, returnout, cwd):
        stdout = out_path = input_file_handler = None
        stderr = subprocess.STDOUT if capture_err else None

        if self.generate_tox_log or redirect:
            out_path = self.get_log_path(self.name)
            with out_path.open("wt") as stdout, out_path.open("rb") as input_file_handler:
                msg = "action: {}, msg: {}\ncwd: {}\ncmd: {}\n".format(
                    self.name.replace("\n", " "),
                    self.msg.replace("\n", " "),
                    str(cwd).replace("\n", " "),
                    cmd_args_shell.replace("\n", " "),
                )
                stdout.write(msg)
                stdout.flush()
                input_file_handler.read()  # read the header, so it won't be written to stdout
                yield input_file_handler, out_path, stderr, stdout
                return

        if returnout:
            stdout = subprocess.PIPE

        yield input_file_handler, out_path, stderr, stdout

    def get_log_path(self, actionid):
        log_file = get_unique_file(self.log_dir, prefix=actionid, suffix=".log")
        return log_file

    def _rewrite_args(self, cwd, args):

        executable = None
        if INFO.IS_WIN:
            # shebang lines are not adhered on Windows so if it's a python script
            # pre-pend the interpreter
            ext = os.path.splitext(str(args[0]))[1].lower()
            if ext == ".py":
                executable = str(self.python)
        if executable is None:
            executable = args[0]
            args = args[1:]

        new_args = [executable]

        # to make the command shorter try to use relative paths for all subsequent arguments
        # note the executable cannot be relative as the Windows applies cwd after invocation
        for arg in args:
            if arg and os.path.isabs(str(arg)):
                arg_path = py.path.local(arg)
                if arg_path.exists() and arg_path.common(cwd) is not None:
                    potential_arg = cwd.bestrelpath(arg_path)
                    if len(potential_arg.split("..")) < 2:
                        # just one parent directory accepted as relative path
                        arg = potential_arg
            new_args.append(str(arg))

        return new_args

    def _install_sigterm_handler(self):
        """Handle sigterm as if it were a keyboardinterrupt"""

        def sigterm_handler(signum, frame):
            reporter.error("Got SIGTERM, handling it as a KeyboardInterrupt")
            raise KeyboardInterrupt()

        signal.signal(signal.SIGTERM, sigterm_handler)