summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhil Dawson <phil.dawson@codethink.co.uk>2018-12-07 15:02:11 +0000
committerPhil Dawson <phil.dawson@codethink.co.uk>2018-12-07 15:02:11 +0000
commitfa8fb519a6568bf9ae09991cb1767c9bfc92a160 (patch)
treee36b0f07d753d7265c3508f709dd092c0b1f35fd
parentbdf9c14a2cee47daef35200f72f94f42c945d2d1 (diff)
downloadbuildstream-phil/712-_sched-refactor.tar.gz
scheduler.py: Prioritise jobs from later queuesphil/712-_sched-refactor
Previously, we were reversing the order of queues in _schedule_queue_jobs() in an attempt to avoid resource starvation of queues which share resource types. This was incorrect as the earlier jobs were still ready first and so were scheduled first. Instead we schedule jobs starting from the most recently ready. This will mean that queues later in the scheduling process will always have priority when they share a resource type with another queue. This also fixes a bug in _sched() arising from the fact we were removing items from waiting_jobs while iterating through it. Aside from resulting in the loop having an O(n^2) complexity, this results in skipping the job following the removed item in the iteration. This commit is related to issue #712
-rw-r--r--buildstream/_scheduler/scheduler.py31
1 files changed, 14 insertions, 17 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 0fe996266..3bad5bf73 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -221,8 +221,7 @@ class Scheduler():
# resources are available for them to run
#
def schedule_jobs(self, jobs):
- for job in jobs:
- self._waiting_jobs.append(job)
+ self._waiting_jobs.extend(jobs)
# job_completed():
#
@@ -270,20 +269,28 @@ class Scheduler():
#
def _sched(self):
- for job in self._waiting_jobs:
+ uninitiated_jobs = []
+
+ # Iterate the list backwards because we want to give priority
+ # to queues later in the scheduling process when multiple
+ # queues share the same token type.
+ for job in reversed(self._waiting_jobs):
self._resources.reserve_exclusive_resources(job)
- for job in self._waiting_jobs:
+ for job in reversed(self._waiting_jobs):
if not self._resources.reserve_job_resources(job):
+ uninitiated_jobs.append(job)
continue
job.spawn()
- self._waiting_jobs.remove(job)
self.active_jobs.append(job)
if self._job_start_callback:
self._job_start_callback(job)
+ uninitiated_jobs.reverse()
+ self._waiting_jobs = uninitiated_jobs
+
# If nothings ticking, time to bail out
if not self.active_jobs and not self._waiting_jobs:
self.loop.stop()
@@ -312,19 +319,9 @@ class Scheduler():
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
- # Kickoff whatever processes can be processed at this time
- #
- # We start by queuing from the last queue first, because
- # we want to give priority to queues later in the
- # scheduling process in the case that multiple queues
- # share the same token type.
- #
- # This avoids starvation situations where we dont move on
- # to fetch tasks for elements which failed to pull, and
- # thus need all the pulls to complete before ever starting
- # a build
+ # Get the jobs which have had their prerequisites met.
ready.extend(chain.from_iterable(
- queue.pop_ready_jobs() for queue in reversed(self.queues)
+ queue.pop_ready_jobs() for queue in self.queues
))
# pop_ready_jobs() may have skipped jobs, adding them to