author      Phil Dawson <phil.dawson@codethink.co.uk>    2018-11-21 09:10:05 +0000
committer   Phil Dawson <phil.dawson@codethink.co.uk>    2018-12-11 10:54:08 +0000
commit      57236af65c306ebbbec6d01f99593319b8cdeabf (patch)
tree        c0e80087a421e1415b62f6adafeec1712d70935e
parent      ff2eece3ca739a1d2244a4a3bf07b1b488089e67 (diff)
download    buildstream-phil/712-high-priority-job-queue.tar.gz
WIP: scheduler.py: Add a second high priority queue (phil/712-high-priority-job-queue)
Adds a queue which allows "high priority" jobs to bypass the main waiting_jobs queue. This is then used to ensure that fetch jobs are prioritised over pull jobs.
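The idea, boiled down: the scheduler keeps two waiting lists and always drains the high priority one first whenever resources become available. Below is a minimal runnable sketch of that behaviour, not the real BuildStream Scheduler; SimpleJob and the can_run callback are illustrative assumptions standing in for real jobs and resource reservation.

class SimpleJob:
    def __init__(self, name):
        self.name = name

    def spawn(self):
        print("spawned", self.name)


class TwoLevelScheduler:
    def __init__(self):
        self.active_jobs = []
        self.waiting_jobs = []           # ordinary jobs waiting for resources
        self.waiting_priority_jobs = []  # high priority jobs (e.g. fetches)

    def schedule_jobs(self, jobs, priority_jobs):
        self.waiting_priority_jobs.extend(priority_jobs)
        self.waiting_jobs.extend(jobs)

    def sched(self, can_run):
        def spawn_from(job_list):
            # Iterate over a copy so removing from the list is safe
            for job in list(job_list):
                if not can_run(job):
                    continue
                job.spawn()
                job_list.remove(job)
                self.active_jobs.append(job)

        # High priority jobs get first pick of whatever resources are free
        spawn_from(self.waiting_priority_jobs)
        spawn_from(self.waiting_jobs)


scheduler = TwoLevelScheduler()
scheduler.schedule_jobs([SimpleJob("build")], [SimpleJob("fetch")])
scheduler.sched(can_run=lambda job: True)  # "fetch" spawns before "build"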
-rw-r--r--   buildstream/_scheduler/queues/fetchqueue.py    1
-rw-r--r--   buildstream/_scheduler/queues/queue.py          1
-rw-r--r--   buildstream/_scheduler/scheduler.py            58
3 files changed, 37 insertions, 23 deletions
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 446dbbd3b..5c441ba64 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -33,6 +33,7 @@ class FetchQueue(Queue):
     action_name = "Fetch"
     complete_name = "Fetched"
     resources = [ResourceType.DOWNLOAD]
+    high_priority = True
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 909cebb44..3c0414036 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -58,6 +58,7 @@ class Queue():
     action_name = None
     complete_name = None
     resources = []              # Resources this queues' jobs want
+    high_priority = False       # If jobs from this queue should be prioritised by the scheduler
 
     def __init__(self, scheduler):
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c7308e..1b05415fd 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -71,12 +71,13 @@ class Scheduler():
         #
         # Public members
         #
-        self.active_jobs = []       # Jobs currently being run in the scheduler
-        self.waiting_jobs = []      # Jobs waiting for resources
-        self.queues = None          # Exposed for the frontend to print summaries
-        self.context = context      # The Context object shared with Queues
-        self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
-        self.suspended = False      # Whether the scheduler is currently suspended
+        self.active_jobs = []               # Jobs currently being run in the scheduler
+        self.waiting_jobs = []              # Jobs waiting for resources
+        self.waiting_priority_jobs = []     # High priority jobs waiting for resources
+        self.queues = None                  # Exposed for the frontend to print summaries
+        self.context = context              # The Context object shared with Queues
+        self.terminated = False             # Whether the scheduler was asked to terminate or has terminated
+        self.suspended = False              # Whether the scheduler is currently suspended
 
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None            # Shared for Job access to observe the message queue
@@ -220,7 +221,9 @@ class Scheduler():
     # run as soon any other queueing jobs finish, provided sufficient
     # resources are available for them to run
     #
-    def schedule_jobs(self, jobs):
+    def schedule_jobs(self, jobs, priority_jobs):
+        for job in priority_jobs:
+            self.waiting_priority_jobs.append(job)
         for job in jobs:
             self.waiting_jobs.append(job)
@@ -257,7 +260,7 @@ class Scheduler():
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
                          complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        self.schedule_jobs([job], [])
 
     #######################################################
     #                  Local Private Methods              #
@@ -269,22 +272,27 @@ class Scheduler():
     # automatically when Scheduler.run() is called initially,
     #
     def _sched(self):
 
-        for job in self.waiting_jobs:
-            self._resources.reserve_exclusive_resources(job)
+        def allocate_resources_and_spawn_jobs(job_list):
+            for job in job_list:
+                self._resources.reserve_exclusive_resources(job)
+
+            for job in job_list:
+                if not self._resources.reserve_job_resources(job):
+                    continue
 
-        for job in self.waiting_jobs:
-            if not self._resources.reserve_job_resources(job):
-                continue
+                job.spawn()
+                job_list.remove(job)
+                self.active_jobs.append(job)
 
-            job.spawn()
-            self.waiting_jobs.remove(job)
-            self.active_jobs.append(job)
+                if self._job_start_callback:
+                    self._job_start_callback(job)
 
-            if self._job_start_callback:
-                self._job_start_callback(job)
+        # Process jobs from the high priority list first
+        allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
+        allocate_resources_and_spawn_jobs(self.waiting_jobs)
 
         # If nothings ticking, time to bail out
-        if not self.active_jobs and not self.waiting_jobs:
+        if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
             self.loop.stop()
# _schedule_queue_jobs()
@@ -298,6 +306,7 @@ class Scheduler():
     #
     def _schedule_queue_jobs(self):
         ready = []
+        ready_priority = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
@@ -322,16 +331,19 @@ class Scheduler():
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            ready.extend(chain.from_iterable(
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
-            ))
+
+            for queue in reversed(self.queues):
+                if queue.high_priority:
+                    ready_priority.extend(queue.pop_ready_jobs())
+                else:
+                    ready.extend(queue.pop_ready_jobs())
 
             # pop_ready_jobs() may have skipped jobs, adding them to
             # the done_queue. Pull these skipped elements forward to
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        self.schedule_jobs(ready)
+        self.schedule_jobs(ready, ready_priority)
         self._sched()
 
     # _run_cleanup()
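For reference, a small standalone sketch of how the two sides of the patch fit together: a queue opts in via a high_priority class attribute, and the scheduler collects its ready jobs into a separate list that gets scheduled first. DummyQueue, DummyFetchQueue and the string job names are assumptions made for illustration; the real Queue.pop_ready_jobs() returns job objects.

class DummyQueue:
    high_priority = False          # mirrors Queue.high_priority

    def __init__(self, ready):
        self._ready = ready

    def pop_ready_jobs(self):
        ready, self._ready = self._ready, []
        return ready


class DummyFetchQueue(DummyQueue):
    high_priority = True           # mirrors FetchQueue.high_priority = True


def split_ready_jobs(queues):
    # Same split as the patched _schedule_queue_jobs(): ready jobs from
    # high priority queues are collected separately and scheduled first.
    ready, ready_priority = [], []
    for queue in reversed(queues):
        if queue.high_priority:
            ready_priority.extend(queue.pop_ready_jobs())
        else:
            ready.extend(queue.pop_ready_jobs())
    return ready, ready_priority


queues = [DummyFetchQueue(["fetch job"]), DummyQueue(["build job"])]
print(split_ready_jobs(queues))    # (['build job'], ['fetch job'])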