author    Benjamin Schubert <bschubert15@bloomberg.net>  2020-07-03 12:57:06 +0000
committer Benjamin Schubert <bschubert15@bloomberg.net>  2020-12-04 10:36:37 +0000
commit    705d0023f65621b23b6b0828306dc5b4ee094b45 (patch)
tree      c021c06427c7e37508c682bc99e06d9eae090975 /src/buildstream/_scheduler/scheduler.py
parent    be88eaec0445ff2d85b73c17a392d0e65620202b (diff)
scheduler.py: Use threads instead of processes for jobs
This changes how the scheduler works and adapts all the code that needs adapting in order to be able to run in threads instead of in subprocesses, which helps with Windows support and will allow some simplifications in the main pipeline.

This addresses the following issues:

* Fix #810: All CAS calls are now made in the master process, and thus share the same connection to the cas server.
* Fix #93: We don't start as many child processes anymore, so the risk of starving the machine is much lower.
* Fix #911: We now use `forkserver` for starting processes, and we no longer use subprocesses for jobs, so we should be starting far fewer subprocesses (the forkserver startup is sketched below).

And the following high-level changes were made:

* cascache.py: Run the CasCacheUsageMonitor in a thread instead of a subprocess.
* casdprocessmanager.py: Ensure start and stop of the process are thread safe.
* job.py: Run the child in a thread instead of a process, and adapt how we stop a thread, since we can't use signals anymore.
* _multiprocessing.py: Not needed anymore, since we are no longer using `fork()`.
* scheduler.py: Run the scheduler with a thread pool and run the child jobs in it. Also adapt how our signal handling is done, since we no longer receive signals from our children and can't kill them the same way (the thread-pool model is sketched after this list).
* sandbox: Stop using blocking signals to wait on the process, and use timeouts all the time.
* messenger.py: Use a thread-local context for the handler, to allow for multiple parameters in the same process.
* _remote.py: Ensure the start of the connection is thread safe.
* _signal.py: Allow blocking entry into the signal context managers by setting an event. This ensures no thread runs long-running code once we have asked the scheduler to pause, and that all the signal handlers are thread safe (a sketch of this suspend/resume protocol follows the diff below).
* source.py: Change the check around saving the source's ref. We are now running in the same process, so the ref will already have been changed.
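As a rough picture of the new execution model, here is a minimal, self-contained sketch. The job names and the `run_job` body are invented for illustration; only the ThreadPoolExecutor-as-default-executor pattern mirrors the scheduler code in the diff below:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_job(name):
    # Stand-in for a real job body; it runs on a pool worker thread.
    return name + ": done"

async def schedule(jobs, max_workers):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # As in the diff below: make the pool the loop's default executor,
        # then hand each job to a thread instead of forking a child process.
        loop.set_default_executor(pool)
        return await asyncio.gather(
            *(loop.run_in_executor(None, run_job, job) for job in jobs)
        )

print(asyncio.run(schedule(["fetch", "build"], max_workers=2)))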
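And a minimal, POSIX-only sketch of why the forkserver is started with signals ignored: the forkserver (and every job process it later spawns) inherits whatever handler state is in effect when it starts. `blocked()` here is a simplified stand-in for BuildStream's `_signals.blocked()`, not the real implementation:

import multiprocessing.forkserver
import signal
from contextlib import contextmanager

@contextmanager
def blocked(signums, ignore=True):
    # Simplified stand-in for _signals.blocked(): install SIG_IGN for the
    # given signals, then restore the previous handlers on exit.
    handler = signal.SIG_IGN if ignore else signal.SIG_DFL
    previous = {num: signal.signal(num, handler) for num in signums}
    try:
        yield
    finally:
        for num, old in previous.items():
            signal.signal(num, old)

# Any process the forkserver spawns from now on starts with these
# signals ignored, which is the state the scheduler wants for jobs.
with blocked([signal.SIGINT, signal.SIGTSTP], ignore=True):
    multiprocessing.forkserver.ensure_running()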
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r--  src/buildstream/_scheduler/scheduler.py | 83
1 file changed, 50 insertions(+), 33 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 903cd0be9..b46314a9a 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,15 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# System imports
+import functools
import os
import asyncio
from itertools import chain
import signal
import datetime
+import multiprocessing.forkserver
import sys
+from concurrent.futures import ThreadPoolExecutor
# Local imports
from .resources import Resources
@@ -34,9 +37,7 @@ from ..types import FastEnum
from .._profile import Topics, PROFILER
from .._message import Message, MessageType
from ..plugin import Plugin
-
-
-_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds
+from .. import _signals
# A decent return code for Scheduler.run()
@@ -46,6 +47,23 @@ class SchedStatus(FastEnum):
TERMINATED = 1
+def reset_signals_on_exit(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ orig_sigint = signal.getsignal(signal.SIGINT)
+ orig_sigterm = signal.getsignal(signal.SIGTERM)
+ orig_sigtstp = signal.getsignal(signal.SIGTSTP)
+
+ try:
+ return func(*args, **kwargs)
+ finally:
+ signal.signal(signal.SIGINT, orig_sigint)
+ signal.signal(signal.SIGTERM, orig_sigterm)
+ signal.signal(signal.SIGTSTP, orig_sigtstp)
+
+ return wrapper
+
+
# Scheduler()
#
# The scheduler operates on a list of queues, each of which is meant to accomplish
@@ -79,7 +97,6 @@ class Scheduler:
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
@@ -98,6 +115,14 @@ class Scheduler:
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
self._state.register_task_retry_callback(self._failure_retry)
+ # Ensure that the forkserver is started before we start.
+ # This is best run before we do any GRPC connections to casd or have
+ # other background threads started.
+ # We ignore signals here, as this is the handler state that all
+ # Python child processes started from now on will inherit.
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP], ignore=True):
+ multiprocessing.forkserver.ensure_running()
+
# run()
#
# Args:
@@ -113,6 +138,7 @@ class Scheduler:
# elements have been processed by each queue or when
# an error arises
#
+ @reset_signals_on_exit
def run(self, queues, casd_process_manager):
# Hold on to the queues to process
@@ -149,10 +175,18 @@ class Scheduler:
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
- # Run the queues
- self._sched()
- self.loop.run_forever()
- self.loop.close()
+ # This is not a no-op. Since it is the first signal registration
+ # that is set, it then allows other threads to register signal
+ # handling routines, which would not be possible if the main thread
+ # hadn't set it before.
+ # FIXME: this should be done in a cleaner way
+ with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None):
+ with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool:
+ self.loop.set_default_executor(pool)
+ # Run the queues
+ self._sched()
+ self.loop.run_forever()
+ self.loop.close()
# Invoke the ticker callback a final time to render pending messages
self._ticker_callback()
@@ -351,13 +385,6 @@ class Scheduler:
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # Make sure fork is allowed before starting jobs
- if not self.context.prepare_fork():
- message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
- self.context.messenger.message(message)
- self.terminate()
- return
-
# Start the jobs
#
for job in ready:
@@ -415,9 +442,10 @@ class Scheduler:
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- # Notify that we're suspended
- for job in self._active_jobs:
- job.suspend()
+ _signals.is_not_suspended.clear()
+
+ for suspender in reversed(_signals.suspendable_stack):
+ suspender.suspend()
# _resume_jobs()
#
@@ -425,8 +453,10 @@ class Scheduler:
#
def _resume_jobs(self):
if self.suspended:
- for job in self._active_jobs:
- job.resume()
+ for suspender in _signals.suspendable_stack:
+ suspender.resume()
+
+ _signals.is_not_suspended.set()
self.suspended = False
# Notify that we're unsuspended
self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
@@ -458,12 +488,6 @@ class Scheduler:
# A loop registered event callback for SIGTSTP
#
def _suspend_event(self):
-
- # Ignore the feedback signals from Job.suspend()
- if self.internal_stops:
- self.internal_stops -= 1
- return
-
# No need to care if jobs were suspended or not, we _only_ handle this
# while we know jobs are not suspended.
self._suspend_jobs()
@@ -485,13 +509,6 @@ class Scheduler:
self.loop.remove_signal_handler(signal.SIGTERM)
def _terminate_jobs_real(self):
- def kill_jobs():
- for job_ in self._active_jobs:
- job_.kill()
-
- # Schedule all jobs to be killed if they have not exited after timeout
- self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs)
-
for job in self._active_jobs:
job.terminate()
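For reference, the suspend/resume protocol that `_suspend_jobs()` and `_resume_jobs()` rely on can be pictured roughly as follows. This is a hypothetical sketch: `suspendable_stack` and `is_not_suspended` are the real names from `_signals`, but the `Suspendable` class and the function bodies here are invented for illustration:

import threading

is_not_suspended = threading.Event()  # set => threads may enter long-running sections
is_not_suspended.set()
suspendable_stack = []                # innermost suspendable registered last

class Suspendable:
    def __init__(self, on_suspend, on_resume):
        self.suspend = on_suspend
        self.resume = on_resume

def suspend_jobs():
    # Clear the event first so no thread enters a new long-running
    # section, then suspend innermost-first, as _suspend_jobs() does.
    is_not_suspended.clear()
    for suspender in reversed(suspendable_stack):
        suspender.suspend()

def resume_jobs():
    for suspender in suspendable_stack:
        suspender.resume()
    is_not_suspended.set()

A worker thread about to enter a long-running section would first wait on `is_not_suspended`, so a paused scheduler stops new work at a safe point instead of signalling child processes as the old code did.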