diff options
Diffstat (limited to 'buildstream/_scheduler/scheduler.py')
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 31 |
1 files changed, 14 insertions, 17 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 0fe996266..3bad5bf73 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -221,8 +221,7 @@ class Scheduler(): # resources are available for them to run # def schedule_jobs(self, jobs): - for job in jobs: - self._waiting_jobs.append(job) + self._waiting_jobs.extend(jobs) # job_completed(): # @@ -270,20 +269,28 @@ class Scheduler(): # def _sched(self): - for job in self._waiting_jobs: + uninitiated_jobs = [] + + # Iterate the list backwards because we want to give priority + # to queues later in the scheduling process when multiple + # queues share the same token type. + for job in reversed(self._waiting_jobs): self._resources.reserve_exclusive_resources(job) - for job in self._waiting_jobs: + for job in reversed(self._waiting_jobs): if not self._resources.reserve_job_resources(job): + uninitiated_jobs.append(job) continue job.spawn() - self._waiting_jobs.remove(job) self.active_jobs.append(job) if self._job_start_callback: self._job_start_callback(job) + uninitiated_jobs.reverse() + self._waiting_jobs = uninitiated_jobs + # If nothings ticking, time to bail out if not self.active_jobs and not self._waiting_jobs: self.loop.stop() @@ -312,19 +319,9 @@ class Scheduler(): # Dequeue processed elements for the next queue elements = list(queue.dequeue()) - # Kickoff whatever processes can be processed at this time - # - # We start by queuing from the last queue first, because - # we want to give priority to queues later in the - # scheduling process in the case that multiple queues - # share the same token type. - # - # This avoids starvation situations where we dont move on - # to fetch tasks for elements which failed to pull, and - # thus need all the pulls to complete before ever starting - # a build + # Get the jobs which have had their prerequisites met. ready.extend(chain.from_iterable( - queue.pop_ready_jobs() for queue in reversed(self.queues) + queue.pop_ready_jobs() for queue in self.queues )) # pop_ready_jobs() may have skipped jobs, adding them to |