From a247912c29d84ddf349dcc5d11beb5976a3b0d8a Mon Sep 17 00:00:00 2001 From: Tristan Van Berkom Date: Fri, 15 May 2020 21:56:57 +0900 Subject: _scheduler: Fix order of launching jobs and sending notifications. Sending notifications causes potentially large bodies of code to run in the abstracted frontend codebase; we are not allowed to have knowledge of the frontend from this code. Previously, we were adding the job to the active jobs, sending the notification, and then starting the job. This means that if a BuildStream frontend implementation crashes, we handle the exception in an inconsistent state and try to kill jobs which are not running. In addition to making sure that no other code body runs in the danger window between the active_jobs list adjustment and the job starting, this patch also adds some fault tolerance and assertions around job termination so that: o Job.terminate() and Job.kill() do not crash with None _process o Job.start() raises an assertion if started after being terminated This fixes the infinite looping aspects of frontend crashes at job_start() time described in #1312. 
--- src/buildstream/_scheduler/jobs/job.py | 8 ++++++-- src/buildstream/_scheduler/scheduler.py | 9 ++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 823404b59..23aa51e58 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -156,6 +156,8 @@ class Job: # def start(self): + assert not self._terminated, "Attempted to start process which was already terminated" + self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False) self._tries += 1 @@ -214,7 +216,8 @@ class Job: self._parent_stop_listening() # Terminate the process using multiprocessing API pathway - self._process.terminate() + if self._process: + self._process.terminate() self._terminated = True @@ -235,7 +238,8 @@ class Job: def kill(self): # Force kill self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) - utils._kill_process_tree(self._process.pid) + if self._process: + utils._kill_process_tree(self._process.pid) # suspend() # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 0bcbb7182..ae510c2e0 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -375,7 +375,15 @@ class Scheduler: # job (Job): The job to start # def _start_job(self, job): + + # From the scheduler perspective, the following + # is considered atomic; started jobs are always in the + # active_jobs list, and jobs in the active_jobs list + # are always started. + # self._active_jobs.append(job) + job.start() + notification = Notification( NotificationType.JOB_START, full_name=job.name, @@ -383,7 +391,6 @@ class Scheduler: time=self._state.elapsed_time(start_time=self._starttime), ) self._notify(notification) - job.start() # _sched_queue_jobs() # -- cgit v1.2.1