diff options
author | Benjamin Schubert <contact@benschubert.me> | 2019-11-08 11:17:21 +0000 |
---|---|---|
committer | Benjamin Schubert <contact@benschubert.me> | 2019-11-08 11:17:21 +0000 |
commit | d94df88c2d132b0ae62aec33ff69594bbc2e2bb2 (patch) | |
tree | bdf2e9a679997e05432b8ef257af7bfdf4ac5ffd | |
parent | a283f02ce9f5d72f03c783d516d69d8f57069ef4 (diff) | |
download | buildstream-bschubert/fix-children.tar.gz |
WIP: Blanket update (branch: bschubert/fix-children)
-rw-r--r-- | src/buildstream/_frontend/app.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/_asyncio.py | 73 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 43 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 25 |
4 files changed, 100 insertions, 43 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 45160afbc..b234bc87c 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -532,7 +532,7 @@ class App(): choice = click.prompt("Choice:", value_proc=_prefix_choice_value_proc(['continue', 'quit', 'terminate']), default='continue', err=True) - except click.Abort: + except (click.Abort, SystemError): # Ensure a newline after automatically printed '^C' click.echo("", err=True) choice = 'terminate' diff --git a/src/buildstream/_scheduler/_asyncio.py b/src/buildstream/_scheduler/_asyncio.py new file mode 100644 index 000000000..b9da163ab --- /dev/null +++ b/src/buildstream/_scheduler/_asyncio.py @@ -0,0 +1,73 @@ +# +# Copyright (C) 2019 Bloomberg LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+# + +import multiprocessing +from asyncio import * + + +class _ForkableUnixSelectorEventLoop(SelectorEventLoop): + + def _cleanup_on_fork(self): + parent_selector = self._selector + self._selector = type(parent_selector)() + + for key in parent_selector.get_map().values(): + self._selector.register(key.fileobj, key.events, key.data) + parent_selector.close() + self._close_self_pipe() + self._make_self_pipe() + + +class _DefaultEventLoopPolicy(DefaultEventLoopPolicy): + _loop_factory = _ForkableUnixSelectorEventLoop + + def cleanup_on_fork(self): + if self._local._loop is None: + # We don't have a loop, we're fine + return + + assert not Task.all_tasks(self._local._loop), "Some asyncio tasks leaked to a child" + assert Task.current_task(self._local._loop) is None, "We are in the child, we should have no running task" + self._local._loop._cleanup_on_fork() + self._local._loop.stop() + del self._local._loop + + +class _AsyncioSafeProcess(multiprocessing.Process): + # pylint: disable=attribute-defined-outside-init + def start(self): + self._popen = self._Popen(self) + self._sentinel = self._popen.sentinel + + +# Process class that doesn't call waitpid on its own. +# This prevents conflicts with the asyncio child watcher. 
+class _AsyncioForkAwareProcess(_AsyncioSafeProcess): + def run(self): + get_event_loop_policy().cleanup_on_fork() + super().run() + + +if multiprocessing.get_start_method(False) == "fork": + # Set the default event loop policy to automatically close our asyncio loop in child processes + DefaultEventLoopPolicy = _DefaultEventLoopPolicy + set_event_loop_policy(DefaultEventLoopPolicy()) + + AsyncioSafeProcess = _AsyncioForkAwareProcess + +else: + AsyncioSafeProcess = _AsyncioSafeProcess diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 1d7697b02..e53552c43 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -33,6 +33,7 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum from ... import _signals, utils +from buildstream._scheduler import _asyncio from .jobpickler import pickle_child_job, do_pickled_child_job @@ -44,6 +45,7 @@ class _ReturnCode(FastEnum): FAIL = 1 PERM_FAIL = 2 SKIPPED = 3 + TERMINATED = 4 # JobStatus: @@ -69,15 +71,6 @@ class _Envelope(): self.message = message -# Process class that doesn't call waitpid on its own. -# This prevents conflicts with the asyncio child watcher. 
-class Process(multiprocessing.Process): - # pylint: disable=attribute-defined-outside-init - def start(self): - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - - class _MessageType(FastEnum): LOG_MESSAGE = 1 ERROR = 2 @@ -184,12 +177,12 @@ class Job(): child_job, self._scheduler.context.get_projects(), ) - self._process = Process( + self._process = _asyncio.AsyncioSafeProcess( target=do_pickled_child_job, args=[pickled, self._queue], ) else: - self._process = Process( + self._process = _asyncio.AsyncioSafeProcess( target=child_job.child_action, args=[self._queue], ) @@ -257,22 +250,6 @@ class Job(): def get_terminated(self): return self._terminated - # terminate_wait() - # - # Wait for terminated jobs to complete - # - # Args: - # timeout (float): Seconds to wait - # - # Returns: - # (bool): True if the process terminated cleanly, otherwise False - # - def terminate_wait(self, timeout): - - # Join the child process after sending SIGTERM - self._process.join(timeout) - return self._process.exitcode is not None - # kill() # # Forcefully kill the process, and any children it might have. @@ -479,6 +456,10 @@ class Job(): status = JobStatus.SKIPPED elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL): status = JobStatus.FAIL + # TODO IMPLEMENT + # elif returncode == _ReturnCode.TERMINATED: + # status = JobStatus.FAIL + # TODO: handle SIGKILL TOO else: status = JobStatus.FAIL @@ -740,6 +721,14 @@ class ChildJob(): self.message(MessageType.START, self.action_name, logfile=filename) + # Graciously handle sigterms. 
+ # This is required when we get killed by a 'terminate' event + # by our parent + def handle_sigterm(_signum, _sigframe): + self._child_shutdown(_ReturnCode.TERMINATED) + + signal.signal(signal.SIGTERM, handle_sigterm) + try: # Try the task action result = self.child_process() # pylint: disable=assignment-from-no-return diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index d3faa2a8e..88ab8fc78 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -20,12 +20,12 @@ # System imports import os -import asyncio from itertools import chain import signal import datetime # Local imports +from . import _asyncio from .resources import Resources from .jobs import JobStatus from ..types import FastEnum @@ -170,8 +170,8 @@ class Scheduler(): # Ensure that we have a fresh new event loop, in case we want # to run another test in this thread. - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) + self.loop = _asyncio.new_event_loop() + _asyncio.set_event_loop(self.loop) # Notify that the loop has been created self._notify(Notification(NotificationType.RUNNING)) @@ -184,7 +184,7 @@ class Scheduler(): # Watch casd while running to ensure it doesn't die self._casd_process = casd_process - _watcher = asyncio.get_child_watcher() + _watcher = _asyncio.get_child_watcher() _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) # Start the profiler @@ -528,21 +528,16 @@ class Scheduler(): self.loop.remove_signal_handler(signal.SIGTERM) def _terminate_jobs_real(self): - # 20 seconds is a long time, it can take a while and sometimes - # we still fail, need to look deeper into this again. 
- wait_start = datetime.datetime.now() - wait_limit = 20.0 - # First tell all jobs to terminate for job in self._active_jobs: job.terminate() - # Now wait for them to really terminate - for job in self._active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() + # Callback to kill the jobs if they are still running after 20 sec + def kill_jobs(): + for job_ in self._active_jobs: + job_.kill() + + self.loop.call_later(20, kill_jobs) # Regular timeout for driving status in the UI def _tick(self): |