diff options
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 8
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 27
2 files changed, 29 insertions, 6 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 823404b59..23aa51e58 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -156,6 +156,8 @@ class Job: # def start(self): + assert not self._terminated, "Attempted to start process which was already terminated" + self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False) self._tries += 1 @@ -214,7 +216,8 @@ class Job: self._parent_stop_listening() # Terminate the process using multiprocessing API pathway - self._process.terminate() + if self._process: + self._process.terminate() self._terminated = True @@ -235,7 +238,8 @@ class Job: def kill(self): # Force kill self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name)) - utils._kill_process_tree(self._process.pid) + if self._process: + utils._kill_process_tree(self._process.pid) # suspend() # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 0bcbb7182..43c6c9680 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -375,7 +375,15 @@ class Scheduler: # job (Job): The job to start # def _start_job(self, job): + + # From the scheduler perspective, the following + # is considered atomic; started jobs are always in the + # active_jobs list, and jobs in the active_jobs list + # are always started. + # self._active_jobs.append(job) + job.start() + notification = Notification( NotificationType.JOB_START, full_name=job.name, @@ -383,7 +391,6 @@ class Scheduler: time=self._state.elapsed_time(start_time=self._starttime), ) self._notify(notification) - job.start() # _sched_queue_jobs() # @@ -449,6 +456,21 @@ class Scheduler: # def _sched(self): def real_schedule(): + + # Reset the scheduling handle before queuing any jobs. 
+ # + # We do this right away because starting jobs can result + # in their being terminated and completed during the body + # of this function, and we want to be sure that we get + # called again in this case. + # + # This can happen if jobs are explicitly killed as a result, + # which might happen as a side effect of a crash in an + # abstracted frontend implementation handling notifications + # about jobs starting. + # + self._sched_handle = None + if not self.terminated: # @@ -457,9 +479,6 @@ class Scheduler: # self._sched_queue_jobs() - # Reset the scheduling hand - self._sched_handle = None - # # If nothing is ticking then bail out # |