diff options
author | Bernát Gábor <bgabor8@bloomberg.net> | 2020-12-27 13:57:00 +0000 |
---|---|---|
committer | Bernát Gábor <bgabor8@bloomberg.net> | 2020-12-27 13:57:00 +0000 |
commit | 22972ff24b208839a30a187a2cd6e3974c2e1da9 (patch) | |
tree | c34f1ad91c0a50fd414406fea788d8b760dd3114 | |
parent | 0bbdf445fc7d47d0dd31599872975534243ed555 (diff) | |
download | tox-git-22972ff24b208839a30a187a2cd6e3974c2e1da9.tar.gz |
Rework and simplify local subprocess executor handlers
Signed-off-by: Bernát Gábor <bgabor8@bloomberg.net>
-rw-r--r-- | src/tox/execute/api.py | 51 | ||||
-rw-r--r-- | src/tox/execute/local_sub_process/__init__.py | 23 | ||||
-rw-r--r-- | src/tox/execute/local_sub_process/read_via_thread.py | 46 | ||||
-rw-r--r-- | src/tox/execute/local_sub_process/read_via_thread_unix.py | 45 | ||||
-rw-r--r-- | src/tox/execute/local_sub_process/read_via_thread_windows.py | 75 | ||||
-rw-r--r-- | src/tox/execute/request.py | 3 | ||||
-rw-r--r-- | src/tox/util/signal.py | 45 | ||||
-rw-r--r-- | tests/execute/local_subprocess/bad_process.py | 41 | ||||
-rw-r--r-- | tests/execute/local_subprocess/local_subprocess_sigint.py | 26 | ||||
-rw-r--r-- | tests/execute/local_subprocess/test_local_subprocess.py | 45 |
10 files changed, 199 insertions, 201 deletions
diff --git a/src/tox/execute/api.py b/src/tox/execute/api.py index 0d500266..fdfffd7f 100644 --- a/src/tox/execute/api.py +++ b/src/tox/execute/api.py @@ -2,9 +2,7 @@ Abstract base API for executing commands within tox environments. """ import logging -import signal import sys -import threading import time from abc import ABC, abstractmethod from contextlib import contextmanager @@ -14,17 +12,13 @@ from typing import Callable, Iterator, NoReturn, Optional, Sequence, Tuple, Type from colorama import Fore from tox.report import OutErr +from tox.util.signal import DelayedSignal from .request import ExecuteRequest, StdinSource from .stream import SyncWrite ContentHandler = Callable[[bytes], None] Executor = Callable[[ExecuteRequest, ContentHandler, ContentHandler], int] -if sys.platform == "win32": # pragma: win32 cover - SIGINT = signal.CTRL_C_EVENT -else: - SIGINT = signal.SIGINT - LOGGER = logging.getLogger(__name__) @@ -73,38 +67,22 @@ class Execute(ABC): @contextmanager def call(self, request: ExecuteRequest, show: bool, out_err: OutErr) -> Iterator[ExecuteStatus]: - start = time.monotonic() - interrupt = None + start, interrupt = time.monotonic(), None try: # collector is what forwards the content from the file streams to the standard streams - out = out_err[0].buffer - with SyncWrite(out.name, out if show else None) as out_sync: - err = out_err[1].buffer - with SyncWrite(err.name, err if show else None, Fore.RED if self._colored else None) as err_sync: - instance = self.build_instance(request, out_sync, err_sync) - try: - with instance as status: - yield status - exit_code = status.exit_code - except KeyboardInterrupt as exception: + out, err = out_err[0].buffer, out_err[1].buffer + out_sync = SyncWrite(out.name, out if show else None) + err_sync = SyncWrite(err.name, err if show else None, Fore.RED if self._colored else None) + with out_sync, err_sync: + instance = self.build_instance(request, out_sync, err_sync) + try: + with instance as status: + yield status + exit_code = status.exit_code + except KeyboardInterrupt as exception: + with DelayedSignal(): interrupt = exception - while True: - try: - is_main = threading.current_thread() == threading.main_thread() - if is_main: - # disable further interrupts until we finish this, main thread only - if sys.platform != "win32": # pragma: win32 cover - signal.signal(SIGINT, signal.SIG_IGN) - except KeyboardInterrupt: # pragma: no cover - continue # pragma: no cover - else: - try: - exit_code = instance.interrupt() - break - finally: - # restore signal handler on main thread - if is_main and sys.platform != "win32": # pragma: no cover - signal.signal(SIGINT, signal.default_int_handler) + exit_code = instance.interrupt() finally: end = time.monotonic() status.outcome = Outcome(request, show, exit_code, out_sync.text, err_sync.text, start, end, instance.cmd) @@ -222,7 +200,6 @@ class ToxKeyboardInterrupt(KeyboardInterrupt): __all__ = ( "ContentHandler", - "SIGINT", "Outcome", "ToxKeyboardInterrupt", "Execute", diff --git a/src/tox/execute/local_sub_process/__init__.py b/src/tox/execute/local_sub_process/__init__.py index 89381535..aed511cd 100644 --- a/src/tox/execute/local_sub_process/__init__.py +++ b/src/tox/execute/local_sub_process/__init__.py @@ -7,10 +7,11 @@ from subprocess import PIPE, TimeoutExpired from types import TracebackType from typing import TYPE_CHECKING, Generator, List, Optional, Sequence, Tuple, Type -from tox.execute.stream import SyncWrite +from tox.util.signal import SIGINT -from ..api import SIGINT, Execute, ExecuteInstance, ExecuteStatus, Outcome +from ..api import Execute, ExecuteInstance, ExecuteStatus, Outcome from ..request import ExecuteRequest, StdinSource +from ..stream import SyncWrite from .read_via_thread import WAIT_GENERAL if sys.platform == "win32": # pragma: win32 cover @@ -80,6 +81,9 @@ class LocalSubprocessExecuteStatus(ExecuteStatus): if stdin is not None: stdin.close() + def __repr__(self) -> str: + return f"{self.__class__.__name__}(pid={self._process.pid}, returncode={self._process.returncode!r})" + class LocalSubprocessExecuteFailedStatus(ExecuteStatus): def __init__(self, out: SyncWrite, err: SyncWrite, exit_code: Optional[int]) -> None: @@ -143,22 +147,17 @@ class LocalSubProcessExecuteInstance(ExecuteInstance): return LocalSubprocessExecuteFailedStatus(self._out, self._err, exception.errno) status = LocalSubprocessExecuteStatus(self._out, self._err, process) - if self.request.stdin is StdinSource.OFF: - status.close_stdin() - pid = self.process.pid - self._read_stderr = ReadViaThread( - stderr.send(process), self.err_handler, name=f"err-{pid}", on_exit_drain=self._on_exit_drain - ) + drain, pid = self._on_exit_drain, self.process.pid + self._read_stderr = ReadViaThread(stderr.send(process), self.err_handler, name=f"err-{pid}", drain=drain) self._read_stderr.__enter__() - self._read_stdout = ReadViaThread( - stdout.send(process), self.out_handler, name=f"out-{pid}", on_exit_drain=self._on_exit_drain - ) + self._read_stdout = ReadViaThread(stdout.send(process), self.out_handler, name=f"out-{pid}", drain=drain) self._read_stdout.__enter__() if sys.platform == "win32": # pragma: win32 cover process.stderr.read = self._read_stderr._drain_stream # type: ignore[assignment,union-attr] process.stdout.read = self._read_stdout._drain_stream # type: ignore[assignment,union-attr] - # wait it out with interruptions to allow KeyboardInterrupt on Windows + if self.request.stdin is StdinSource.OFF: + status.close_stdin() return status def __exit__( diff --git a/src/tox/execute/local_sub_process/read_via_thread.py b/src/tox/execute/local_sub_process/read_via_thread.py index 3b609748..310e0b90 100644 --- a/src/tox/execute/local_sub_process/read_via_thread.py +++ b/src/tox/execute/local_sub_process/read_via_thread.py @@ -6,16 +6,18 @@ from threading import Event, Thread from types import TracebackType from typing import Callable, Optional, Type -WAIT_GENERAL = 0.1 +from tox.util.signal import DelayedSignal + +WAIT_GENERAL = 0.05 # stop thread join every so often (give chance to a signal interrupt) class ReadViaThread(ABC): - def __init__(self, file_no: int, handler: Callable[[bytes], None], name: str, on_exit_drain: bool) -> None: + def __init__(self, file_no: int, handler: Callable[[bytes], None], name: str, drain: bool) -> None: self.file_no = file_no self.stop = Event() self.thread = Thread(target=self._read_stream, name=f"tox-r-{name}-{file_no}") self.handler = handler - self._on_exit_drain = on_exit_drain + self._on_exit_drain = drain def __enter__(self) -> "ReadViaThread": self.thread.start() @@ -24,42 +26,16 @@ class ReadViaThread(ABC): def __exit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> None: - thrown = None - while True: - try: - self.stop.set() - while self.thread.is_alive(): - self.thread.join(WAIT_GENERAL) - except KeyboardInterrupt as exception: # pragma: no cover - thrown = exception # pragma: no cover - continue # pragma: no cover - else: - if thrown is not None: - raise thrown # pragma: no cover - else: # pragma: no cover - break # pragma: no cover - if exc_val is None: # drain what remains if we were not interrupted - try: - if self._on_exit_drain: - data = self._drain_stream() - else: - data = b"" - except ValueError: # pragma: no cover - pass # pragma: no cover - else: - while True: - try: - self.handler(data) - break - except KeyboardInterrupt as exception: # pragma: no cover - thrown = exception # pragma: no cover - if thrown is not None: - raise thrown # pragma: no cover + with DelayedSignal(): + self.stop.set() # signal thread to stop + while self.thread.is_alive(): # wait until it stops + self.thread.join(WAIT_GENERAL) + self._drain_stream() # read anything left @abstractmethod def _read_stream(self) -> None: raise NotImplementedError @abstractmethod - def _drain_stream(self) -> bytes: + def _drain_stream(self) -> None: raise NotImplementedError diff --git a/src/tox/execute/local_sub_process/read_via_thread_unix.py b/src/tox/execute/local_sub_process/read_via_thread_unix.py index ba156e7f..49867f73 100644 --- a/src/tox/execute/local_sub_process/read_via_thread_unix.py +++ b/src/tox/execute/local_sub_process/read_via_thread_unix.py @@ -12,36 +12,27 @@ STOP_EVENT_CHECK_PERIODICITY_IN_MS = 0.01 # pragma: win32 no cover class ReadViaThreadUnix(ReadViaThread): # pragma: win32 no cover - def __init__(self, file_no: int, handler: Callable[[bytes], None], name: str, on_exit_drain: bool) -> None: - super().__init__(file_no, handler, name, on_exit_drain) + def __init__(self, file_no: int, handler: Callable[[bytes], None], name: str, drain: bool) -> None: + super().__init__(file_no, handler, name, drain) def _read_stream(self) -> None: while not self.stop.is_set(): # we need to drain the stream, but periodically give chance for the thread to break if the stop event has # been set (this is so that an interrupt can be handled) - try: - ready, __, ___ = select.select([self.file_no], [], [], STOP_EVENT_CHECK_PERIODICITY_IN_MS) - if ready: - data = os.read(self.file_no, 1) - if data: - try: - self.handler(data) - except Exception: # noqa - pass - except OSError as exception: - if exception.errno == errno.EBADF: - break - raise - - def _drain_stream(self) -> bytes: - result = bytearray() # on closed file read returns empty - while True: - try: - last_result = os.read(self.file_no, 1) - except OSError: # pragma: no cover # ignore failing to read the pipe - already closed + if self._read_available() is False: break - if last_result: - result.append(last_result[0]) - else: - break # pragma: no cover # somehow python optimizes away this line, but break is hit to stop in tests - return bytes(result) + + def _drain_stream(self) -> None: + self._read_available(timeout=0.0001) + + def _read_available(self, timeout: float = STOP_EVENT_CHECK_PERIODICITY_IN_MS) -> bool: + try: + ready, __, ___ = select.select([self.file_no], [], [], timeout) + if ready: + data = os.read(self.file_no, ready[0]) + self.handler(data) + except OSError as exception: + if exception.errno == errno.EBADF: + return False + raise + return True diff --git a/src/tox/execute/local_sub_process/read_via_thread_windows.py b/src/tox/execute/local_sub_process/read_via_thread_windows.py index 173b9353..d4dab918 100644 --- a/src/tox/execute/local_sub_process/read_via_thread_windows.py +++ b/src/tox/execute/local_sub_process/read_via_thread_windows.py @@ -1,9 +1,10 @@ """ On Windows we use overlapped mechanism, borrowing it from asyncio (but without the event loop). """ +import logging from asyncio.windows_utils import BUFSIZE # pragma: win32 cover from time import sleep -from typing import Callable # pragma: win32 cover +from typing import Callable, Optional # pragma: win32 cover import _overlapped # type: ignore[import] # pragma: win32 cover @@ -11,47 +12,45 @@ from .read_via_thread import ReadViaThread # pragma: win32 cover class ReadViaThreadWindows(ReadViaThread): # pragma: win32 cover - def __init__(self, file_no: int, handler: Callable[[bytes], None], name: str, on_exit_drain: bool) -> None: - super().__init__(file_no, handler, name, on_exit_drain) + def __init__(self, file_no: int, handler: Callable[[bytes], None], name: str, drain: bool) -> None: + super().__init__(file_no, handler, name, drain) self.closed = False + self._ov = _overlapped.Overlapped(0) + self._read = False def _read_stream(self) -> None: - ov = None keep_reading = True - while keep_reading: - if ov is None: # if we have no overlapped handler create one - ov = _overlapped.Overlapped(0) - try: - # read up to BUFSIZE at a time - ov.ReadFile(self.file_no, BUFSIZE) # type: ignore[attr-defined] - except BrokenPipeError: - self.closed = True - return - # this loop break condition is here to ensure we always try to drain a constructed overlap handler, either - # after a period of sleep or after just constructing one + while keep_reading: # try to read at least once + wait = self._read_batch() + if wait is None: + break + if wait is True: + sleep(0.01) # sleep for 10ms if there was no data to read and try again keep_reading = not self.stop.is_set() - try: - data = ov.getresult(False) # wait=False to not block and give chance for the stop check - except OSError as exception: - # 996 0 (0x3E4) - Overlapped I/O event is not in a signaled state - if getattr(exception, "winerror", None) == 996: - sleep(0.01) # sleep for 10ms if there was no data to read and try again - continue - raise - else: - ov = None # reset overlapped IO if the operation was a success - self.handler(data) - def _drain_stream(self) -> bytes: - length, result = 1, b"" - while length: - ov = _overlapped.Overlapped(0) - try: - ov.ReadFile(self.file_no, BUFSIZE) # type: ignore[attr-defined] - data = ov.getresult(False) - except OSError: - length = 0 + def _drain_stream(self) -> None: + wait: Optional[bool] = False + while wait is not True: + wait = self._read_batch() + + def _read_batch(self) -> Optional[bool]: + if self._read is False: + try: # read up to BUFSIZE at a time + self._ov.ReadFile(self.file_no, BUFSIZE) # type: ignore[attr-defined] + self._read = True + except BrokenPipeError: + self.closed = True + return None + try: # wait=False to not block and give chance for the stop check + data = self._ov.getresult(False) + except OSError as exception: + # 996 0 (0x3E4) - Overlapped I/O event is not in a signaled state + if getattr(exception, "winerror", None) == 996: + return True else: - result += data - length = len(data) - return result + logging.error("failed to read %r", exception) + return None + else: + self._read = False + self.handler(data) + return False diff --git a/src/tox/execute/request.py b/src/tox/execute/request.py index d229ca05..e60e80c5 100644 --- a/src/tox/execute/request.py +++ b/src/tox/execute/request.py @@ -36,6 +36,9 @@ class ExecuteRequest: _cmd.extend(self.cmd[1:]) return shell_cmd(_cmd) + def __repr__(self) -> str: + return f"{self.__class__.__name__}(cmd={self.cmd!r}, cwd={self.cwd!r}, env=..., stdin={self.stdin!r})" + def shell_cmd(cmd: Sequence[str]) -> str: if sys.platform == "win32": # pragma: win32 cover diff --git a/src/tox/util/signal.py b/src/tox/util/signal.py new file mode 100644 index 00000000..aff421ea --- /dev/null +++ b/src/tox/util/signal.py @@ -0,0 +1,45 @@ +import logging +import sys +import threading +from signal import Handlers, Signals, signal +from types import FrameType, TracebackType +from typing import Callable, Optional, Type, Union + +if sys.platform == "win32": # pragma: win32 cover + from signal import CTRL_C_EVENT as SIGINT +else: + from signal import SIGINT + + +class DelayedSignal: + def __init__(self, of: Signals = SIGINT) -> None: + self._of = of + self._signal: Optional[Signals] = None + self._frame: Optional[FrameType] = None + self._old_handler: Union[Callable[[Signals, FrameType], None], int, Handlers, None] = None + + def __enter__(self) -> None: + self._signal, self._frame = None, None + if threading.current_thread() == threading.main_thread(): # signals are always handled on the main thread only + self._old_handler = signal(self._of, self._handler) + + def __exit__( + self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] + ) -> None: + try: + if self._signal is not None and self._frame is not None and callable(self._old_handler): + logging.debug("Handling delayed %s", self._signal) + self._old_handler(self._signal, self._frame) + finally: + if self._old_handler is not None: + signal(self._of, self._old_handler) + + def _handler(self, sig: Signals, frame: FrameType) -> None: + logging.debug("Received %s, delaying it", sig) + self._signal, self._frame = sig, frame + + +__all__ = ( + "DelayedSignal", + "SIGINT", +) diff --git a/tests/execute/local_subprocess/bad_process.py b/tests/execute/local_subprocess/bad_process.py index 73441a1e..f31b5297 100644 --- a/tests/execute/local_subprocess/bad_process.py +++ b/tests/execute/local_subprocess/bad_process.py @@ -1,31 +1,34 @@ """This is a non compliant process that does not listens to signals""" # pragma: no cover +import os import signal import sys import time from pathlib import Path -from typing import Any +from types import FrameType +out = sys.stdout -def handler(signum: int, frame: Any) -> None: - print(f"how about no signal {signum}", file=sys.stdout) - sys.stdout.flush() # force output now before we get killed +def handler(signum: signal.Signals, _: FrameType) -> None: + _p(f"how about no signal {signum!r}") + +def _p(m: str) -> None: + out.write(f"{m}{os.linesep}") + out.flush() # force output flush in case we get killed + + +_p(f"start {__name__} with {sys.argv!r}") +signal.signal(signal.SIGINT, handler) signal.signal(signal.SIGTERM, handler) -idle_file = Path(sys.argv[1]) -start_file = Path(sys.argv[2]) - -idle_file.write_text("") -time.sleep(float(sys.argv[3])) - -while True: - try: - if not start_file.exists(): - start_file.write_text("") - print(f"created {start_file}") - time.sleep(100) - except KeyboardInterrupt: - print("how about no KeyboardInterrupt", file=sys.stderr) - sys.stderr.flush() # force output now before we get killed +try: + start_file = Path(sys.argv[1]) + _p(f"create {start_file}") + start_file.write_text("") + _p(f"created {start_file}") + while True: + time.sleep(0.01) +finally: + _p(f"done {__name__}") diff --git a/tests/execute/local_subprocess/local_subprocess_sigint.py b/tests/execute/local_subprocess/local_subprocess_sigint.py index 928d41b8..ed0a4ec4 100644 --- a/tests/execute/local_subprocess/local_subprocess_sigint.py +++ b/tests/execute/local_subprocess/local_subprocess_sigint.py @@ -14,21 +14,33 @@ logging.basicConfig(level=logging.DEBUG, format="%(relativeCreated)d\t%(levelnam bad_process = Path(__file__).parent / "bad_process.py" executor = local_sub_process.LocalSubProcessExecutor(colored=False) -local_sub_process.WAIT_GENERAL = 0.05 request = local_sub_process.ExecuteRequest( - cmd=[sys.executable, bad_process, sys.argv[1], sys.argv[2], str(local_sub_process.WAIT_GENERAL * 3)], + cmd=[sys.executable, bad_process, sys.argv[1]], cwd=Path().absolute(), env=os.environ.copy(), stdin=StdinSource.API, ) out_err = TextIOWrapper(NamedBytesIO("out")), TextIOWrapper(NamedBytesIO("err")) -try: - with executor.call(request, show=False, out_err=out_err) as status: - pass -except ToxKeyboardInterrupt as exception: - outcome = exception.outcome + +def show_outcome(outcome): print(outcome.exit_code) print(repr(outcome.out)) print(repr(outcome.err)) print(outcome.elapsed, end="") + + +logging.info("start %r", request) +try: + with executor.call(request, show=False, out_err=out_err) as status: + logging.info("wait on %r", status) + while status.exit_code is None: + status.wait() + logging.info("wait over on %r", status) + show_outcome(status.outcome) +except ToxKeyboardInterrupt as exception: + show_outcome(exception.outcome) +except Exception as exception: + logging.exception(exception) +finally: + logging.info("done") diff --git a/tests/execute/local_subprocess/test_local_subprocess.py b/tests/execute/local_subprocess/test_local_subprocess.py index d8c51107..6699c0b6 100644 --- a/tests/execute/local_subprocess/test_local_subprocess.py +++ b/tests/execute/local_subprocess/test_local_subprocess.py @@ -12,11 +12,12 @@ import pytest from colorama import Fore from pytest_mock import MockerFixture -from tox.execute.api import SIGINT, Outcome +from tox.execute.api import Outcome from tox.execute.local_sub_process import CREATION_FLAGS, LocalSubProcessExecutor from tox.execute.request import ExecuteRequest, StdinSource from tox.pytest import CaptureFixture, LogCaptureFixture, MonkeyPatch from tox.report import NamedBytesIO +from tox.util.signal import SIGINT class FakeOutErr: @@ -205,45 +206,37 @@ def test_command_does_not_exist(capsys: CaptureFixture, caplog: LogCaptureFixtur @pytest.mark.skipif(sys.platform == "win32", reason="TODO: find out why it does not work") -def test_command_keyboard_interrupt(tmp_path: Path) -> None: - send_signal = tmp_path / "send" - process = subprocess.Popen( - [ - sys.executable, - str(Path(__file__).parent / "local_subprocess_sigint.py"), - str(tmp_path / "idle"), - str(send_signal), - ], - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - universal_newlines=True, - creationflags=CREATION_FLAGS, - ) - while not send_signal.exists(): +@pytest.mark.timeout(100) +def test_command_keyboard_interrupt(tmp_path: Path, monkeypatch: MonkeyPatch, capfd: CaptureFixture) -> None: + monkeypatch.chdir(tmp_path) + process_up_signal = tmp_path / "signal" + cmd = [sys.executable, str(Path(__file__).parent / "local_subprocess_sigint.py"), str(process_up_signal)] + process = subprocess.Popen(cmd, creationflags=CREATION_FLAGS) + while not process_up_signal.exists(): assert process.poll() is None - root = process.pid + child = next(iter(psutil.Process(pid=root).children())).pid process.send_signal(SIGINT) try: - out, err = process.communicate(timeout=None) + process.communicate(timeout=None) except subprocess.TimeoutExpired: # pragma: no cover process.kill() - out, err = process.communicate() - assert False, f"{out}\n{err}" + raise - assert "E\tgot KeyboardInterrupt signal" in err, err - assert f"W\tKeyboardInterrupt from {root} SIGINT pid {child}" in err, err - assert f"W\tKeyboardInterrupt from {root} SIGTERM pid {child}" in err, err - assert f"I\tKeyboardInterrupt from {root} SIGKILL pid {child}" in err, err + out, err = capfd.readouterr() + assert "E got KeyboardInterrupt signal" in err, err + assert f"W KeyboardInterrupt from {root} SIGINT pid {child}" in err, err + assert f"W KeyboardInterrupt from {root} SIGTERM pid {child}" in err, err + assert f"I KeyboardInterrupt from {root} SIGKILL pid {child}" in err, err outs = out.split("\n") exit_code = int(outs[0]) assert exit_code == -9 assert float(outs[3]) > 0 # duration - assert "how about no signal 15" in outs[1], outs[1] # stdout - assert "how about no KeyboardInterrupt" in outs[2], outs[2] # stderr + assert "how about no signal 2" in outs[1], outs[1] # 2 - Interrupt + assert "how about no signal 15" in outs[1], outs[1] # 15 - Terminated @pytest.mark.parametrize("tty_mode", ["on", "off"]) |