author     Benjamin Schubert <contact@benschubert.me>    2020-07-10 07:55:03 +0000
committer  Benjamin Schubert <contact@benschubert.me>    2020-08-22 14:14:34 +0100
commit     5b948b8d96028c7850ec2bdaa19aff3dde09e790 (patch)
tree       9f12982adae90e540e0cf8c2e3742e5268a78956
parent     d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe (diff)
download   buildstream-bschubert/no-multiprocessing-full.tar.gz
-rw-r--r--   src/buildstream/plugin.py   79
1 file changed, 72 insertions, 7 deletions
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 0ed6d7d6f..467c955dd 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -112,13 +112,14 @@ Class Reference
import itertools
import multiprocessing
import os
+import signal
import subprocess
import sys
from contextlib import contextmanager
from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING
from weakref import WeakValueDictionary
-from . import utils
+from . import utils, _signals
from ._exceptions import PluginError, ImplError
from ._message import Message, MessageType
from .node import MappingNode, ProvenanceInformation
@@ -137,6 +138,18 @@ T2 = TypeVar("T2")
def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None:
+    # This prevents SIGTSTP signals from grandchildren
+    # from being propagated up to the master process
+    os.setsid()
+
+    # First restore the default handlers for the signals
+    # we handle, then clear their blocked state.
+    #
+    signal_list = [signal.SIGTSTP, signal.SIGTERM]
+    for sig in signal_list:
+        signal.signal(sig, signal.SIG_DFL)
+    signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
+
    queue.put(target(*args))
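
[Editorial aside: for illustration, a minimal standalone sketch of the same child-side signal reset performed by _background_job_wrapper above (new session, default handlers restored, signals unblocked). The names worker and compute are hypothetical and not part of this patch; POSIX-only, since it relies on fork, setsid and SIGTSTP.]

import multiprocessing
import os
import signal


def worker(queue, target, args):
    # Detach into a new session so SIGTSTP sent to grandchildren
    # is not propagated back up to the controlling process.
    os.setsid()

    # Restore default handlers for the signals the parent manages,
    # then unblock them; both the handler table and the signal mask
    # are inherited across fork.
    handled = [signal.SIGTSTP, signal.SIGTERM]
    for sig in handled:
        signal.signal(sig, signal.SIG_DFL)
    signal.pthread_sigmask(signal.SIG_UNBLOCK, handled)

    queue.put(target(*args))


def compute(x, y):
    return x + y


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    proc = ctx.Process(target=worker, args=(queue, compute, (1, 2)))
    proc.start()
    print(queue.get())  # -> 3
    proc.join()
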
@@ -520,15 +533,67 @@ class Plugin:
    ):
        queue = self.__multiprocessing_context.Queue()
-        proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
-        proc.start()
-
-        result = queue.get()
-        proc.join()
+        process = None
+
+        from .utils import _kill_process_tree
+        import psutil
+
+        # Handle termination, suspend and resume
+        def kill_proc():
+            if not process:
+                return
+
+            proc = psutil.Process(process.pid)
+
+            # Some callers know that their subprocess can be
+            # gracefully terminated; make an attempt at that first
+            proc.terminate()
+
+            try:
+                proc.wait(5)
+            except psutil.TimeoutExpired:
+                # Did not terminate within the timeout: murder
+                _kill_process_tree(proc.pid)
+
+            else:
+                # FIXME: This is a brutal but reliable approach
+                #
+                # Other variations I've tried which try SIGTERM first
+                # and then wait for child processes to exit gracefully
+                # have not reliably cleaned up process trees and have
+                # left orphaned git or ssh processes alive.
+                #
+                # This cleans up the subprocesses reliably but may
+                # cause side effects such as possibly leaving stale
+                # locks behind. Hopefully this is not an issue
+                # as long as any child processes only interact with
+                # the temp directories which we control and clean up
+                # ourselves.
+                #
+                _kill_process_tree(proc.pid)
+
+        def suspend_proc():
+            if process:
+                group_id = os.getpgid(process.pid)
+                os.killpg(group_id, signal.SIGSTOP)
+
+        def resume_proc():
+            if process:
+                group_id = os.getpgid(process.pid)
+                os.killpg(group_id, signal.SIGCONT)
+
+        with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+
+            process = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
+
+            with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+                process.start()
+
+            result = queue.get()
+            process.join()
        return result
-
    def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
        """A wrapper for subprocess.call()