From 884ab574593a04a4790e45aaf92f1c4fa6af4bb9 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Fri, 8 Nov 2019 12:18:43 +0000 Subject: scheduler.py: Prevent the asyncio loop from leaking into subprocesses Having a running asyncio loop while forking a program is not supported in python and doesn't work as expected. This leads to file descriptors leaking and the subprocesses sharing the same loop as the parents. This also leads to the parent receiving all signals the children receive. This ensures we don't leek our asyncio loop in the workers we fork. --- buildstream/_scheduler/_multiprocessing.py | 79 ++++++++++++++++++++++++++++++ buildstream/_scheduler/jobs/job.py | 12 +---- 2 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 buildstream/_scheduler/_multiprocessing.py diff --git a/buildstream/_scheduler/_multiprocessing.py b/buildstream/_scheduler/_multiprocessing.py new file mode 100644 index 000000000..4864e140c --- /dev/null +++ b/buildstream/_scheduler/_multiprocessing.py @@ -0,0 +1,79 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . +# + +# TLDR: +# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process` +# +# +# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running. +# +# The main problem that affects us is that the parent and the child will share some file handlers. +# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received +# by the app so that the asyncio loop can treat them afterwards. +# +# This sharing means that when we send a signal to the child, the sighandler in the child will write +# it back to the parent sig_handler_fd, making the parent have to treat it too. +# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children, +# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to +# the children... +# +# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically +# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers. +# +# +# Relevant issues: +# - Asyncio: support fork (https://bugs.python.org/issue21998) +# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087) +# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489) +# +# + +import multiprocessing +import signal +import sys +from asyncio import set_event_loop_policy + + +# _AsyncioSafeForkAwareProcess() +# +# Process class that doesn't call waitpid on its own. +# This prevents conflicts with the asyncio child watcher. +# +# Also automatically close any running asyncio loop before calling +# the actual run target +# +class _AsyncioSafeForkAwareProcess(multiprocessing.Process): + # pylint: disable=attribute-defined-outside-init + def start(self): + self._popen = self._Popen(self) + self._sentinel = self._popen.sentinel + + def run(self): + signal.set_wakeup_fd(-1) + set_event_loop_policy(None) + + super().run() + + +if sys.platform != "win32": + # Set the default event loop policy to automatically close our asyncio loop in child processes + AsyncioSafeProcess = _AsyncioSafeForkAwareProcess + +else: + # Windows doesn't support ChildWatcher that way anyways, we'll need another + # implementation if we want it + AsyncioSafeProcess = multiprocessing.Process diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 67ec75db5..adb520088 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -33,6 +33,7 @@ import multiprocessing from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils +from .. import _multiprocessing # Return code values shutdown of job handling child processes # @@ -65,15 +66,6 @@ class _Envelope(): self.message = message -# Process class that doesn't call waitpid on its own. -# This prevents conflicts with the asyncio child watcher. -class Process(multiprocessing.Process): - # pylint: disable=attribute-defined-outside-init - def start(self): - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - - # Job() # # The Job object represents a parallel task, when calling Job.spawn(), @@ -128,7 +120,7 @@ class Job(): self._parent_start_listening() # Spawn the process - self._process = Process(target=self._child_action, args=[self._queue]) + self._process = _multiprocessing.AsyncioSafeProcess(target=self._child_action, args=[self._queue]) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main -- cgit v1.2.1