diff options
author | Benjamin Schubert <contact@benschubert.me> | 2020-07-10 07:55:03 +0000 |
---|---|---|
committer | Benjamin Schubert <contact@benschubert.me> | 2020-08-22 14:14:34 +0100 |
commit | 5b948b8d96028c7850ec2bdaa19aff3dde09e790 (patch) | |
tree | 9f12982adae90e540e0cf8c2e3742e5268a78956 | |
parent | d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe (diff) | |
download | buildstream-bschubert/no-multiprocessing-full.tar.gz |
-rw-r--r-- | src/buildstream/plugin.py | 79 |
1 files changed, 72 insertions, 7 deletions
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index 0ed6d7d6f..467c955dd 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -112,13 +112,14 @@ Class Reference import itertools import multiprocessing import os +import signal import subprocess import sys from contextlib import contextmanager from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING from weakref import WeakValueDictionary -from . import utils +from . import utils, _signals from ._exceptions import PluginError, ImplError from ._message import Message, MessageType from .node import MappingNode, ProvenanceInformation @@ -137,6 +138,18 @@ T2 = TypeVar("T2") def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None: + # This avoids some SIGTSTP signals from grandchildren + # getting propagated up to the master process + os.setsid() + + # First set back to the default signal handlers for the signals + # we handle, and then clear their blocked state. + # + signal_list = [signal.SIGTSTP, signal.SIGTERM] + for sig in signal_list: + signal.signal(sig, signal.SIG_DFL) + signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) + queue.put(target(*args)) @@ -520,15 +533,67 @@ class Plugin: ): queue = self.__multiprocessing_context.Queue() - proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args)) - proc.start() - - result = queue.get() - proc.join() + process = None + + from .utils import _kill_process_tree + import psutil + + # Handle termination, suspend and resume + def kill_proc(): + if not process: + return + + proc = psutil.Process(process.pid) + + # Some callers know that their subprocess can be + # gracefully terminated, make an attempt first + proc.terminate() + + try: + proc.wait(5) + except psutil.TimeoutExpired: + # Did not terminate within the timeout: murder + _kill_process_tree(proc.pid) + + else: + # FIXME: This is a brutal but reliable approach + # + # Other variations I've tried which try SIGTERM first + # and then wait for child processes to exit gracefully + # have not reliably cleaned up process trees and have + # left orphaned git or ssh processes alive. + # + # This cleans up the subprocesses reliably but may + # cause side effects such as possibly leaving stale + # locks behind. Hopefully this should not be an issue + # as long as any child processes only interact with + # the temp directories which we control and cleanup + # ourselves. + # + _kill_process_tree(proc.pid) + + def suspend_proc(): + if process: + group_id = os.getpgid(process.pid) + os.killpg(group_id, signal.SIGSTOP) + + def resume_proc(): + if process: + group_id = os.getpgid(process.pid) + os.killpg(group_id, signal.SIGCONT) + + with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc): + + process = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args)) + + with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): + process.start() + + result = queue.get() + process.join() return result - def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int: """A wrapper for subprocess.call() |