summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2019-11-08 11:17:21 +0000
committerBenjamin Schubert <contact@benschubert.me>2019-11-08 11:17:21 +0000
commitd94df88c2d132b0ae62aec33ff69594bbc2e2bb2 (patch)
treebdf2e9a679997e05432b8ef257af7bfdf4ac5ffd
parenta283f02ce9f5d72f03c783d516d69d8f57069ef4 (diff)
downloadbuildstream-bschubert/fix-children.tar.gz
WIP: Blanket update (branch: bschubert/fix-children)
-rw-r--r--src/buildstream/_frontend/app.py2
-rw-r--r--src/buildstream/_scheduler/_asyncio.py73
-rw-r--r--src/buildstream/_scheduler/jobs/job.py43
-rw-r--r--src/buildstream/_scheduler/scheduler.py25
4 files changed, 100 insertions, 43 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 45160afbc..b234bc87c 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -532,7 +532,7 @@ class App():
choice = click.prompt("Choice:",
value_proc=_prefix_choice_value_proc(['continue', 'quit', 'terminate']),
default='continue', err=True)
- except click.Abort:
+ except (click.Abort, SystemError):
# Ensure a newline after automatically printed '^C'
click.echo("", err=True)
choice = 'terminate'
diff --git a/src/buildstream/_scheduler/_asyncio.py b/src/buildstream/_scheduler/_asyncio.py
new file mode 100644
index 000000000..b9da163ab
--- /dev/null
+++ b/src/buildstream/_scheduler/_asyncio.py
@@ -0,0 +1,73 @@
+#
+# Copyright (C) 2019 Bloomberg LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import multiprocessing
+from asyncio import *
+
+
class _ForkableUnixSelectorEventLoop(SelectorEventLoop):
    """A SelectorEventLoop that can be repaired in a child after fork().

    After fork() the child inherits the parent's selector and wake-up
    self-pipe file descriptors; continuing to share them would corrupt
    both the parent's and the child's loop.  ``_cleanup_on_fork``
    rebuilds those resources so the child owns fresh ones.

    NOTE(review): this reaches into asyncio private attributes
    (``_selector``, ``_close_self_pipe``, ``_make_self_pipe``), so it is
    tied to specific CPython asyncio internals — confirm on upgrades.
    """

    def _cleanup_on_fork(self):
        # Create a brand-new selector of the same concrete type
        # (epoll/kqueue/...) so the child stops sharing the parent's.
        parent_selector = self._selector
        self._selector = type(parent_selector)()

        # Re-register every fd the inherited selector was watching,
        # then drop the inherited selector object.
        for key in parent_selector.get_map().values():
            self._selector.register(key.fileobj, key.events, key.data)
        parent_selector.close()
        # Replace the inherited wake-up self-pipe with one owned by
        # this process (private BaseEventLoop helpers).
        self._close_self_pipe()
        self._make_self_pipe()
+
+
class _DefaultEventLoopPolicy(DefaultEventLoopPolicy):
    """Event loop policy whose loops know how to recover from a fork()."""

    # Hand out forkable loops instead of plain SelectorEventLoops.
    _loop_factory = _ForkableUnixSelectorEventLoop

    def cleanup_on_fork(self):
        """Detach this (child) process from the loop inherited from its parent.

        Must be called in the child immediately after fork(), before any
        asyncio usage.  Repairs the loop's selector/self-pipe, stops the
        loop, and forgets it so the child lazily creates a fresh one.
        """
        # ``_local`` is the policy's private per-thread storage; if no
        # loop was ever created in the parent there is nothing to fix.
        if self._local._loop is None:
            # We don't have a loop, we're fine
            return

        # Sanity checks: a freshly forked child must have inherited no
        # pending or running tasks.
        # NOTE(review): ``Task.all_tasks``/``Task.current_task`` are
        # deprecated classmethods (removed in Python 3.9).  The
        # module-level asyncio.all_tasks()/current_task() are the
        # replacements, but their semantics differ slightly (done tasks
        # excluded) — confirm before switching.
        assert not Task.all_tasks(self._local._loop), "Some asyncio tasks leaked to a child"
        assert Task.current_task(self._local._loop) is None, "We are in the child, we should have no running task"
        # Rebuild selector/self-pipe, stop the loop, then drop our
        # reference so the next get_event_loop() builds a new one.
        self._local._loop._cleanup_on_fork()
        self._local._loop.stop()
        del self._local._loop
+
+
class _AsyncioSafeProcess(multiprocessing.Process):
    """multiprocessing.Process that never reaps the child itself.

    The stock ``Process.start()`` registers the child with
    multiprocessing's internal bookkeeping, which later calls waitpid()
    on it — racing with asyncio's child watcher, which waits on the
    same pid.  This override only spawns the child and records its
    sentinel, leaving reaping entirely to the asyncio watcher.
    """
    # pylint: disable=attribute-defined-outside-init
    def start(self):
        # Minimal re-implementation of Process.start(): spawn via the
        # configured Popen implementation and keep the sentinel fd for
        # join()/polling, skipping the parent-side reaper registration.
        # NOTE(review): this also skips start()'s usual daemon/closed
        # checks — assumed intentional; confirm.
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
+
+
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
class _AsyncioForkAwareProcess(_AsyncioSafeProcess):
    """Waitpid-free process that detaches from the parent's asyncio loop.

    Executed in the fork()-created child: first sever all ties with the
    event loop inherited from the parent, then run the target normally.
    """

    def run(self):
        policy = get_event_loop_policy()
        policy.cleanup_on_fork()
        super().run()
