summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildstream/_scheduler/scheduler.py31
1 files changed, 14 insertions, 17 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 0fe996266..3bad5bf73 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -221,8 +221,7 @@ class Scheduler():
# resources are available for them to run
#
def schedule_jobs(self, jobs):
- for job in jobs:
- self._waiting_jobs.append(job)
+ self._waiting_jobs.extend(jobs)
# job_completed():
#
@@ -270,20 +269,28 @@ class Scheduler():
#
def _sched(self):
- for job in self._waiting_jobs:
+ uninitiated_jobs = []
+
+ # Iterate the list backwards because we want to give priority
+ # to queues later in the scheduling process when multiple
+ # queues share the same token type.
+ for job in reversed(self._waiting_jobs):
self._resources.reserve_exclusive_resources(job)
- for job in self._waiting_jobs:
+ for job in reversed(self._waiting_jobs):
if not self._resources.reserve_job_resources(job):
+ uninitiated_jobs.append(job)
continue
job.spawn()
- self._waiting_jobs.remove(job)
self.active_jobs.append(job)
if self._job_start_callback:
self._job_start_callback(job)
+ uninitiated_jobs.reverse()
+ self._waiting_jobs = uninitiated_jobs
+
# If nothings ticking, time to bail out
if not self.active_jobs and not self._waiting_jobs:
self.loop.stop()
@@ -312,19 +319,9 @@ class Scheduler():
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
- # Kickoff whatever processes can be processed at this time
- #
- # We start by queuing from the last queue first, because
- # we want to give priority to queues later in the
- # scheduling process in the case that multiple queues
- # share the same token type.
- #
- # This avoids starvation situations where we dont move on
- # to fetch tasks for elements which failed to pull, and
- # thus need all the pulls to complete before ever starting
- # a build
+ # Get the jobs which have had their prerequisites met.
ready.extend(chain.from_iterable(
- queue.pop_ready_jobs() for queue in reversed(self.queues)
+ queue.pop_ready_jobs() for queue in self.queues
))
# pop_ready_jobs() may have skipped jobs, adding them to