summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2020-05-19 10:22:48 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2020-05-19 10:22:48 +0000
commit99063c732b56fb92f6c8b8c32bdb66eb4fb015e6 (patch)
treef8c5840d51b377f9d457aed09b21e0167e16ffc7
parent7cdd7f4e085d6750235c2f8e8f578eae07db9655 (diff)
parentf41c4dc515eac959ab8adec3c0e0d0ef92b43930 (diff)
downloadbuildstream-99063c732b56fb92f6c8b8c32bdb66eb4fb015e6.tar.gz
Merge branch 'tristan/improve-frontend-crash-handling' into 'master'
Improve handling of frontend crashes Closes #1312 See merge request BuildStream/buildstream!1933
-rw-r--r--src/buildstream/_scheduler/jobs/job.py8
-rw-r--r--src/buildstream/_scheduler/scheduler.py27
2 files changed, 29 insertions, 6 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 823404b59..23aa51e58 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -156,6 +156,8 @@ class Job:
#
def start(self):
+ assert not self._terminated, "Attempted to start process which was already terminated"
+
self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
@@ -214,7 +216,8 @@ class Job:
self._parent_stop_listening()
# Terminate the process using multiprocessing API pathway
- self._process.terminate()
+ if self._process:
+ self._process.terminate()
self._terminated = True
@@ -235,7 +238,8 @@ class Job:
def kill(self):
# Force kill
self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
- utils._kill_process_tree(self._process.pid)
+ if self._process:
+ utils._kill_process_tree(self._process.pid)
# suspend()
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0bcbb7182..43c6c9680 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -375,7 +375,15 @@ class Scheduler:
# job (Job): The job to start
#
def _start_job(self, job):
+
+ # From the scheduler perspective, the following
+ # is considered atomic; started jobs are always in the
+ # active_jobs list, and jobs in the active_jobs list
+ # are always started.
+ #
self._active_jobs.append(job)
+ job.start()
+
notification = Notification(
NotificationType.JOB_START,
full_name=job.name,
@@ -383,7 +391,6 @@ class Scheduler:
time=self._state.elapsed_time(start_time=self._starttime),
)
self._notify(notification)
- job.start()
# _sched_queue_jobs()
#
@@ -449,6 +456,21 @@ class Scheduler:
#
def _sched(self):
def real_schedule():
+
+ # Reset the scheduling handle before queuing any jobs.
+ #
+ # We do this right away because starting jobs can result
+ # in their being terminated and completed during the body
+ # of this function, and we want to be sure that we get
+ # called again in this case.
+ #
+ # This can happen if jobs are explicitly killed as a result,
+ # which might happen as a side effect of a crash in an
+ # abstracted frontend implementation handling notifications
+ # about jobs starting.
+ #
+ self._sched_handle = None
+
if not self.terminated:
#
@@ -457,9 +479,6 @@ class Scheduler:
#
self._sched_queue_jobs()
- # Reset the scheduling hand
- self._sched_handle = None
-
#
# If nothing is ticking then bail out
#