diff options
author | Benjamin Schubert <contact@benschubert.me> | 2019-10-04 13:39:46 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-10-08 17:16:23 +0000 |
commit | 5ea011c7ff420c6c9a4985088d4fadf2c066f7a2 (patch) | |
tree | 533baa5da4baf04b1e797e2ebb336dffe2fac3af /src | |
parent | d268ff6fff7ab22e1080a500e0469f3dc9a54274 (diff) | |
download | buildstream-5ea011c7ff420c6c9a4985088d4fadf2c066f7a2.tar.gz |
_scheduler.py: Listen for buildbox-casd failure and terminate
This adds a listener on the scheduler's event loop to ensure that
the buildbox-casd process stays alive during the run. If that fails,
terminate all running processes, we know they can't succeed anyways
and exit accordingly.
Diffstat (limited to 'src')
-rw-r--r-- | src/buildstream/_cas/cascache.py | 11 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 33 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 2 |
3 files changed, 44 insertions, 2 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index f7855afc4..b6893503f 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -1049,6 +1049,17 @@ class CASCache(): return self._cache_usage_monitor.get_cache_usage() + # get_casd_process() + # + # Get the underlying buildbox-casd process + # + # Returns: + # (subprocess.Process): The casd process that is used for the current cascache + # + def get_casd_process(self): + assert self._casd_process is not None, "This should only be called with a running buildbox-casd process" + return self._casd_process + # _CASCacheUsage # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index d0a189545..6133cbfd7 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -30,6 +30,7 @@ from .resources import Resources from .jobs import JobStatus from ..types import FastEnum from .._profile import Topics, PROFILER +from .._message import Message, MessageType from ..plugin import Plugin @@ -137,6 +138,7 @@ class Scheduler(): self._suspendtime = None # Session time compensation for suspended state self._queue_jobs = True # Whether we should continue to queue jobs self._state = state + self._casd_process = None # handle to the casd process for monitoring purpose # Bidirectional queue to send notifications back to the Scheduler's owner self._notification_queue = notification_queue @@ -150,6 +152,8 @@ class Scheduler(): # # Args: # queues (list): A list of Queue objects + # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified + # of failures. # # Returns: # (SchedStatus): How the scheduling terminated @@ -159,7 +163,7 @@ class Scheduler(): # elements have been processed by each queue or when # an error arises # - def run(self, queues): + def run(self, queues, casd_process): assert self.context.is_fork_allowed() @@ -180,6 +184,11 @@ class Scheduler(): # Handle unix signals while running self._connect_signals() + # Watch casd while running to ensure it doesn't die + self._casd_process = casd_process + _watcher = asyncio.get_child_watcher() + _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) + # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues @@ -187,6 +196,10 @@ class Scheduler(): self.loop.run_forever() self.loop.close() + # Stop watching casd + _watcher.remove_child_handler(casd_process.pid) + self._casd_process = None + # Stop handling unix signals self._disconnect_signals() @@ -319,6 +332,24 @@ class Scheduler(): # Local Private Methods # ####################################################### + # _abort_on_casd_failure() + # + # Abort if casd failed while running. + # + # This will terminate immediately all jobs, since buildbox-casd is dead, + # we can't do anything with them anymore. + # + # Args: + # pid (int): the process id under which buildbox-casd was running + # returncode (int): the return code with which buildbox-casd exited + # + def _abort_on_casd_failure(self, pid, returncode): + message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") + self._notify(Notification(NotificationType.MESSAGE, message=message)) + + self._casd_process.returncode = returncode + self.terminate_jobs() + # _start_job() # # Spanws a job diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 75b3dd84e..6e4e5caec 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1375,7 +1375,7 @@ class Stream(): if self._session_start_callback is not None: self._session_start_callback() - status = self._scheduler.run(self.queues) + status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process()) if status == SchedStatus.ERROR: raise StreamError() |