summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValentin David <valentin.david@codethink.co.uk>2020-03-10 14:34:57 +0000
committerValentin David <valentin.david@codethink.co.uk>2020-03-10 14:34:57 +0000
commit9f7b8671b701381fea8ee61f789a5d6aa1759c78 (patch)
tree7c83653319787ad96f7cc1e2729190998933ddf3
parentc12c7f596f15842028a46fff0ad062b3b4e2988f (diff)
parent884ab574593a04a4790e45aaf92f1c4fa6af4bb9 (diff)
downloadbuildstream-9f7b8671b701381fea8ee61f789a5d6aa1759c78.tar.gz
Merge branch 'valentindavid/bst-1/python3.8-with-backports' into 'bst-1'
[BuildStream 1.4] Backport changes to enable Python 3.8 See merge request BuildStream/buildstream!1830
-rw-r--r--buildstream/_scheduler/_multiprocessing.py79
-rw-r--r--buildstream/_scheduler/jobs/job.py63
-rw-r--r--buildstream/_scheduler/scheduler.py24
-rw-r--r--requirements/cov-requirements.in3
-rw-r--r--requirements/cov-requirements.txt5
-rw-r--r--requirements/dev-requirements.txt2
-rw-r--r--tox.ini20
7 files changed, 124 insertions, 72 deletions
diff --git a/buildstream/_scheduler/_multiprocessing.py b/buildstream/_scheduler/_multiprocessing.py
new file mode 100644
index 000000000..4864e140c
--- /dev/null
+++ b/buildstream/_scheduler/_multiprocessing.py
@@ -0,0 +1,79 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# TLDR:
+# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
+#
+#
+# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
+#
+# The main problem that affects us is that the parent and the child will share some file descriptors.
+# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
+# by the app so that the asyncio loop can treat them afterwards.
+#
+# This sharing means that when we send a signal to the child, the signal handler in the child will write
+# it back to the parent's sig_handler_fd, making the parent have to treat it too.
+# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
+# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
+# the children...
+#
+# We therefore provide an `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
+# tries to clean up the loop and never calls `waitpid` on the child process, which would break our child watchers.
+#
+#
+# Relevant issues:
+# - Asyncio: support fork (https://bugs.python.org/issue21998)
+# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
+# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
+#
+#
+
+import multiprocessing
+import signal
+import sys
+from asyncio import set_event_loop_policy
+
+
+# _AsyncioSafeForkAwareProcess()
+#
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
+#
+# Also automatically closes any running asyncio loop before calling
+# the actual run target.
+#
+class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
+ # pylint: disable=attribute-defined-outside-init
+ def start(self):
+ self._popen = self._Popen(self)
+ self._sentinel = self._popen.sentinel
+
+ def run(self):
+ signal.set_wakeup_fd(-1)
+ set_event_loop_policy(None)
+
+ super().run()
+
+
+if sys.platform != "win32":
+ # Set the default event loop policy to automatically close our asyncio loop in child processes
+ AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
+
+else:
+    # Windows doesn't support ChildWatcher that way anyway; we'll need another
+    # implementation if we want it.
+ AsyncioSafeProcess = multiprocessing.Process
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index b8b4a2c76..adb520088 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -1,5 +1,6 @@
#
# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -32,6 +33,7 @@ import multiprocessing
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
+from .. import _multiprocessing
# Return code values shutdown of job handling child processes
#
@@ -64,15 +66,6 @@ class _Envelope():
self.message = message
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-class Process(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
-
# Job()
#
# The Job object represents a parallel task, when calling Job.spawn(),
@@ -127,39 +120,23 @@ class Job():
self._parent_start_listening()
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+ self._process = _multiprocessing.AsyncioSafeProcess(target=self._child_action, args=[self._queue])
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
# process will be notified of any signal after we launch the child.
#
with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- self._process.start()
+ with asyncio.get_child_watcher() as watcher:
+ self._process.start()
+ # Register the process to call `_parent_child_completed` once it is done
- # Wait for the child task to complete.
- #
- # This is a tricky part of python which doesnt seem to
- # make it to the online docs:
- #
- # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance
- # which is the default type of watcher, and the instance belongs to the
- # "event loop policy" in use (so there is only one in the main process).
- #
- # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio
- # loop, and will selectively reap any child pids which have been
- # terminated.
- #
- # o At registration time, the process will immediately be checked with
- # `os.waitpid()` and will be reaped immediately, before add_child_handler()
- # returns.
- #
- # The self._parent_child_completed callback passed here will normally
- # be called after the child task has been reaped with `os.waitpid()`, in
- # an event loop callback. Otherwise, if the job completes too fast, then
- # the callback is called immediately.
- #
- self._watcher = asyncio.get_child_watcher()
- self._watcher.add_child_handler(self._process.pid, self._parent_child_completed)
+ # Here we delay the call to the next loop tick. This is in order to be running
+ # in the main thread, as the callback itself must be thread safe.
+ def on_completion(pid, returncode):
+ asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+
+ watcher.add_child_handler(self._process.pid, on_completion)
# terminate()
#
@@ -182,21 +159,15 @@ class Job():
self._terminated = True
- # terminate_wait()
+ # get_terminated()
#
- # Wait for terminated jobs to complete
- #
- # Args:
- # timeout (float): Seconds to wait
+ # Check if a job has been terminated.
#
# Returns:
- # (bool): True if the process terminated cleanly, otherwise False
+ # (bool): True in the main process if Job.terminate() was called.
#
- def terminate_wait(self, timeout):
-
- # Join the child process after sending SIGTERM
- self._process.join(timeout)
- return self._process.exitcode is not None
+ def get_terminated(self):
+ return self._terminated
# kill()
#
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 68c115c1b..131cbb1d5 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -137,6 +137,12 @@ class Scheduler():
# Hold on to the queues to process
self.queues = queues
+ # NOTE: Enforce use of `SafeChildWatcher` as we generally don't want
+ # background threads.
+ # In Python 3.8+, `ThreadedChildWatcher` is the default watcher, and
+ # not `SafeChildWatcher`.
+ asyncio.set_child_watcher(asyncio.SafeChildWatcher())
+
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
@@ -516,21 +522,15 @@ class Scheduler():
self.loop.remove_signal_handler(signal.SIGTERM)
def _terminate_jobs_real(self):
- # 20 seconds is a long time, it can take a while and sometimes
- # we still fail, need to look deeper into this again.
- wait_start = datetime.datetime.now()
- wait_limit = 20.0
+ def kill_jobs():
+ for job_ in self._active_jobs:
+ job_.kill()
- # First tell all jobs to terminate
- for job in self._active_jobs:
- job.terminate()
+ # Schedule all jobs to be killed if they have not exited in 20 sec
+ self.loop.call_later(20, kill_jobs)
- # Now wait for them to really terminate
for job in self._active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
+ job.terminate()
# Regular timeout for driving status in the UI
def _tick(self):
diff --git a/requirements/cov-requirements.in b/requirements/cov-requirements.in
index 455b91ba6..1911f3506 100644
--- a/requirements/cov-requirements.in
+++ b/requirements/cov-requirements.in
@@ -1,2 +1,3 @@
-coverage == 4.4.0
+coverage == 4.4.0 ; python_version < '3.8'
+coverage == 4.5.4 ; python_version >= '3.8'
pytest-cov >= 2.5.0
diff --git a/requirements/cov-requirements.txt b/requirements/cov-requirements.txt
index 46c70432d..51bc2bd28 100644
--- a/requirements/cov-requirements.txt
+++ b/requirements/cov-requirements.txt
@@ -1,4 +1,5 @@
-coverage==4.4
+coverage==4.4.0; python_version < '3.8'
+coverage==4.5.4; python_version >= '3.8'
pytest-cov==2.7.1
## The following requirements were added by pip freeze:
atomicwrites==1.3.0
@@ -6,7 +7,7 @@ attrs==19.1.0
importlib-metadata==0.20
more-itertools==7.2.0
packaging==19.1
-pluggy==0.12.0
+pluggy==0.13.1
py==1.8.0
pyparsing==2.4.2
pytest==5.1.2
diff --git a/requirements/dev-requirements.txt b/requirements/dev-requirements.txt
index e6f327284..dfc991e9b 100644
--- a/requirements/dev-requirements.txt
+++ b/requirements/dev-requirements.txt
@@ -19,7 +19,7 @@ lazy-object-proxy==1.4.2
mccabe==0.6.1
more-itertools==7.2.0
packaging==19.1
-pluggy==0.12.0
+pluggy==0.13.1
py==1.8.0
pyparsing==2.4.2
pytest-cache==1.0
diff --git a/tox.ini b/tox.ini
index 42b85a108..903ae9af8 100644
--- a/tox.ini
+++ b/tox.ini
@@ -2,7 +2,7 @@
# Tox global configuration
#
[tox]
-envlist = py35,py36,py37
+envlist = py35,py36,py37,py38
skip_missing_interpreters = true
#
@@ -13,16 +13,16 @@ skip_missing_interpreters = true
[testenv]
commands =
# Running with coverage reporting enabled
- py{35,36,37}-!nocover: pytest --basetemp {envtmpdir} --cov=buildstream --cov-config .coveragerc {posargs}
- py{35,36,37}-!nocover: mkdir -p .coverage-reports
- py{35,36,37}-!nocover: mv {envtmpdir}/.coverage {toxinidir}/.coverage-reports/.coverage.{env:COVERAGE_PREFIX:}{envname}
+ py{35,36,37,38}-!nocover: pytest --basetemp {envtmpdir} --cov=buildstream --cov-config .coveragerc {posargs}
+ py{35,36,37,38}-!nocover: mkdir -p .coverage-reports
+ py{35,36,37,38}-!nocover: mv {envtmpdir}/.coverage {toxinidir}/.coverage-reports/.coverage.{env:COVERAGE_PREFIX:}{envname}
# Running with coverage reporting disabled
- py{35,36,37}-nocover: pytest --basetemp {envtmpdir} {posargs}
+ py{35,36,37,38}-nocover: pytest --basetemp {envtmpdir} {posargs}
deps =
- py{35,36,37}: -rrequirements/requirements.txt
- py{35,36,37}: -rrequirements/dev-requirements.txt
- py{35,36,37}: -rrequirements/plugin-requirements.txt
+ py{35,36,37,38}: -rrequirements/requirements.txt
+ py{35,36,37,38}: -rrequirements/dev-requirements.txt
+ py{35,36,37,38}: -rrequirements/plugin-requirements.txt
# Only require coverage and pytest-cov when using it
!nocover: -rrequirements/cov-requirements.txt
@@ -35,9 +35,9 @@ passenv =
# These keys are not inherited by any other sections
#
setenv =
- py{35,36,37}: COVERAGE_FILE = {envtmpdir}/.coverage
+ py{35,36,37,38}: COVERAGE_FILE = {envtmpdir}/.coverage
whitelist_externals =
- py{35,36,37}:
+ py{35,36,37,38}:
mv
mkdir