diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/tox/execute/api.py | 64 | ||||
-rw-r--r-- | src/tox/execute/local_sub_process/__init__.py | 101 | ||||
-rw-r--r-- | src/tox/execute/pep517_backend.py | 17 | ||||
-rw-r--r-- | src/tox/pytest.py | 28 | ||||
-rw-r--r-- | src/tox/tox_env/api.py | 6 |
5 files changed, 126 insertions, 90 deletions
diff --git a/src/tox/execute/api.py b/src/tox/execute/api.py index 4aa880fd..f757eb60 100644 --- a/src/tox/execute/api.py +++ b/src/tox/execute/api.py @@ -7,7 +7,7 @@ import time from abc import ABC, abstractmethod from contextlib import contextmanager from types import TracebackType -from typing import Any, Callable, Dict, Iterator, NoReturn, Optional, Sequence, Tuple, Type +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, NoReturn, Optional, Sequence, Tuple, Type, cast from colorama import Fore @@ -16,14 +16,56 @@ from tox.report import OutErr from .request import ExecuteRequest, StdinSource from .stream import SyncWrite +if TYPE_CHECKING: + from tox.tox_env.api import ToxEnv + ContentHandler = Callable[[bytes], None] Executor = Callable[[ExecuteRequest, ContentHandler, ContentHandler], int] LOGGER = logging.getLogger(__name__) +class ExecuteOptions: + def __init__(self, env: "ToxEnv") -> None: + self._env = env + + @classmethod + def register_conf(cls, env: "ToxEnv") -> None: # noqa + env.conf.add_config( + keys=["suicide_timeout"], + desc="timeout to allow process to exit before sending SIGINT", + of_type=float, + default=0.0, + ) + env.conf.add_config( + keys=["interrupt_timeout"], + desc="timeout before sending SIGTERM after SIGINT", + of_type=float, + default=0.3, + ) + env.conf.add_config( + keys=["terminate_timeout"], + desc="timeout before sending SIGKILL after SIGTERM", + of_type=float, + default=0.2, + ) + + @property + def suicide_timeout(self) -> float: + return cast(float, self._env.conf["suicide_timeout"]) + + @property + def interrupt_timeout(self) -> float: + return cast(float, self._env.conf["interrupt_timeout"]) + + @property + def terminate_timeout(self) -> float: + return cast(float, self._env.conf["terminate_timeout"]) + + class ExecuteStatus(ABC): - def __init__(self, out: SyncWrite, err: SyncWrite) -> None: + def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite) -> None: self.outcome: Optional[Outcome] = None + self.options = options self._out = out self._err = err @@ -33,7 +75,7 @@ class ExecuteStatus(ABC): raise NotImplementedError @abstractmethod - def wait(self, timeout: Optional[float] = None) -> None: # noqa: U100 + def wait(self, timeout: Optional[float] = None) -> Optional[int]: # noqa: U100 raise NotImplementedError @abstractmethod @@ -65,11 +107,13 @@ class ExecuteStatus(ABC): class Execute(ABC): """Abstract API for execution of a tox environment""" + _option_class: Type[ExecuteOptions] = ExecuteOptions + def __init__(self, colored: bool) -> None: self._colored = colored @contextmanager - def call(self, request: ExecuteRequest, show: bool, out_err: OutErr) -> Iterator[ExecuteStatus]: + def call(self, request: ExecuteRequest, show: bool, out_err: OutErr, env: "ToxEnv") -> Iterator[ExecuteStatus]: start = time.monotonic() try: # collector is what forwards the content from the file streams to the standard streams @@ -77,7 +121,7 @@ class Execute(ABC): out_sync = SyncWrite(out.name, out if show else None) err_sync = SyncWrite(err.name, err if show else None, Fore.RED if self._colored else None) with out_sync, err_sync: - instance = self.build_instance(request, out_sync, err_sync) + instance = self.build_instance(request, self._option_class(env), out_sync, err_sync) with instance as status: yield status exit_code = status.exit_code @@ -89,16 +133,21 @@ class Execute(ABC): @abstractmethod def build_instance( - self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite # noqa: U100 + self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite # noqa: U100 ) -> "ExecuteInstance": raise NotImplementedError + @classmethod + def register_conf(cls, env: "ToxEnv") -> None: + cls._option_class.register_conf(env) + class ExecuteInstance(ABC): """An instance of a command execution""" - def __init__(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> None: + def __init__(self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite) -> None: self.request = request + self.options = options self._out = out self._err = err @@ -234,6 +283,7 @@ __all__ = ( "Outcome", "Execute", "ExecuteInstance", + "ExecuteOptions", "ExecuteStatus", "StdinSource", ) diff --git a/src/tox/execute/local_sub_process/__init__.py b/src/tox/execute/local_sub_process/__init__.py index 7b8177b3..ffbeddfe 100644 --- a/src/tox/execute/local_sub_process/__init__.py +++ b/src/tox/execute/local_sub_process/__init__.py @@ -4,17 +4,15 @@ import logging import os import shutil import sys -import time from subprocess import DEVNULL, PIPE, TimeoutExpired from types import TracebackType from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Sequence, Tuple, Type from tox.tox_env.errors import Fail -from ..api import Execute, ExecuteInstance, ExecuteStatus +from ..api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus from ..request import ExecuteRequest, StdinSource from ..stream import SyncWrite -from .read_via_thread import WAIT_GENERAL if sys.platform == "win32": # explicit check for mypy # pragma: win32 cover # needs stdin/stdout handlers backed by overlapped IO @@ -23,6 +21,7 @@ if sys.platform == "win32": # explicit check for mypy # pragma: win32 cover else: from asyncio.windows_utils import Popen from signal import CTRL_C_EVENT as SIG_INTERRUPT + from signal import SIGTERM from subprocess import CREATE_NEW_PROCESS_GROUP from .read_via_thread_windows import ReadViaThreadWindows as ReadViaThread @@ -37,20 +36,21 @@ else: # pragma: win32 no cover CREATION_FLAGS = 0 -WAIT_INTERRUPT = 0.3 -WAIT_TERMINATE = 0.2 + IS_WIN = sys.platform == "win32" class LocalSubProcessExecutor(Execute): - def build_instance(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> ExecuteInstance: - return LocalSubProcessExecuteInstance(request, out, err) + def build_instance( + self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite + ) -> ExecuteInstance: + return LocalSubProcessExecuteInstance(request, options, out, err) class LocalSubprocessExecuteStatus(ExecuteStatus): - def __init__(self, out: SyncWrite, err: SyncWrite, process: "Popen[bytes]"): + def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, process: "Popen[bytes]"): self._process: "Popen[bytes]" = process - super().__init__(out, err) + super().__init__(options, out, err) self._interrupted = False @property @@ -62,54 +62,30 @@ class LocalSubprocessExecuteStatus(ExecuteStatus): if self._process is not None: # pragma: no branch # A three level stop mechanism for children - INT -> TERM -> KILL # communicate will wait for the app to stop, and then drain the standard streams and close them - proc = self._process - host_pid = os.getpid() - to_pid = proc.pid - logging.warning("requested interrupt of %d from %d", to_pid, host_pid) - if proc.poll() is None: # still alive, first INT - logging.warning( - "send signal %s to %d from %d with timeout %.2f", - f"SIGINT({SIG_INTERRUPT})", - to_pid, - host_pid, - WAIT_INTERRUPT, - ) - proc.send_signal(SIG_INTERRUPT) - start = time.monotonic() - while proc.poll() is None and (time.monotonic() - start) < WAIT_INTERRUPT: - continue - if proc.poll() is None: # pragma: no branch - if sys.platform == "win32": # explicit check for mypy # pragma: no branch - logging.warning("terminate %d from %d", to_pid, host_pid) # pragma: no cover - else: - logging.warning( - "send signal %s to %d from %d with timeout %.2f", - f"SIGTERM({SIGTERM})", - to_pid, - host_pid, - WAIT_TERMINATE, - ) - proc.terminate() - start = time.monotonic() - if sys.platform != "win32": # explicit check for mypy # pragma: no branch - # Windows terminate is UNIX kill - while proc.poll() is None and (time.monotonic() - start) < WAIT_TERMINATE: - continue - if proc.poll() is None: # pragma: no branch - logging.warning("send signal %s to %d from %d", f"SIGKILL({SIGKILL})", to_pid, host_pid) - proc.kill() - while proc.poll() is None: - continue # pragma: no cover + to_pid, host_pid = self._process.pid, os.getpid() + msg = "requested interrupt of %d from %d, activate in %.2f" + logging.warning(msg, to_pid, host_pid, self.options.suicide_timeout) + if self.wait(self.options.suicide_timeout) is None: # still alive -> INT + msg = "send signal %s to %d from %d with timeout %.2f" + logging.warning(msg, f"SIGINT({SIG_INTERRUPT})", to_pid, host_pid, self.options.interrupt_timeout) + self._process.send_signal(SIG_INTERRUPT) + if self.wait(self.options.interrupt_timeout) is None: # still alive -> TERM # pragma: no branch + logging.warning(msg, f"SIGTERM({SIGTERM})", to_pid, host_pid, self.options.terminate_timeout) + self._process.terminate() + if sys.platform != "win32": # Windows terminate is UNIX kill + if self.wait(self.options.terminate_timeout) is None: # still alive -> KILL + logging.warning(msg[:-18], f"SIGKILL({SIGKILL})", to_pid, host_pid) + self._process.kill() + self.wait() # unconditional wait as kill should soon bring down the process + logging.warning("interrupt finished with success") else: # pragma: no cover # difficult to test, process must die just as it's being interrupted - logging.warning("process already dead with %s within %s", proc.returncode, os.getpid()) - logging.warning("interrupt finished with success") + logging.warning("process already dead with %s within %s", self._process.returncode, host_pid) - def wait(self, timeout: Optional[float] = None) -> None: - # note poll in general might deadlock if output large, but we drain in background threads so not an issue here - try: - self._process.wait(timeout=WAIT_GENERAL if timeout is None else timeout) + def wait(self, timeout: Optional[float] = None) -> Optional[int]: + try: # note wait in general might deadlock if output large, but we drain in background threads so not an issue + return self._process.wait(timeout=timeout) except TimeoutExpired: - pass + return None def write_stdin(self, content: str) -> None: stdin = self._process.stdin @@ -143,33 +119,34 @@ class LocalSubprocessExecuteStatus(ExecuteStatus): class LocalSubprocessExecuteFailedStatus(ExecuteStatus): - def __init__(self, out: SyncWrite, err: SyncWrite, exit_code: Optional[int]) -> None: - super().__init__(out, err) + def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, exit_code: Optional[int]) -> None: + super().__init__(options, out, err) self._exit_code = exit_code @property def exit_code(self) -> Optional[int]: return self._exit_code - def wait(self, timeout: Optional[float] = None) -> None: # noqa: U100 - """already dead no need to wait""" + def wait(self, timeout: Optional[float] = None) -> Optional[int]: # noqa: U100 + return self._exit_code # pragma: no cover def write_stdin(self, content: str) -> None: # noqa: U100 """cannot write""" def interrupt(self) -> None: - """Nothing running so nothing to interrupt""" + return None # pragma: no cover # nothing running so nothing to interrupt class LocalSubProcessExecuteInstance(ExecuteInstance): def __init__( self, request: ExecuteRequest, + options: ExecuteOptions, out: SyncWrite, err: SyncWrite, on_exit_drain: bool = True, ) -> None: - super().__init__(request, out, err) + super().__init__(request, options, out, err) self.process: Optional[Popen[bytes]] = None self._cmd: Optional[List[str]] = None self._read_stderr: Optional[ReadViaThread] = None @@ -218,9 +195,9 @@ class LocalSubProcessExecuteInstance(ExecuteInstance): creationflags=CREATION_FLAGS, ) except OSError as exception: - return LocalSubprocessExecuteFailedStatus(self._out, self._err, exception.errno) + return LocalSubprocessExecuteFailedStatus(self.options, self._out, self._err, exception.errno) - status = LocalSubprocessExecuteStatus(self._out, self._err, process) + status = LocalSubprocessExecuteStatus(self.options, self._out, self._err, process) drain, pid = self._on_exit_drain, self.process.pid self._read_stderr = ReadViaThread(stderr.send(process), self.err_handler, name=f"err-{pid}", drain=drain) self._read_stderr.__enter__() diff --git a/src/tox/execute/pep517_backend.py b/src/tox/execute/pep517_backend.py index d865bfaf..4bd26b87 100644 --- a/src/tox/execute/pep517_backend.py +++ b/src/tox/execute/pep517_backend.py @@ -7,7 +7,7 @@ from types import TracebackType from typing import Dict, Optional, Sequence, Tuple, Type from tox.execute import ExecuteRequest -from tox.execute.api import Execute, ExecuteInstance, ExecuteStatus +from tox.execute.api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus from tox.execute.local_sub_process import LocalSubProcessExecuteInstance from tox.execute.request import StdinSource from tox.execute.stream import SyncWrite @@ -26,19 +26,21 @@ class LocalSubProcessPep517Executor(Execute): self._exc: Optional[Exception] = None self.is_alive: bool = False - def build_instance(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> ExecuteInstance: - result = LocalSubProcessPep517ExecuteInstance(request, out, err, self.local_execute) + def build_instance( + self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite + ) -> ExecuteInstance: + result = LocalSubProcessPep517ExecuteInstance(request, options, out, err, self.local_execute(options)) return result - @property - def local_execute(self) -> Tuple[LocalSubProcessExecuteInstance, ExecuteStatus]: + def local_execute(self, options: ExecuteOptions) -> Tuple[LocalSubProcessExecuteInstance, ExecuteStatus]: if self._exc is not None: raise self._exc if self._local_execute is None: request = ExecuteRequest(cmd=self.cmd, cwd=self.cwd, env=self.env, stdin=StdinSource.API, run_id="pep517") instance = LocalSubProcessExecuteInstance( - request, + request=request, + options=options, out=SyncWrite(name="pep517-out", target=None, color=None), # not enabled no need to enter/exit err=SyncWrite(name="pep517-err", target=None, color=None), # not enabled no need to enter/exit on_exit_drain=False, @@ -89,11 +91,12 @@ class LocalSubProcessPep517ExecuteInstance(ExecuteInstance): def __init__( self, request: ExecuteRequest, + options: ExecuteOptions, out: SyncWrite, err: SyncWrite, instance_status: Tuple[LocalSubProcessExecuteInstance, ExecuteStatus], ): - super().__init__(request, out, err) + super().__init__(request, options, out, err) self._instance, self._status = instance_status self._lock = Lock() diff --git a/src/tox/pytest.py b/src/tox/pytest.py index d50dc40e..afea6bee 100644 --- a/src/tox/pytest.py +++ b/src/tox/pytest.py @@ -34,7 +34,7 @@ from virtualenv.info import IS_WIN, fs_supports_symlink import tox.run from tox.config.sets import EnvConfigSet -from tox.execute.api import Execute, ExecuteInstance, ExecuteStatus, Outcome +from tox.execute.api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus, Outcome from tox.execute.request import ExecuteRequest, shell_cmd from tox.execute.stream import SyncWrite from tox.report import LOGGER, OutErr @@ -160,34 +160,38 @@ class ToxProject: self.exit_code = exit_code super().__init__(colored) - def build_instance(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> ExecuteInstance: - return MockExecuteInstance(request, out, err, self.exit_code) + def build_instance( + self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite + ) -> ExecuteInstance: + return MockExecuteInstance(request, options, out, err, self.exit_code) class MockExecuteStatus(ExecuteStatus): - def __init__(self, out: SyncWrite, err: SyncWrite, exit_code: int) -> None: - super().__init__(out, err) + def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, exit_code: int) -> None: + super().__init__(options, out, err) self._exit_code = exit_code @property def exit_code(self) -> Optional[int]: return self._exit_code - def wait(self, timeout: Optional[float] = None) -> None: - """ """ + def wait(self, timeout: Optional[float] = None) -> Optional[int]: + return self._exit_code def write_stdin(self, content: str) -> None: - """ """ + return None # pragma: no cover def interrupt(self) -> None: - """ """ + return None # pragma: no cover class MockExecuteInstance(ExecuteInstance): - def __init__(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite, exit_code: int) -> None: - super().__init__(request, out, err) + def __init__( + self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, exit_code: int + ) -> None: + super().__init__(request, options, out, err) self.exit_code = exit_code def __enter__(self) -> ExecuteStatus: - return MockExecuteStatus(self._out, self._err, self.exit_code) + return MockExecuteStatus(self.options, self._out, self._err, self.exit_code) def __exit__( self, diff --git a/src/tox/tox_env/api.py b/src/tox/tox_env/api.py index 1ac32bdd..031acd77 100644 --- a/src/tox/tox_env/api.py +++ b/src/tox/tox_env/api.py @@ -106,6 +106,7 @@ class ToxEnv(ABC): default=lambda conf, name: cast(Path, conf.core["work_dir"]) / self.name / "log", desc="a folder for logging where tox will put logs of tool invocation", ) + self.executor.register_conf(self) self.conf.default_set_env_loader = self._default_set_env self.conf.add_config( keys=["platform"], @@ -337,8 +338,8 @@ class ToxEnv(ABC): executor: Optional[Execute] = None, ) -> Outcome: with self.execute_async(cmd, stdin, show, cwd, run_id, executor) as status: - while status.exit_code is None: - status.wait() + while status.wait() is None: + pass # pragma: no cover if status.outcome is None: # pragma: no cover # this should not happen raise RuntimeError # pragma: no cover return status.outcome @@ -427,6 +428,7 @@ class ToxEnv(ABC): ) -> Iterator[ExecuteStatus]: with executor.call( request=request, + env=self, show=show, out_err=out_err, ) as execute_status: |