summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2019-10-04 13:39:46 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-10-08 11:50:06 +0000
commit7ed38afd418ba670376c7a21612d049aa3610014 (patch)
tree9497dbdc3d381c3988ff3ad39f3f489869d6cf91
parent97cd51d6d018c43d32ca433f0bcad7cf0dd77ca0 (diff)
downloadbuildstream-bschubert/casd-listen-failures.tar.gz
_scheduler.py: Listen for buildbox-casd failure and terminatebschubert/casd-listen-failures
This adds a listener on the scheduler's event loop to ensure that the buildbox-casd process stays alive during the run. If that fails, terminate all running processes, we know they can't succeed anyways and exit accordingly.
-rw-r--r--src/buildstream/_cas/cascache.py11
-rw-r--r--src/buildstream/_scheduler/scheduler.py33
-rw-r--r--src/buildstream/_stream.py2
3 files changed, 44 insertions, 2 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index f7855afc4..b6893503f 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -1049,6 +1049,17 @@ class CASCache():
return self._cache_usage_monitor.get_cache_usage()
+ # get_casd_process()
+ #
+ # Get the underlying buildbox-casd process
+ #
+ # Returns:
+ # (subprocess.Process): The casd process that is used for the current cascache
+ #
+ def get_casd_process(self):
+ assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
+ return self._casd_process
+
# _CASCacheUsage
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d0a189545..6133cbfd7 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -30,6 +30,7 @@ from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
from .._profile import Topics, PROFILER
+from .._message import Message, MessageType
from ..plugin import Plugin
@@ -137,6 +138,7 @@ class Scheduler():
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
+ self._casd_process = None # handle to the casd process for monitoring purpose
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
@@ -150,6 +152,8 @@ class Scheduler():
#
# Args:
# queues (list): A list of Queue objects
+ # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
+ # of failures.
#
# Returns:
# (SchedStatus): How the scheduling terminated
@@ -159,7 +163,7 @@ class Scheduler():
# elements have been processed by each queue or when
# an error arises
#
- def run(self, queues):
+ def run(self, queues, casd_process):
assert self.context.is_fork_allowed()
@@ -180,6 +184,11 @@ class Scheduler():
# Handle unix signals while running
self._connect_signals()
+ # Watch casd while running to ensure it doesn't die
+ self._casd_process = casd_process
+ _watcher = asyncio.get_child_watcher()
+ _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
# Run the queues
@@ -187,6 +196,10 @@ class Scheduler():
self.loop.run_forever()
self.loop.close()
+ # Stop watching casd
+ _watcher.remove_child_handler(casd_process.pid)
+ self._casd_process = None
+
# Stop handling unix signals
self._disconnect_signals()
@@ -319,6 +332,24 @@ class Scheduler():
# Local Private Methods #
#######################################################
+ # _abort_on_casd_failure()
+ #
+ # Abort if casd failed while running.
+ #
+ # This will terminate immediately all jobs, since buildbox-casd is dead,
+ # we can't do anything with them anymore.
+ #
+ # Args:
+ # pid (int): the process id under which buildbox-casd was running
+ # returncode (int): the return code with which buildbox-casd exited
+ #
+ def _abort_on_casd_failure(self, pid, returncode):
+ message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+
+ self._casd_process.returncode = returncode
+ self.terminate_jobs()
+
# _start_job()
#
# Spanws a job
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 75b3dd84e..6e4e5caec 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1375,7 +1375,7 @@ class Stream():
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(self.queues)
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
if status == SchedStatus.ERROR:
raise StreamError()