diff options
author | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-07-03 12:57:06 +0000 |
---|---|---|
committer | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-12-04 10:36:37 +0000 |
commit | 705d0023f65621b23b6b0828306dc5b4ee094b45 (patch) | |
tree | c021c06427c7e37508c682bc99e06d9eae090975 /src/buildstream/_scheduler/scheduler.py | |
parent | be88eaec0445ff2d85b73c17a392d0e65620202b (diff) | |
download | buildstream-705d0023f65621b23b6b0828306dc5b4ee094b45.tar.gz |
scheduler.py: Use threads instead of processes for jobs
This changes how the scheduler works and adapts all the code that needs
adapting in order to be able to run in threads instead of in
subprocesses, which helps with Windows support, and will allow some
simplifications in the main pipeline.
This addresses the following issues:
* Fix #810: All CAS calls are now made in the master process, and thus
share the same connection to the cas server
* Fix #93: We don't start as many child processes anymore, so the risk
of starving the machine is much lower
* Fix #911: We now use `forkserver` for starting processes. We also
don't use subprocesses for jobs so we should be starting fewer
subprocesses
And the following high-level changes were made:
* cascache.py: Run the CasCacheUsageMonitor in a thread instead of a
subprocess.
* casdprocessmanager.py: Ensure start and stop of the process are thread
safe.
* job.py: Run the child in a thread instead of a process, adapt how we
stop a thread, since we can't use signals anymore.
* _multiprocessing.py: Not needed anymore, we are not using `fork()`.
* scheduler.py: Run the scheduler with a threadpool, to run the child
jobs in. Also adapt how our signal handling is done, since we are not
receiving signals from our children anymore, and can't kill them the
same way.
* sandbox: Stop using blocking signals to wait on the process, and use
timeouts all the time.
* messenger.py: Use a thread-local context for the handler, to allow for
multiple parameters in the same process.
* _remote.py: Ensure the start of the connection is thread safe
* _signal.py: Allow blocking entering in the signal's context managers
by setting an event. This is to ensure no thread runs long-running
code while we asked the scheduler to pause. This also ensures all the
signal handlers are thread safe.
* source.py: Change check around saving the source's ref. We are now
running in the same process, and thus the ref will already have been
changed.
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 83 |
1 files changed, 50 insertions, 33 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 903cd0be9..b46314a9a 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -20,12 +20,15 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # System imports +import functools import os import asyncio from itertools import chain import signal import datetime +import multiprocessing.forkserver import sys +from concurrent.futures import ThreadPoolExecutor # Local imports from .resources import Resources @@ -34,9 +37,7 @@ from ..types import FastEnum from .._profile import Topics, PROFILER from .._message import Message, MessageType from ..plugin import Plugin - - -_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds +from .. import _signals # A decent return code for Scheduler.run() @@ -46,6 +47,23 @@ class SchedStatus(FastEnum): TERMINATED = 1 +def reset_signals_on_exit(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + orig_sigint = signal.getsignal(signal.SIGINT) + orig_sigterm = signal.getsignal(signal.SIGTERM) + orig_sigtstp = signal.getsignal(signal.SIGTSTP) + + try: + return func(*args, **kwargs) + finally: + signal.signal(signal.SIGINT, orig_sigint) + signal.signal(signal.SIGTERM, orig_sigterm) + signal.signal(signal.SIGTSTP, orig_sigtstp) + + return wrapper + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -79,7 +97,6 @@ class Scheduler: # These are shared with the Job, but should probably be removed or made private in some way. 
self.loop = None # Shared for Job access to observe the message queue - self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py # # Private members @@ -98,6 +115,14 @@ class Scheduler: self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) self._state.register_task_retry_callback(self._failure_retry) + # Ensure that the forkserver is started before we start. + # This is best run before we do any GRPC connections to casd or have + # other background threads started. + # We ignore signals here, as this is the state all the python child + # processes from now on will have when starting + with _signals.blocked([signal.SIGINT, signal.SIGTSTP], ignore=True): + multiprocessing.forkserver.ensure_running() + # run() # # Args: @@ -113,6 +138,7 @@ class Scheduler: # elements have been processed by each queue or when # an error arises # + @reset_signals_on_exit def run(self, queues, casd_process_manager): # Hold on to the queues to process @@ -149,10 +175,18 @@ class Scheduler: # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): - # Run the queues - self._sched() - self.loop.run_forever() - self.loop.close() + # This is not a no-op. Since it is the first signal registration + # that is set, it allows then other threads to register signal + # handling routines, which would not be possible if the main thread + # hadn't set it before. 
+ # FIXME: this should be done in a cleaner way + with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None): + with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool: + self.loop.set_default_executor(pool) + # Run the queues + self._sched() + self.loop.run_forever() + self.loop.close() # Invoke the ticker callback a final time to render pending messages self._ticker_callback() @@ -351,13 +385,6 @@ class Scheduler: # If that happens, do another round. process_queues = any(q.dequeue_ready() for q in self.queues) - # Make sure fork is allowed before starting jobs - if not self.context.prepare_fork(): - message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") - self.context.messenger.message(message) - self.terminate() - return - # Start the jobs # for job in ready: @@ -415,9 +442,10 @@ class Scheduler: if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - # Notify that we're suspended - for job in self._active_jobs: - job.suspend() + _signals.is_not_suspended.clear() + + for suspender in reversed(_signals.suspendable_stack): + suspender.suspend() # _resume_jobs() # @@ -425,8 +453,10 @@ class Scheduler: # def _resume_jobs(self): if self.suspended: - for job in self._active_jobs: - job.resume() + for suspender in _signals.suspendable_stack: + suspender.resume() + + _signals.is_not_suspended.set() self.suspended = False # Notify that we're unsuspended self._state.offset_start_time(datetime.datetime.now() - self._suspendtime) @@ -458,12 +488,6 @@ class Scheduler: # A loop registered event callback for SIGTSTP # def _suspend_event(self): - - # Ignore the feedback signals from Job.suspend() - if self.internal_stops: - self.internal_stops -= 1 - return - # No need to care if jobs were suspended or not, we _only_ handle this # while we know jobs are not suspended. 
self._suspend_jobs() @@ -485,13 +509,6 @@ class Scheduler: self.loop.remove_signal_handler(signal.SIGTERM) def _terminate_jobs_real(self): - def kill_jobs(): - for job_ in self._active_jobs: - job_.kill() - - # Schedule all jobs to be killed if they have not exited after timeout - self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs) - for job in self._active_jobs: job.terminate() |