summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/jobs
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-07-03 12:57:06 +0000
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-04 10:36:37 +0000
commit705d0023f65621b23b6b0828306dc5b4ee094b45 (patch)
treec021c06427c7e37508c682bc99e06d9eae090975 /src/buildstream/_scheduler/jobs
parentbe88eaec0445ff2d85b73c17a392d0e65620202b (diff)
downloadbuildstream-705d0023f65621b23b6b0828306dc5b4ee094b45.tar.gz
scheduler.py: Use threads instead of processes for jobs
This changes how the scheduler works and adapts all the code that needs adapting in order to be able to run in threads instead of in subprocesses, which helps with Windows support, and will allow some simplifications in the main pipeline. This addresses the following issues: * Fix #810: All CAS calls are now made in the master process, and thus share the same connection to the cas server * Fix #93: We don't start as many child processes anymore, so the risk of starving the machine are way less * Fix #911: We now use `forkserver` for starting processes. We also don't use subprocesses for jobs so we should be starting less subprocesses And the following highlevel changes where made: * cascache.py: Run the CasCacheUsageMonitor in a thread instead of a subprocess. * casdprocessmanager.py: Ensure start and stop of the process are thread safe. * job.py: Run the child in a thread instead of a process, adapt how we stop a thread, since we ca't use signals anymore. * _multiprocessing.py: Not needed anymore, we are not using `fork()`. * scheduler.py: Run the scheduler with a threadpool, to run the child jobs in. Also adapt how our signal handling is done, since we are not receiving signals from our children anymore, and can't kill them the same way. * sandbox: Stop using blocking signals to wait on the process, and use timeouts all the time. * messenger.py: Use a thread-local context for the handler, to allow for multiple parameters in the same process. * _remote.py: Ensure the start of the connection is thread safe * _signal.py: Allow blocking entering in the signal's context managers by setting an event. This is to ensure no thread runs long-running code while we asked the scheduler to pause. This also ensures all the signal handlers is thread safe. * source.py: Change check around saving the source's ref. We are now running in the same process, and thus the ref will already have been changed.
Diffstat (limited to 'src/buildstream/_scheduler/jobs')
-rw-r--r--src/buildstream/_scheduler/jobs/_job.pyi1
-rw-r--r--src/buildstream/_scheduler/jobs/_job.pyx15
-rw-r--r--src/buildstream/_scheduler/jobs/job.py295
3 files changed, 133 insertions, 178 deletions
diff --git a/src/buildstream/_scheduler/jobs/_job.pyi b/src/buildstream/_scheduler/jobs/_job.pyi
new file mode 100644
index 000000000..fbf3e64de
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/_job.pyi
@@ -0,0 +1 @@
+def terminate_thread(thread_id: int): ...
diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx
new file mode 100644
index 000000000..82f6ab044
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/_job.pyx
@@ -0,0 +1,15 @@
+from cpython.pystate cimport PyThreadState_SetAsyncExc
+from cpython.ref cimport PyObject
+from ..._signals import TerminateException
+
+
+# terminate_thread()
+#
+# Ask a given a given thread to terminate by raising an exception in it.
+#
+# Args:
+# thread_id (int): the thread id in which to throw the exception
+#
+def terminate_thread(long thread_id):
+ res = PyThreadState_SetAsyncExc(thread_id, <PyObject*> TerminateException)
+ assert res == 1
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 08e40694e..2e8f5ca1a 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -25,17 +25,16 @@ import asyncio
import datetime
import itertools
import multiprocessing
-import os
-import signal
-import sys
+import threading
import traceback
# BuildStream toplevel imports
+from ... import utils
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ._job import terminate_thread
+from ..._signals import TerminateException
# Return code values shutdown of job handling child processes
@@ -46,7 +45,6 @@ class _ReturnCode(FastEnum):
PERM_FAIL = 2
SKIPPED = 3
TERMINATED = 4
- KILLED = -9
# JobStatus:
@@ -131,7 +129,6 @@ class Job:
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
self._pipe_r = None # The read end of a pipe for message passing
- self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
@@ -144,6 +141,9 @@ class Job:
self._message_element_key = None # The task-wide element cache key
self._element = None # The Element() passed to the Job() constructor, if applicable
+ self._task = None # The task that is run
+ self._child = None
+
# set_name()
#
# Sets the name of this job
@@ -158,12 +158,14 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
+ # FIXME: remove this, this is not necessary when using asyncio
self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
self._parent_start_listening()
- child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
+ # FIXME: remove the parent/child separation, it's not needed anymore.
+ self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
self.action_name,
self._messenger,
self._scheduler.context.logdir,
@@ -174,46 +176,28 @@ class Job:
self._message_element_key,
)
- self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
-
- # Block signals which are handled in the main process such that
- # the child process does not inherit the parent's state, but the main
- # process will be notified of any signal after we launch the child.
- #
- with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- with asyncio.get_child_watcher() as watcher:
- self._process.start()
- # Register the process to call `_parent_child_completed` once it is done
-
- # Close the write end of the pipe in the parent
- pipe_w.close()
+ loop = asyncio.get_event_loop()
- # Here we delay the call to the next loop tick. This is in order to be running
- # in the main thread, as the callback itself must be thread safe.
- def on_completion(pid, returncode):
- asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+ async def execute():
+ result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ await self._parent_child_completed(result)
- watcher.add_child_handler(self._process.pid, on_completion)
+ self._task = loop.create_task(execute())
# terminate()
#
# Politely request that an ongoing job terminate soon.
#
- # This will send a SIGTERM signal to the Job process.
+ # This will raise an exception in the child to ask it to exit.
#
def terminate(self):
-
- # First resume the job if it's suspended
- self.resume(silent=True)
-
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
# Make sure there is no garbage on the pipe
self._parent_stop_listening()
- # Terminate the process using multiprocessing API pathway
- if self._process:
- self._process.terminate()
+ if self._task:
+ self._child.terminate()
self._terminated = True
@@ -227,51 +211,6 @@ class Job:
def get_terminated(self):
return self._terminated
- # kill()
- #
- # Forcefully kill the process, and any children it might have.
- #
- def kill(self):
- # Force kill
- self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
- if self._process:
- utils._kill_process_tree(self._process.pid)
-
- # suspend()
- #
- # Suspend this job.
- #
- def suspend(self):
- if not self._suspended:
- self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
-
- try:
- # Use SIGTSTP so that child processes may handle and propagate
- # it to processes they start that become session leaders.
- os.kill(self._process.pid, signal.SIGTSTP)
-
- # For some reason we receive exactly one suspend event for
- # every SIGTSTP we send to the child process, even though the
- # child processes are setsid(). We keep a count of these so we
- # can ignore them in our event loop suspend_event().
- self._scheduler.internal_stops += 1
- self._suspended = True
- except ProcessLookupError:
- # ignore, process has already exited
- pass
-
- # resume()
- #
- # Resume this suspended job.
- #
- def resume(self, silent=False):
- if self._suspended:
- if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
-
- os.kill(self._process.pid, signal.SIGCONT)
- self._suspended = False
-
# set_message_element_name()
#
# This is called by Job subclasses to set the plugin instance element
@@ -380,10 +319,9 @@ class Job:
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
#
# Args:
- # pid (int): The PID of the child which completed
# returncode (int): The return code of the child process
#
- def _parent_child_completed(self, pid, returncode):
+ async def _parent_child_completed(self, returncode):
self._parent_shutdown()
try:
@@ -414,16 +352,9 @@ class Job:
status = JobStatus.FAIL
elif returncode == _ReturnCode.TERMINATED:
if self._terminated:
- self.message(MessageType.INFO, "Process was terminated")
- else:
- self.message(MessageType.ERROR, "Process was terminated unexpectedly")
-
- status = JobStatus.FAIL
- elif returncode == _ReturnCode.KILLED:
- if self._terminated:
- self.message(MessageType.INFO, "Process was killed")
+ self.message(MessageType.INFO, "Job terminated")
else:
- self.message(MessageType.ERROR, "Process was killed unexpectedly")
+ self.message(MessageType.ERROR, "Job was terminated unexpectedly")
status = JobStatus.FAIL
else:
@@ -434,7 +365,7 @@ class Job:
# Force the deletion of the pipe and process objects to try and clean up FDs
self._pipe_r.close()
- self._pipe_r = self._process = None
+ self._pipe_r = self._task = None
# _parent_process_envelope()
#
@@ -549,6 +480,9 @@ class ChildJob:
self._message_element_key = message_element_key
self._pipe_w = None # The write end of a pipe for message passing
+ self._thread_id = None # Thread in which the child executes its action
+ self._should_terminate = False
+ self._terminate_lock = threading.Lock()
# message():
#
@@ -615,19 +549,6 @@ class ChildJob:
# pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
#
def child_action(self, pipe_w):
-
- # This avoids some SIGTSTP signals from grandchildren
- # getting propagated up to the master process
- os.setsid()
-
- # First set back to the default signal handlers for the signals
- # we handle, and then clear their blocked state.
- #
- signal_list = [signal.SIGTSTP, signal.SIGTERM]
- for sig in signal_list:
- signal.signal(sig, signal.SIG_DFL)
- signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
-
# Assign the pipe we passed across the process boundaries
#
# Set the global message handler in this child
@@ -635,78 +556,108 @@ class ChildJob:
self._pipe_w = pipe_w
self._messenger.set_message_handler(self._child_message_handler)
- # Graciously handle sigterms.
- def handle_sigterm():
- self._child_shutdown(_ReturnCode.TERMINATED)
-
# Time, log and and run the action function
#
- with _signals.terminator(
- handle_sigterm
- ), self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
+ with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
self._logfile, self._logdir
) as filename:
- self.message(MessageType.START, self.action_name, logfile=filename)
-
try:
- # Try the task action
- result = self.child_process() # pylint: disable=assignment-from-no-return
- except SkipJob as e:
- elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
-
- # Alert parent of skip by return code
- self._child_shutdown(_ReturnCode.SKIPPED)
- except BstError as e:
- elapsed = datetime.datetime.now() - timeinfo.start_time
- retry_flag = e.temporary
-
- if retry_flag and (self._tries <= self._max_retries):
- self.message(
- MessageType.FAIL,
- "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed,
- logfile=filename,
- )
- else:
- self.message(
- MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox
- )
+ self.message(MessageType.START, self.action_name, logfile=filename)
+
+ with self._terminate_lock:
+ self._thread_id = threading.current_thread().ident
+ if self._should_terminate:
+ return _ReturnCode.TERMINATED
+
+ try:
+ # Try the task action
+ result = self.child_process() # pylint: disable=assignment-from-no-return
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ return _ReturnCode.SKIPPED
+ except BstError as e:
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ retry_flag = e.temporary
+
+ if retry_flag and (self._tries <= self._max_retries):
+ self.message(
+ MessageType.FAIL,
+ "Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed,
+ logfile=filename,
+ )
+ else:
+ self.message(
+ MessageType.FAIL,
+ str(e),
+ elapsed=elapsed,
+ detail=e.detail,
+ logfile=filename,
+ sandbox=e.sandbox,
+ )
+
+ self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
+
+ # Report the exception to the parent (for internal testing purposes)
+ self._child_send_error(e)
+
+ # Set return code based on whether or not the error was temporary.
+ #
+ return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+ except Exception: # pylint: disable=broad-except
+
+ # If an unhandled (not normalized to BstError) occurs, that's a bug,
+ # send the traceback and formatted exception back to the frontend
+ # and print it to the log file.
+ #
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
+
+ self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
+ # Unhandled exceptions should permenantly fail
+ return _ReturnCode.PERM_FAIL
- self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
-
- # Report the exception to the parent (for internal testing purposes)
- self._child_send_error(e)
-
- # Set return code based on whether or not the error was temporary.
- #
- self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
-
- except Exception: # pylint: disable=broad-except
-
- # If an unhandled (not normalized to BstError) occurs, that's a bug,
- # send the traceback and formatted exception back to the frontend
- # and print it to the log file.
- #
- elapsed = datetime.datetime.now() - timeinfo.start_time
- detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
+ else:
+ # No exception occurred in the action
+ self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
+ self._child_send_result(result)
+
+ elapsed = datetime.datetime.now() - timeinfo.start_time
+ self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+
+ # Shutdown needs to stay outside of the above context manager,
+ # make sure we dont try to handle SIGTERM while the process
+ # is already busy in sys.exit()
+ return _ReturnCode.OK
+ finally:
+ self._thread_id = None
+ except TerminateException:
+ self._thread_id = None
+ return _ReturnCode.TERMINATED
+ finally:
+ self._pipe_w.close()
- self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
- # Unhandled exceptions should permenantly fail
- self._child_shutdown(_ReturnCode.PERM_FAIL)
+ # terminate()
+ #
+ # Ask the the current child thread to terminate
+ #
+ # This should only ever be called from the main thread.
+ #
+ def terminate(self):
+ assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler"
- else:
- # No exception occurred in the action
- self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
- self._child_send_result(result)
+ if self._should_terminate:
+ return
- elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+ with self._terminate_lock:
+ self._should_terminate = True
+ if self._thread_id is None:
+ return
- # Shutdown needs to stay outside of the above context manager,
- # make sure we dont try to handle SIGTERM while the process
- # is already busy in sys.exit()
- self._child_shutdown(_ReturnCode.OK)
+ terminate_thread(self._thread_id)
#######################################################
# Local Private Methods #
@@ -758,18 +709,6 @@ class ChildJob:
if result is not None:
self._send_message(_MessageType.RESULT, result)
- # _child_shutdown()
- #
- # Shuts down the child process by cleaning up and exiting the process
- #
- # Args:
- # exit_code (_ReturnCode): The exit code to exit with
- #
- def _child_shutdown(self, exit_code):
- self._pipe_w.close()
- assert isinstance(exit_code, _ReturnCode)
- sys.exit(exit_code.value)
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the