+
+
# Module-level wiring: the fork-aware machinery only matters when child
# processes are created with fork(), since only then do they inherit the
# parent's event loop state.  get_start_method(False) passes
# allow_none=False, initializing the start method if it is not set yet.
if multiprocessing.get_start_method(False) == "fork":
    # Set the default event loop policy to automatically close our asyncio loop in child processes
    DefaultEventLoopPolicy = _DefaultEventLoopPolicy
    set_event_loop_policy(DefaultEventLoopPolicy())

    AsyncioSafeProcess = _AsyncioForkAwareProcess

else:
    # spawn/forkserver children start with a clean interpreter, so the
    # plain waitpid-free process class is sufficient.
    AsyncioSafeProcess = _AsyncioSafeProcess
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 1d7697b02..e53552c43 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -33,6 +33,7 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
from ... import _signals, utils
+from buildstream._scheduler import _asyncio
from .jobpickler import pickle_child_job, do_pickled_child_job
@@ -44,6 +45,7 @@ class _ReturnCode(FastEnum):
FAIL = 1
PERM_FAIL = 2
SKIPPED = 3
+ TERMINATED = 4
# JobStatus:
@@ -69,15 +71,6 @@ class _Envelope():
self.message = message
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-class Process(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
-
class _MessageType(FastEnum):
LOG_MESSAGE = 1
ERROR = 2
@@ -184,12 +177,12 @@ class Job():
child_job,
self._scheduler.context.get_projects(),
)
- self._process = Process(
+ self._process = _asyncio.AsyncioSafeProcess(
target=do_pickled_child_job,
args=[pickled, self._queue],
)
else:
- self._process = Process(
+ self._process = _asyncio.AsyncioSafeProcess(
target=child_job.child_action,
args=[self._queue],
)
@@ -257,22 +250,6 @@ class Job():
def get_terminated(self):
return self._terminated
- # terminate_wait()
- #
- # Wait for terminated jobs to complete
- #
- # Args:
- # timeout (float): Seconds to wait
- #
- # Returns:
- # (bool): True if the process terminated cleanly, otherwise False
- #
- def terminate_wait(self, timeout):
-
- # Join the child process after sending SIGTERM
- self._process.join(timeout)
- return self._process.exitcode is not None
-
# kill()
#
# Forcefully kill the process, and any children it might have.
@@ -479,6 +456,10 @@ class Job():
status = JobStatus.SKIPPED
elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL):
status = JobStatus.FAIL
+ # TODO IMPLEMENT
+ # elif returncode == _ReturnCode.TERMINATED:
+ # status = JobStatus.FAIL
+ # TODO: handle SIGKILL TOO
else:
status = JobStatus.FAIL
@@ -740,6 +721,14 @@ class ChildJob():
self.message(MessageType.START, self.action_name, logfile=filename)
+ # Graciously handle sigterms.
+ # This is required when we get killed by a 'terminate' event
+ # by our parent
+ def handle_sigterm(_signum, _sigframe):
+ self._child_shutdown(_ReturnCode.TERMINATED)
+
+ signal.signal(signal.SIGTERM, handle_sigterm)
+
try:
# Try the task action
result = self.child_process() # pylint: disable=assignment-from-no-return
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a8e..88ab8fc78 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,12 @@
# System imports
import os
-import asyncio
from itertools import chain
import signal
import datetime
# Local imports
+from . import _asyncio
from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
@@ -170,8 +170,8 @@ class Scheduler():
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
- self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(self.loop)
+ self.loop = _asyncio.new_event_loop()
+ _asyncio.set_event_loop(self.loop)
# Notify that the loop has been created
self._notify(Notification(NotificationType.RUNNING))
@@ -184,7 +184,7 @@ class Scheduler():
# Watch casd while running to ensure it doesn't die
self._casd_process = casd_process
- _watcher = asyncio.get_child_watcher()
+ _watcher = _asyncio.get_child_watcher()
_watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
# Start the profiler
@@ -528,21 +528,16 @@ class Scheduler():
self.loop.remove_signal_handler(signal.SIGTERM)
def _terminate_jobs_real(self):
- # 20 seconds is a long time, it can take a while and sometimes
- # we still fail, need to look deeper into this again.
- wait_start = datetime.datetime.now()
- wait_limit = 20.0
-
# First tell all jobs to terminate
for job in self._active_jobs:
job.terminate()
- # Now wait for them to really terminate
- for job in self._active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
+ # Callback to kill the jobs if they are still running after 20 sec
+ def kill_jobs():
+ for job_ in self._active_jobs:
+ job_.kill()
+
+ self.loop.call_later(20, kill_jobs)
# Regular timeout for driving status in the UI
def _tick(self):