summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Tristan Maat <tm@tlater.net> 2018-07-11 10:53:07 +0100
committer Tristan Maat <tm@tlater.net> 2018-07-11 11:28:28 +0100
commit 034a5a4929242046458caf4c2a21ecd73969ccaf (patch)
tree 7b8076287b622f5f37da8fe61d68c0dcb179ed25
parent 7e56b788be151f29e81768b35509e72e6a9c47ec (diff)
download buildstream-034a5a4929242046458caf4c2a21ecd73969ccaf.tar.gz
scheduler.py: Introduce Resources
-rw-r--r-- buildstream/_scheduler/scheduler.py 146
1 files changed, 70 insertions, 76 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 7e4b985ef..ee226cfb6 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -27,6 +27,8 @@ import datetime
from contextlib import contextmanager
# Local imports
+from .resources import Resources, ResourceType
+from .jobs import CacheSizeJob, CleanupJob
# A decent return code for Scheduler.run()
@@ -69,8 +71,8 @@ class Scheduler():
#
# Public members
#
- self.waiting_jobs = [] # Jobs waiting for execution
self.active_jobs = [] # Jobs currently being run in the scheduler
+ self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -92,6 +94,9 @@ class Scheduler():
self._suspendtime = None
self._queue_jobs = True # Whether we should continue to queue jobs
+ self._resources = Resources(context.sched_builders,
+ context.sched_fetchers,
+ context.sched_pushers)
# run()
#
@@ -124,7 +129,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
- self.schedule_queue_jobs()
+ self._schedule_queue_jobs()
self.loop.run_forever()
self.loop.close()
@@ -204,19 +209,50 @@ class Scheduler():
starttime = timenow
return timenow - starttime
- # sched()
+ # schedule_jobs()
#
- # The main driving function of the scheduler, it will be called
- # automatically when Scheduler.run() is called initially, and needs
- # to be called whenever a job can potentially be scheduled, usually
- # when a Queue completes handling of a job.
+ # Args:
+ # jobs ([Job]): A list of jobs to schedule
#
- # This will process the Queues and pull elements through the Queues
- # and process anything that is ready.
+ # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
+ # run as soon as any other queueing jobs finish, provided sufficient
+ # resources are available for them to run.
+ #
+ def schedule_jobs(self, jobs):
+ for job in jobs:
+ self.waiting_jobs.append(job)
+
+ # job_completed():
+ #
+ # Called when a Job completes
+ #
+ # Args:
+ # job (Job): The completed Job
+ # success (bool): Whether the Job completed with a success status
+ #
+ #
+ def job_completed(self, job, success):
+ self._resources.clear_job_resources(job)
+ self.active_jobs.remove(job)
+ self._job_complete_callback(job, success)
+ self._schedule_queue_jobs()
+ self._sched()
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
+ # _sched()
#
- def sched(self):
+ # The main driving function of the scheduler; it is called when
+ # Scheduler.run() starts and again whenever a job completes.
+ #
+ def _sched(self):
for job in self.waiting_jobs:
+ self._resources.reserve_exclusive_resources(job)
+ for job in self.waiting_jobs:
+ if not self._resources.reserve_job_resources(job):
continue
job.spawn()
@@ -230,10 +266,16 @@ class Scheduler():
if not self.active_jobs and not self.waiting_jobs:
self.loop.stop()
- def schedule_jobs(self, jobs):
- self.waiting_jobs.extend(jobs)
-
- def schedule_queue_jobs(self):
+ # _schedule_queue_jobs()
+ #
+ # Ask the queues what jobs they want to schedule and schedule
+ # them. This is done here so we can ask for new jobs when jobs
+ # from previous queues become available.
+ #
+ # This will process the Queues, pull elements through the Queues
+ # and process anything that is ready.
+ #
+ def _schedule_queue_jobs(self):
ready = []
process_queues = True
@@ -259,68 +301,23 @@ class Scheduler():
# thus need all the pulls to complete before ever starting
# a build
ready.extend(chain.from_iterable(
- queue.process_ready() for queue in reversed(self.queues)
+ queue.pop_ready_jobs() for queue in reversed(self.queues)
))
- # process_ready() may have skipped jobs, adding them to
+ # pop_ready_jobs() may have skipped jobs, adding them to
# the done_queue. Pull these skipped elements forward to
# the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
self.schedule_jobs(ready)
- self.sched()
+ self._sched()
- # job_completed():
- #
- # Called when a Job completes
- #
- # Args:
- # queue (Queue): The Queue holding a complete job
- # job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
- #
- def job_completed(self, job, success):
- self.active_jobs.remove(job)
- self.schedule_queue_jobs()
- # Notify frontend
- if self._job_complete_callback:
- self._job_complete_callback(job, success)
- # get_job_token():
- #
- # Used by the Queue object to obtain a token for
- # processing a Job, if a Queue does not receive a token
- # then it must wait until a later time in order to
- # process pending jobs.
- #
- # Args:
- # queue_type (QueueType): The type of token to obtain
- #
- # Returns:
- # (bool): Whether a token was handed out or not
- #
- def get_job_token(self, queue_type):
- if self._job_tokens[queue_type] > 0:
- self._job_tokens[queue_type] -= 1
- return True
- return False
+ self.schedule_jobs([job])
- # put_job_token():
- #
- # Return a job token to the scheduler. Tokens previously
- # received with get_job_token() must be returned to
- # the scheduler once the associated job is complete.
- #
- # Args:
- # queue_type (QueueType): The type of token to obtain
- #
- def put_job_token(self, queue_type):
- self._job_tokens[queue_type] += 1
- #######################################################
- # Local Private Methods #
- #######################################################
+ self.schedule_jobs([job])
def _run_cleanup(self, cache_size):
if cache_size and cache_size < self.context.cache_quota:
@@ -349,9 +346,8 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- for queue in self.queues:
- for job in queue.active_jobs:
- job.suspend()
+ for job in self.active_jobs:
+ job.suspend()
# _resume_jobs()
#
@@ -359,9 +355,8 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
- for queue in self.queues:
- for job in queue.active_jobs:
- job.resume()
+ for job in self.active_jobs:
+ job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
self._suspendtime = None
@@ -423,21 +418,20 @@ class Scheduler():
wait_start = datetime.datetime.now()
wait_limit = 20.0
- active_jobs = self.active_jobs
- for queue in self.queues:
- active_jobs.extend(queue.active_jobs)
-
# First tell all jobs to terminate
- for job in active_jobs:
+ for job in self.active_jobs:
job.terminate()
# Now wait for them to really terminate
- for job in active_jobs:
+ for job in self.active_jobs:
elapsed = datetime.datetime.now() - wait_start
timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
if not job.terminate_wait(timeout):
job.kill()
+ # Clear out the waiting jobs
+ self.waiting_jobs = []
+
# Regular timeout for driving status in the UI
def _tick(self):
elapsed = self.elapsed_time()