diff options
Diffstat (limited to 'buildstream/_scheduler/scheduler.py')
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 146 |
1 files changed, 70 insertions, 76 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 7e4b985ef..ee226cfb6 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -27,6 +27,8 @@ import datetime from contextlib import contextmanager # Local imports +from .resources import Resources, ResourceType +from .jobs import CacheSizeJob, CleanupJob # A decent return code for Scheduler.run() @@ -69,8 +71,8 @@ class Scheduler(): # # Public members # - self.waiting_jobs = [] # Jobs waiting for execution self.active_jobs = [] # Jobs currently being run in the scheduler + self.waiting_jobs = [] # Jobs waiting for resources self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues self.terminated = False # Whether the scheduler was asked to terminate or has terminated @@ -92,6 +94,9 @@ class Scheduler(): self._suspendtime = None self._queue_jobs = True # Whether we should continue to queue jobs + self._resources = Resources(context.sched_builders, + context.sched_fetchers, + context.sched_pushers) # run() # @@ -124,7 +129,7 @@ class Scheduler(): self._connect_signals() # Run the queues - self.schedule_queue_jobs() + self._schedule_queue_jobs() self.loop.run_forever() self.loop.close() @@ -204,19 +209,50 @@ class Scheduler(): starttime = timenow return timenow - starttime - # sched() + # schedule_jobs() # - # The main driving function of the scheduler, it will be called - # automatically when Scheduler.run() is called initially, and needs - # to be called whenever a job can potentially be scheduled, usually - # when a Queue completes handling of a job. + # Args: + # jobs ([Job]): A list of jobs to schedule # - # This will process the Queues and pull elements through the Queues - # and process anything that is ready. + # Schedule 'Job's for the scheduler to run. Jobs scheduled will be + # run as soon any other queueing jobs finish, provided sufficient + # resources are available for them to run + # + def schedule_jobs(self, jobs): + for job in jobs: + self.waiting_jobs.append(job) + + # job_completed(): + # + # Called when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # success (bool): Whether the Job completed with a success status + # + def job_completed(self, job, success): + self._resources.clear_job_resources(job) + self.active_jobs.remove(job) + self._job_complete_callback(job, success) + self._schedule_queue_jobs() + self._sched() + + ####################################################### + # Local Private Methods # + ####################################################### + + # _sched() # - def sched(self): + # The main driving function of the scheduler, it will be called + # automatically when Scheduler.run() is called initially, + # + def _sched(self): for job in self.waiting_jobs: + self._resources.reserve_exclusive_resources(job) + for job in self.waiting_jobs: + if not self._resources.reserve_job_resources(job): continue job.spawn() @@ -230,10 +266,16 @@ class Scheduler(): if not self.active_jobs and not self.waiting_jobs: self.loop.stop() - def schedule_jobs(self, jobs): - self.waiting_jobs.extend(jobs) - - def schedule_queue_jobs(self): + # _schedule_queue_jobs() + # + # Ask the queues what jobs they want to schedule and schedule + # them. This is done here so we can ask for new jobs when jobs + # from previous queues become available. + # + # This will process the Queues, pull elements through the Queues + # and process anything that is ready. + # + def _schedule_queue_jobs(self): ready = [] process_queues = True @@ -259,68 +301,23 @@ class Scheduler(): # thus need all the pulls to complete before ever starting # a build ready.extend(chain.from_iterable( - queue.process_ready() for queue in reversed(self.queues) + queue.pop_ready_jobs() for queue in reversed(self.queues) )) - # process_ready() may have skipped jobs, adding them to + # pop_ready_jobs() may have skipped jobs, adding them to # the done_queue. Pull these skipped elements forward to # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) self.schedule_jobs(ready) - self.sched() + self._sched() - # job_completed(): - # - # Called when a Job completes - # - # Args: - # queue (Queue): The Queue holding a complete job - # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status - # - def job_completed(self, job, success): - self.active_jobs.remove(job) - self.schedule_queue_jobs() - # Notify frontend - if self._job_complete_callback: - self._job_complete_callback(job, success) - # get_job_token(): - # - # Used by the Queue object to obtain a token for - # processing a Job, if a Queue does not receive a token - # then it must wait until a later time in order to - # process pending jobs. - # - # Args: - # queue_type (QueueType): The type of token to obtain - # - # Returns: - # (bool): Whether a token was handed out or not - # - def get_job_token(self, queue_type): - if self._job_tokens[queue_type] > 0: - self._job_tokens[queue_type] -= 1 - return True - return False + self.schedule_jobs([job]) - # put_job_token(): - # - # Return a job token to the scheduler. Tokens previously - # received with get_job_token() must be returned to - # the scheduler once the associated job is complete. - # - # Args: - # queue_type (QueueType): The type of token to obtain - # - def put_job_token(self, queue_type): - self._job_tokens[queue_type] += 1 - ####################################################### - # Local Private Methods # - ####################################################### + self.schedule_jobs([job]) def _run_cleanup(self, cache_size): if cache_size and cache_size < self.context.cache_quota: @@ -349,9 +346,8 @@ class Scheduler(): if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - for queue in self.queues: - for job in queue.active_jobs: - job.suspend() + for job in self.active_jobs: + job.suspend() # _resume_jobs() # @@ -359,9 +355,8 @@ class Scheduler(): # def _resume_jobs(self): if self.suspended: - for queue in self.queues: - for job in queue.active_jobs: - job.resume() + for job in self.active_jobs: + job.resume() self.suspended = False self._starttime += (datetime.datetime.now() - self._suspendtime) self._suspendtime = None @@ -423,21 +418,20 @@ class Scheduler(): wait_start = datetime.datetime.now() wait_limit = 20.0 - active_jobs = self.active_jobs - for queue in self.queues: - active_jobs.extend(queue.active_jobs) - # First tell all jobs to terminate - for job in active_jobs: + for job in self.active_jobs: job.terminate() # Now wait for them to really terminate - for job in active_jobs: + for job in self.active_jobs: elapsed = datetime.datetime.now() - wait_start timeout = max(wait_limit - elapsed.total_seconds(), 0.0) if not job.terminate_wait(timeout): job.kill() + # Clear out the waiting jobs + self.waiting_jobs = [] + # Regular timeout for driving status in the UI def _tick(self): elapsed = self.elapsed_time() |