"""
Abstract base API for executing commands within tox environments.
"""
import logging
import sys
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from types import TracebackType
from typing import Any, Callable, Dict, Iterator, NoReturn, Optional, Sequence, Tuple, Type

from colorama import Fore

from tox.report import OutErr

from .request import ExecuteRequest, StdinSource
from .stream import SyncWrite

# callable receiving one chunk of raw bytes
ContentHandler = Callable[[bytes], None]
# callable taking a request plus two content handlers and returning an int
Executor = Callable[[ExecuteRequest, ContentHandler, ContentHandler], int]
LOGGER = logging.getLogger(__name__)


class ExecuteStatus(ABC):
    """
    Abstract status handle of an execution.

    Holds the two :class:`SyncWrite` collectors it was created with and an ``outcome`` slot that starts as ``None``
    and is filled in by :meth:`Execute.call` once the execution body has finished.
    """

    def __init__(self, out: SyncWrite, err: SyncWrite) -> None:
        self._out = out
        self._err = err
        self.outcome: Optional[Outcome] = None  # assigned by Execute.call after a clean exit

    @property
    @abstractmethod
    def exit_code(self) -> Optional[int]:
        """:return: the exit code, or ``None`` (implementations decide when no code is available)"""
        raise NotImplementedError

    @abstractmethod
    def wait(self, timeout: Optional[float] = None) -> None:  # noqa: U100
        """
        Wait on the execution (semantics are defined by the implementation).

        :param timeout: optional timeout handed to the implementation
        """
        raise NotImplementedError

    @abstractmethod
    def write_stdin(self, content: str) -> None:  # noqa: U100
        """
        Send text to the execution's standard input (semantics are defined by the implementation).

        :param content: the text to send
        """
        raise NotImplementedError

    @abstractmethod
    def interrupt(self) -> None:
        """Interrupt the execution (semantics are defined by the implementation)."""
        raise NotImplementedError

    def set_out_err(self, out: SyncWrite, err: SyncWrite) -> Tuple[SyncWrite, SyncWrite]:
        """
        Swap the collectors used by this status.

        :param out: the new standard output collector
        :param err: the new standard error collector
        :return: the ``(out, err)`` collectors that were in place before the swap
        """
        previous = (self._out, self._err)
        self._out = out
        self._err = err
        return previous

    @property
    def out(self) -> bytearray:
        """:return: the ``content`` of the current standard output collector"""
        return self._out.content

    @property
    def err(self) -> bytearray:
        """:return: the ``content`` of the current standard error collector"""
        return self._err.content

    @property
    def metadata(self) -> Dict[str, Any]:
        """:return: additional metadata of the execution; an empty dictionary in this base class"""
        return {}


class Execute(ABC):
    """Abstract API for execution of a tox environment"""

    def __init__(self, colored: bool) -> None:
        """
        :param colored: when true the standard error collector is created with ``Fore.RED`` as its third argument
        """
        self._colored = colored

    @contextmanager
    def call(self, request: ExecuteRequest, show: bool, out_err: OutErr) -> Iterator[ExecuteStatus]:
        """
        Context manager that runs a request and yields its status.

        An instance is built via :meth:`build_instance`, entered, and the resulting status is yielded to the caller.
        When the caller's body and the instance exit without raising, an :class:`Outcome` is stored on
        ``status.outcome``; when something raises the exception propagates and no outcome is stored.

        :param request: the execution request
        :param show: when true the buffers of ``out_err`` are handed to the collectors, otherwise ``None`` is
        :param out_err: pair of streams; the ``buffer`` attribute of each element is used
        :return: an iterator yielding the execution status exactly once
        """
        start = time.monotonic()
        try:
            # collector is what forwards the content from the file streams to the standard streams
            std_out = out_err[0].buffer
            std_err = out_err[1].buffer
            out_sync = SyncWrite(std_out.name, std_out if show else None)
            err_color = Fore.RED if self._colored else None
            err_sync = SyncWrite(std_err.name, std_err if show else None, err_color)
            with out_sync, err_sync:
                instance = self.build_instance(request, out_sync, err_sync)
                with instance as status:
                    yield status
                # read only after the instance context has been exited
                exit_code = status.exit_code
        finally:
            end = time.monotonic()
        # only reached when nothing above raised, so every name used here is bound
        status.outcome = Outcome(
            request,
            show,
            exit_code,
            out_sync.text,
            err_sync.text,
            start,
            end,
            instance.cmd,
            status.metadata,
        )

    @abstractmethod
    def build_instance(
        self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite  # noqa: U100
    ) -> "ExecuteInstance":
        """
        Create the object that performs the execution.

        :param request: the execution request
        :param out: the standard output collector
        :param err: the standard error collector
        :return: an instance usable as a context manager (see :class:`ExecuteInstance`)
        """
        raise NotImplementedError


