summaryrefslogtreecommitdiff
path: root/src/tox/execute
diff options
context:
space:
mode:
authorBernát Gábor <bgabor8@bloomberg.net>2021-08-15 10:07:45 +0100
committerGitHub <noreply@github.com>2021-08-15 10:07:45 +0100
commit78176651933ef954111a7e392b6914df938b7c8b (patch)
tree7967e04d15d948a71c9fe7bc90cfd5a073259548 /src/tox/execute
parent6bb9491311b9bfa0602d56e68b1448b94f71b4a1 (diff)
downloadtox-git-78176651933ef954111a7e392b6914df938b7c8b.tar.gz
Add support for execute interrupt timeouts (#2158)
Diffstat (limited to 'src/tox/execute')
-rw-r--r--src/tox/execute/api.py64
-rw-r--r--src/tox/execute/local_sub_process/__init__.py101
-rw-r--r--src/tox/execute/pep517_backend.py17
3 files changed, 106 insertions, 76 deletions
diff --git a/src/tox/execute/api.py b/src/tox/execute/api.py
index 4aa880fd..f757eb60 100644
--- a/src/tox/execute/api.py
+++ b/src/tox/execute/api.py
@@ -7,7 +7,7 @@ import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from types import TracebackType
-from typing import Any, Callable, Dict, Iterator, NoReturn, Optional, Sequence, Tuple, Type
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, NoReturn, Optional, Sequence, Tuple, Type, cast
from colorama import Fore
@@ -16,14 +16,56 @@ from tox.report import OutErr
from .request import ExecuteRequest, StdinSource
from .stream import SyncWrite
+if TYPE_CHECKING:
+ from tox.tox_env.api import ToxEnv
+
ContentHandler = Callable[[bytes], None]
Executor = Callable[[ExecuteRequest, ContentHandler, ContentHandler], int]
LOGGER = logging.getLogger(__name__)
+class ExecuteOptions:
+ def __init__(self, env: "ToxEnv") -> None:
+ self._env = env
+
+ @classmethod
+ def register_conf(cls, env: "ToxEnv") -> None: # noqa
+ env.conf.add_config(
+ keys=["suicide_timeout"],
+ desc="timeout to allow process to exit before sending SIGINT",
+ of_type=float,
+ default=0.0,
+ )
+ env.conf.add_config(
+ keys=["interrupt_timeout"],
+ desc="timeout before sending SIGTERM after SIGINT",
+ of_type=float,
+ default=0.3,
+ )
+ env.conf.add_config(
+ keys=["terminate_timeout"],
+ desc="timeout before sending SIGKILL after SIGTERM",
+ of_type=float,
+ default=0.2,
+ )
+
+ @property
+ def suicide_timeout(self) -> float:
+ return cast(float, self._env.conf["suicide_timeout"])
+
+ @property
+ def interrupt_timeout(self) -> float:
+ return cast(float, self._env.conf["interrupt_timeout"])
+
+ @property
+ def terminate_timeout(self) -> float:
+ return cast(float, self._env.conf["terminate_timeout"])
+
+
class ExecuteStatus(ABC):
- def __init__(self, out: SyncWrite, err: SyncWrite) -> None:
+ def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite) -> None:
self.outcome: Optional[Outcome] = None
+ self.options = options
self._out = out
self._err = err
@@ -33,7 +75,7 @@ class ExecuteStatus(ABC):
raise NotImplementedError
@abstractmethod
- def wait(self, timeout: Optional[float] = None) -> None: # noqa: U100
+ def wait(self, timeout: Optional[float] = None) -> Optional[int]: # noqa: U100
raise NotImplementedError
@abstractmethod
@@ -65,11 +107,13 @@ class ExecuteStatus(ABC):
class Execute(ABC):
"""Abstract API for execution of a tox environment"""
+ _option_class: Type[ExecuteOptions] = ExecuteOptions
+
def __init__(self, colored: bool) -> None:
self._colored = colored
@contextmanager
- def call(self, request: ExecuteRequest, show: bool, out_err: OutErr) -> Iterator[ExecuteStatus]:
+ def call(self, request: ExecuteRequest, show: bool, out_err: OutErr, env: "ToxEnv") -> Iterator[ExecuteStatus]:
start = time.monotonic()
try:
# collector is what forwards the content from the file streams to the standard streams
@@ -77,7 +121,7 @@ class Execute(ABC):
out_sync = SyncWrite(out.name, out if show else None)
err_sync = SyncWrite(err.name, err if show else None, Fore.RED if self._colored else None)
with out_sync, err_sync:
- instance = self.build_instance(request, out_sync, err_sync)
+ instance = self.build_instance(request, self._option_class(env), out_sync, err_sync)
with instance as status:
yield status
exit_code = status.exit_code
@@ -89,16 +133,21 @@ class Execute(ABC):
@abstractmethod
def build_instance(
- self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite # noqa: U100
+ self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite # noqa: U100
) -> "ExecuteInstance":
raise NotImplementedError
+ @classmethod
+ def register_conf(cls, env: "ToxEnv") -> None:
+ cls._option_class.register_conf(env)
+
class ExecuteInstance(ABC):
"""An instance of a command execution"""
- def __init__(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> None:
+ def __init__(self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite) -> None:
self.request = request
+ self.options = options
self._out = out
self._err = err
@@ -234,6 +283,7 @@ __all__ = (
"Outcome",
"Execute",
"ExecuteInstance",
+ "ExecuteOptions",
"ExecuteStatus",
"StdinSource",
)
diff --git a/src/tox/execute/local_sub_process/__init__.py b/src/tox/execute/local_sub_process/__init__.py
index 7b8177b3..ffbeddfe 100644
--- a/src/tox/execute/local_sub_process/__init__.py
+++ b/src/tox/execute/local_sub_process/__init__.py
@@ -4,17 +4,15 @@ import logging
import os
import shutil
import sys
-import time
from subprocess import DEVNULL, PIPE, TimeoutExpired
from types import TracebackType
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Sequence, Tuple, Type
from tox.tox_env.errors import Fail
-from ..api import Execute, ExecuteInstance, ExecuteStatus
+from ..api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus
from ..request import ExecuteRequest, StdinSource
from ..stream import SyncWrite
-from .read_via_thread import WAIT_GENERAL
if sys.platform == "win32": # explicit check for mypy # pragma: win32 cover
# needs stdin/stdout handlers backed by overlapped IO
@@ -23,6 +21,7 @@ if sys.platform == "win32": # explicit check for mypy # pragma: win32 cover
else:
from asyncio.windows_utils import Popen
from signal import CTRL_C_EVENT as SIG_INTERRUPT
+ from signal import SIGTERM
from subprocess import CREATE_NEW_PROCESS_GROUP
from .read_via_thread_windows import ReadViaThreadWindows as ReadViaThread
@@ -37,20 +36,21 @@ else: # pragma: win32 no cover
CREATION_FLAGS = 0
-WAIT_INTERRUPT = 0.3
-WAIT_TERMINATE = 0.2
+
IS_WIN = sys.platform == "win32"
class LocalSubProcessExecutor(Execute):
- def build_instance(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> ExecuteInstance:
- return LocalSubProcessExecuteInstance(request, out, err)
+ def build_instance(
+ self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite
+ ) -> ExecuteInstance:
+ return LocalSubProcessExecuteInstance(request, options, out, err)
class LocalSubprocessExecuteStatus(ExecuteStatus):
- def __init__(self, out: SyncWrite, err: SyncWrite, process: "Popen[bytes]"):
+ def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, process: "Popen[bytes]"):
self._process: "Popen[bytes]" = process
- super().__init__(out, err)
+ super().__init__(options, out, err)
self._interrupted = False
@property
@@ -62,54 +62,30 @@ class LocalSubprocessExecuteStatus(ExecuteStatus):
if self._process is not None: # pragma: no branch
# A three level stop mechanism for children - INT -> TERM -> KILL
# communicate will wait for the app to stop, and then drain the standard streams and close them
- proc = self._process
- host_pid = os.getpid()
- to_pid = proc.pid
- logging.warning("requested interrupt of %d from %d", to_pid, host_pid)
- if proc.poll() is None: # still alive, first INT
- logging.warning(
- "send signal %s to %d from %d with timeout %.2f",
- f"SIGINT({SIG_INTERRUPT})",
- to_pid,
- host_pid,
- WAIT_INTERRUPT,
- )
- proc.send_signal(SIG_INTERRUPT)
- start = time.monotonic()
- while proc.poll() is None and (time.monotonic() - start) < WAIT_INTERRUPT:
- continue
- if proc.poll() is None: # pragma: no branch
- if sys.platform == "win32": # explicit check for mypy # pragma: no branch
- logging.warning("terminate %d from %d", to_pid, host_pid) # pragma: no cover
- else:
- logging.warning(
- "send signal %s to %d from %d with timeout %.2f",
- f"SIGTERM({SIGTERM})",
- to_pid,
- host_pid,
- WAIT_TERMINATE,
- )
- proc.terminate()
- start = time.monotonic()
- if sys.platform != "win32": # explicit check for mypy # pragma: no branch
- # Windows terminate is UNIX kill
- while proc.poll() is None and (time.monotonic() - start) < WAIT_TERMINATE:
- continue
- if proc.poll() is None: # pragma: no branch
- logging.warning("send signal %s to %d from %d", f"SIGKILL({SIGKILL})", to_pid, host_pid)
- proc.kill()
- while proc.poll() is None:
- continue # pragma: no cover
+ to_pid, host_pid = self._process.pid, os.getpid()
+ msg = "requested interrupt of %d from %d, activate in %.2f"
+ logging.warning(msg, to_pid, host_pid, self.options.suicide_timeout)
+ if self.wait(self.options.suicide_timeout) is None: # still alive -> INT
+ msg = "send signal %s to %d from %d with timeout %.2f"
+ logging.warning(msg, f"SIGINT({SIG_INTERRUPT})", to_pid, host_pid, self.options.interrupt_timeout)
+ self._process.send_signal(SIG_INTERRUPT)
+ if self.wait(self.options.interrupt_timeout) is None: # still alive -> TERM # pragma: no branch
+ logging.warning(msg, f"SIGTERM({SIGTERM})", to_pid, host_pid, self.options.terminate_timeout)
+ self._process.terminate()
+ if sys.platform != "win32": # Windows terminate is UNIX kill
+ if self.wait(self.options.terminate_timeout) is None: # still alive -> KILL
+ logging.warning(msg[:-18], f"SIGKILL({SIGKILL})", to_pid, host_pid)
+ self._process.kill()
+ self.wait() # unconditional wait as kill should soon bring down the process
+ logging.warning("interrupt finished with success")
else: # pragma: no cover # difficult to test, process must die just as it's being interrupted
- logging.warning("process already dead with %s within %s", proc.returncode, os.getpid())
- logging.warning("interrupt finished with success")
+ logging.warning("process already dead with %s within %s", self._process.returncode, host_pid)
- def wait(self, timeout: Optional[float] = None) -> None:
- # note poll in general might deadlock if output large, but we drain in background threads so not an issue here
- try:
- self._process.wait(timeout=WAIT_GENERAL if timeout is None else timeout)
+ def wait(self, timeout: Optional[float] = None) -> Optional[int]:
+ try: # note wait in general might deadlock if output large, but we drain in background threads so not an issue
+ return self._process.wait(timeout=timeout)
except TimeoutExpired:
- pass
+ return None
def write_stdin(self, content: str) -> None:
stdin = self._process.stdin
@@ -143,33 +119,34 @@ class LocalSubprocessExecuteStatus(ExecuteStatus):
class LocalSubprocessExecuteFailedStatus(ExecuteStatus):
- def __init__(self, out: SyncWrite, err: SyncWrite, exit_code: Optional[int]) -> None:
- super().__init__(out, err)
+ def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, exit_code: Optional[int]) -> None:
+ super().__init__(options, out, err)
self._exit_code = exit_code
@property
def exit_code(self) -> Optional[int]:
return self._exit_code
- def wait(self, timeout: Optional[float] = None) -> None: # noqa: U100
- """already dead no need to wait"""
+ def wait(self, timeout: Optional[float] = None) -> Optional[int]: # noqa: U100
+ return self._exit_code # pragma: no cover
def write_stdin(self, content: str) -> None: # noqa: U100
"""cannot write"""
def interrupt(self) -> None:
- """Nothing running so nothing to interrupt"""
+ return None # pragma: no cover # nothing running so nothing to interrupt
class LocalSubProcessExecuteInstance(ExecuteInstance):
def __init__(
self,
request: ExecuteRequest,
+ options: ExecuteOptions,
out: SyncWrite,
err: SyncWrite,
on_exit_drain: bool = True,
) -> None:
- super().__init__(request, out, err)
+ super().__init__(request, options, out, err)
self.process: Optional[Popen[bytes]] = None
self._cmd: Optional[List[str]] = None
self._read_stderr: Optional[ReadViaThread] = None
@@ -218,9 +195,9 @@ class LocalSubProcessExecuteInstance(ExecuteInstance):
creationflags=CREATION_FLAGS,
)
except OSError as exception:
- return LocalSubprocessExecuteFailedStatus(self._out, self._err, exception.errno)
+ return LocalSubprocessExecuteFailedStatus(self.options, self._out, self._err, exception.errno)
- status = LocalSubprocessExecuteStatus(self._out, self._err, process)
+ status = LocalSubprocessExecuteStatus(self.options, self._out, self._err, process)
drain, pid = self._on_exit_drain, self.process.pid
self._read_stderr = ReadViaThread(stderr.send(process), self.err_handler, name=f"err-{pid}", drain=drain)
self._read_stderr.__enter__()
diff --git a/src/tox/execute/pep517_backend.py b/src/tox/execute/pep517_backend.py
index d865bfaf..4bd26b87 100644
--- a/src/tox/execute/pep517_backend.py
+++ b/src/tox/execute/pep517_backend.py
@@ -7,7 +7,7 @@ from types import TracebackType
from typing import Dict, Optional, Sequence, Tuple, Type
from tox.execute import ExecuteRequest
-from tox.execute.api import Execute, ExecuteInstance, ExecuteStatus
+from tox.execute.api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus
from tox.execute.local_sub_process import LocalSubProcessExecuteInstance
from tox.execute.request import StdinSource
from tox.execute.stream import SyncWrite
@@ -26,19 +26,21 @@ class LocalSubProcessPep517Executor(Execute):
self._exc: Optional[Exception] = None
self.is_alive: bool = False
- def build_instance(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> ExecuteInstance:
- result = LocalSubProcessPep517ExecuteInstance(request, out, err, self.local_execute)
+ def build_instance(
+ self, request: ExecuteRequest, options: ExecuteOptions, out: SyncWrite, err: SyncWrite
+ ) -> ExecuteInstance:
+ result = LocalSubProcessPep517ExecuteInstance(request, options, out, err, self.local_execute(options))
return result
- @property
- def local_execute(self) -> Tuple[LocalSubProcessExecuteInstance, ExecuteStatus]:
+ def local_execute(self, options: ExecuteOptions) -> Tuple[LocalSubProcessExecuteInstance, ExecuteStatus]:
if self._exc is not None:
raise self._exc
if self._local_execute is None:
request = ExecuteRequest(cmd=self.cmd, cwd=self.cwd, env=self.env, stdin=StdinSource.API, run_id="pep517")
instance = LocalSubProcessExecuteInstance(
- request,
+ request=request,
+ options=options,
out=SyncWrite(name="pep517-out", target=None, color=None), # not enabled no need to enter/exit
err=SyncWrite(name="pep517-err", target=None, color=None), # not enabled no need to enter/exit
on_exit_drain=False,
@@ -89,11 +91,12 @@ class LocalSubProcessPep517ExecuteInstance(ExecuteInstance):
def __init__(
self,
request: ExecuteRequest,
+ options: ExecuteOptions,
out: SyncWrite,
err: SyncWrite,
instance_status: Tuple[LocalSubProcessExecuteInstance, ExecuteStatus],
):
- super().__init__(request, out, err)
+ super().__init__(request, options, out, err)
self._instance, self._status = instance_status
self._lock = Lock()