diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2018-07-16 15:31:55 +0100 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 14:45:59 +0900 |
commit | 249256346613dea52ad32d945dbd303f54bd245e (patch) | |
tree | 2a977a806bdc0cd813b50248fb3717d7f841ba88 /buildstream/_scheduler/scheduler.py | |
parent | 4f9da15df4691fe582c0d05ccc61a7927dee55ca (diff) | |
download | buildstream-249256346613dea52ad32d945dbd303f54bd245e.tar.gz |
Make Jobs abstract and element-independent
Diffstat (limited to 'buildstream/_scheduler/scheduler.py')
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 213 |
1 files changed, 102 insertions, 111 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 7bfbc958e..bc182db32 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -21,12 +21,13 @@ # System imports import os import asyncio +from itertools import chain import signal import datetime from contextlib import contextmanager # Local imports -from .queues import QueueType +from .resources import Resources # A decent return code for Scheduler.run() @@ -69,6 +70,8 @@ class Scheduler(): # # Public members # + self.active_jobs = [] # Jobs currently being run in the scheduler + self.waiting_jobs = [] # Jobs waiting for resources self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues self.terminated = False # Whether the scheduler was asked to terminate or has terminated @@ -90,13 +93,9 @@ class Scheduler(): self._suspendtime = None self._queue_jobs = True # Whether we should continue to queue jobs - # Initialize task tokens with the number allowed by - # the user configuration - self._job_tokens = { - QueueType.FETCH: context.sched_fetchers, - QueueType.BUILD: context.sched_builders, - QueueType.PUSH: context.sched_pushers - } + self._resources = Resources(context.sched_builders, + context.sched_fetchers, + context.sched_pushers) # run() # @@ -129,7 +128,7 @@ class Scheduler(): self._connect_signals() # Run the queues - self.sched() + self._schedule_queue_jobs() self.loop.run_forever() self.loop.close() @@ -209,18 +208,74 @@ class Scheduler(): starttime = timenow return timenow - starttime - # sched() + # schedule_jobs() + # + # Args: + # jobs ([Job]): A list of jobs to schedule + # + # Schedule 'Job's for the scheduler to run. Jobs scheduled will be + # run as soon any other queueing jobs finish, provided sufficient + # resources are available for them to run + # + def schedule_jobs(self, jobs): + for job in jobs: + self.waiting_jobs.append(job) + + # job_completed(): + # + # Called when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # success (bool): Whether the Job completed with a success status + # + def job_completed(self, job, success): + self._resources.clear_job_resources(job) + self.active_jobs.remove(job) + self._job_complete_callback(job, success) + self._schedule_queue_jobs() + self._sched() + + ####################################################### + # Local Private Methods # + ####################################################### + + # _sched() # # The main driving function of the scheduler, it will be called - # automatically when Scheduler.run() is called initially, and needs - # to be called whenever a job can potentially be scheduled, usually - # when a Queue completes handling of a job. + # automatically when Scheduler.run() is called initially, + # + def _sched(self): + for job in self.waiting_jobs: + self._resources.reserve_exclusive_resources(job) + + for job in self.waiting_jobs: + if not self._resources.reserve_job_resources(job): + continue + + job.spawn() + self.waiting_jobs.remove(job) + self.active_jobs.append(job) + + if self._job_start_callback: + self._job_start_callback(job) + + # If nothings ticking, time to bail out + if not self.active_jobs and not self.waiting_jobs: + self.loop.stop() + + # _schedule_queue_jobs() # - # This will process the Queues and pull elements through the Queues + # Ask the queues what jobs they want to schedule and schedule + # them. This is done here so we can ask for new jobs when jobs + # from previous queues become available. + # + # This will process the Queues, pull elements through the Queues # and process anything that is ready. # - def sched(self): - + def _schedule_queue_jobs(self): + ready = [] process_queues = True while self._queue_jobs and process_queues: @@ -233,90 +288,29 @@ class Scheduler(): # Dequeue processed elements for the next queue elements = list(queue.dequeue()) - elements = list(elements) # Kickoff whatever processes can be processed at this time # - # We start by queuing from the last queue first, because we want to - # give priority to queues later in the scheduling process in the case - # that multiple queues share the same token type. + # We start by queuing from the last queue first, because + # we want to give priority to queues later in the + # scheduling process in the case that multiple queues + # share the same token type. # - # This avoids starvation situations where we dont move on to fetch - # tasks for elements which failed to pull, and thus need all the pulls - # to complete before ever starting a build - for queue in reversed(self.queues): - queue.process_ready() - - # process_ready() may have skipped jobs, adding them to the done_queue. - # Pull these skipped elements forward to the next queue and process them. + # This avoids starvation situations where we dont move on + # to fetch tasks for elements which failed to pull, and + # thus need all the pulls to complete before ever starting + # a build + ready.extend(chain.from_iterable( + queue.pop_ready_jobs() for queue in reversed(self.queues) + )) + + # pop_ready_jobs() may have skipped jobs, adding them to + # the done_queue. Pull these skipped elements forward to + # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - # If nothings ticking, time to bail out - ticking = 0 - for queue in self.queues: - ticking += len(queue.active_jobs) - - if ticking == 0: - self.loop.stop() - - # get_job_token(): - # - # Used by the Queue object to obtain a token for - # processing a Job, if a Queue does not receive a token - # then it must wait until a later time in order to - # process pending jobs. - # - # Args: - # queue_type (QueueType): The type of token to obtain - # - # Returns: - # (bool): Whether a token was handed out or not - # - def get_job_token(self, queue_type): - if self._job_tokens[queue_type] > 0: - self._job_tokens[queue_type] -= 1 - return True - return False - - # put_job_token(): - # - # Return a job token to the scheduler. Tokens previously - # received with get_job_token() must be returned to - # the scheduler once the associated job is complete. - # - # Args: - # queue_type (QueueType): The type of token to obtain - # - def put_job_token(self, queue_type): - self._job_tokens[queue_type] += 1 - - # job_starting(): - # - # Called by the Queue when starting a Job - # - # Args: - # job (Job): The starting Job - # - def job_starting(self, job): - if self._job_start_callback: - self._job_start_callback(job.element, job.action_name) - - # job_completed(): - # - # Called by the Queue when a Job completes - # - # Args: - # queue (Queue): The Queue holding a complete job - # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status - # - def job_completed(self, queue, job, success): - if self._job_complete_callback: - self._job_complete_callback(job.element, queue, job.action_name, success) - - ####################################################### - # Local Private Methods # - ####################################################### + self.schedule_jobs(ready) + self._sched() # _suspend_jobs() # @@ -326,9 +320,8 @@ class Scheduler(): if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - for queue in self.queues: - for job in queue.active_jobs: - job.suspend() + for job in self.active_jobs: + job.suspend() # _resume_jobs() # @@ -336,9 +329,8 @@ class Scheduler(): # def _resume_jobs(self): if self.suspended: - for queue in self.queues: - for job in queue.active_jobs: - job.resume() + for job in self.active_jobs: + job.resume() self.suspended = False self._starttime += (datetime.datetime.now() - self._suspendtime) self._suspendtime = None @@ -401,19 +393,18 @@ class Scheduler(): wait_limit = 20.0 # First tell all jobs to terminate - for queue in self.queues: - for job in queue.active_jobs: - job.terminate() + for job in self.active_jobs: + job.terminate() # Now wait for them to really terminate - for queue in self.queues: - for job in queue.active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() - - self.loop.stop() + for job in self.active_jobs: + elapsed = datetime.datetime.now() - wait_start + timeout = max(wait_limit - elapsed.total_seconds(), 0.0) + if not job.terminate_wait(timeout): + job.kill() + + # Clear out the waiting jobs + self.waiting_jobs = [] # Regular timeout for driving status in the UI def _tick(self): |