class ExecuteInstance(ABC):
    """An instance of a command execution"""

    def __init__(self, request: ExecuteRequest, out: SyncWrite, err: SyncWrite) -> None:
        """
        :param request: the execution request
        :param out: the standard output collector
        :param err: the standard error collector
        """
        self.request = request
        self._out = out
        self._err = err

    @property
    def out_handler(self) -> ContentHandler:
        """:return: the ``handler`` of the standard output collector"""
        return self._out.handler

    @property
    def err_handler(self) -> ContentHandler:
        """:return: the ``handler`` of the standard error collector"""
        return self._err.handler

    @abstractmethod
    def __enter__(self) -> ExecuteStatus:
        """:return: the status object of the execution"""
        raise NotImplementedError

    @abstractmethod
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],  # noqa: U100
        exc_val: Optional[BaseException],  # noqa: U100
        exc_tb: Optional[TracebackType],  # noqa: U100
    ) -> None:
        """Leave the execution context; arguments follow the context manager protocol."""
        raise NotImplementedError

    @property
    @abstractmethod
    def cmd(self) -> Sequence[str]:
        """:return: the command, recorded as ``cmd`` on the :class:`Outcome`"""
        raise NotImplementedError


class Outcome:
    """Result of a command execution"""

    OK = 0

    def __init__(
        self,
        request: ExecuteRequest,
        show_on_standard: bool,
        exit_code: Optional[int],
        out: str,
        err: str,
        start: float,
        end: float,
        cmd: Sequence[str],
        metadata: Dict[str, Any],
    ):
        """
        Create a new execution outcome.

        :param request: the execution request
        :param show_on_standard: a flag indicating if the execution was shown on stdout/stderr
        :param exit_code: the exit code for the execution
        :param out: the standard output of the execution
        :param err: the standard error of the execution
        :param start: a timer sample for the start of the execution
        :param end: a timer sample for the end of the execution
        :param cmd: the command as executed
        :param metadata: additional metadata attached to the execution
        """
        self.request = request  #: the execution request
        self.show_on_standard = show_on_standard  #: a flag indicating if the execution was shown on stdout/stderr
        self.exit_code = exit_code  #: the exit code for the execution
        self.out = out  #: the standard output of the execution
        self.err = err  #: the standard error of the execution
        self.start = start  #: a timer sample for the start of the execution
        self.end = end  #: a timer sample for the end of the execution
        self.cmd = cmd  #: the command as executed
        self.metadata = metadata  #: additional metadata attached to the execution

    def __bool__(self) -> bool:
        """:return: ``True`` when the exit code equals :attr:`OK`"""
        return self.exit_code == self.OK

    def __repr__(self) -> str:
        summary = f"{self.__class__.__name__}: exit {self.exit_code} in {self.elapsed:.2f} seconds"
        target = f" for {self.request.shell_cmd}"
        return summary + target

    def assert_success(self) -> None:
        """
        Assert that the execution succeeded.

        An exit code of ``None`` is not treated as a failure. On failure :meth:`_assert_fail` raises
        :class:`SystemExit`; otherwise the run is logged at ``INFO`` level.
        """
        failed = self.exit_code is not None and self.exit_code != self.OK
        if failed:
            self._assert_fail()
        self.log_run_done(logging.INFO)

    def _assert_fail(self) -> NoReturn:
        """
        Report a failed run and exit.

        When the output was not shown on the standard streams, replay the captured output (standard error wrapped
        in red color codes), each terminated by a newline. Then log at ``CRITICAL`` level and raise
        :class:`SystemExit` carrying the exit code.
        """
        if self.show_on_standard is False:
            captured_out, captured_err = self.out, self.err
            if captured_out:
                sys.stdout.write(captured_out)
                if not captured_out.endswith("\n"):
                    sys.stdout.write("\n")
            if captured_err:
                for chunk in (Fore.RED, captured_err, Fore.RESET):
                    sys.stderr.write(chunk)
                if not captured_err.endswith("\n"):
                    sys.stderr.write("\n")
        self.log_run_done(logging.CRITICAL)
        raise SystemExit(self.exit_code)

    def log_run_done(self, lvl: int) -> None:
        """
        Log that the run was done.

        :param lvl: the level on what to log as interpreted by :func:`logging.log`
        """
        request = self.request
        if self.metadata:
            pairs = ", ".join(f"{key}={value}" for key, value in self.metadata.items())
            suffix = f" {pairs}"
        else:
            suffix = ""
        LOGGER.log(
            lvl,
            "exit %s (%.2f seconds) %s> %s%s",
            self.exit_code,
            self.elapsed,
            request.cwd,
            request.shell_cmd,
            suffix,
        )

    @property
    def elapsed(self) -> float:
        """:return: time the execution took in seconds"""
        return self.end - self.start

    def out_err(self) -> Tuple[str, str]:
        """:return: a tuple of the standard output and standard error"""
        return self.out, self.err


__all__ = (
    "ContentHandler",
    "Outcome",
    "Execute",
    "ExecuteInstance",
    "ExecuteStatus",
    "StdinSource",
)