diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2018-04-26 15:48:29 +0100 |
---|---|---|
committer | Tristan Maat <tristan.maat@codethink.co.uk> | 2018-05-04 13:45:11 +0100 |
commit | 92a6e3464fdb29181556c2f12d0cc6cc99774ff4 (patch) | |
tree | 1a441bbf966ae44517d0c95bf8647938aba1b1bb | |
parent | 8e59ca0b205b5f84e1e9a88cd169bd4168a53ded (diff) | |
download | buildstream-372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes.tar.gz |
buildstream/_scheduler/*.py: Make job submission a queue job 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 23 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 92 |
3 files changed, 58 insertions, 63 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 6728ae0e6..9335cf90e 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -67,13 +67,14 @@ class Process(multiprocessing.Process): # class Job(): - def __init__(self, scheduler, action_name, logfile, *, max_retries=0): + def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0): # # Public members # self.action_name = action_name # The action name for the Queue - self.child_data = None + self.child_data = None # Data to be sent to the main process + self.job_type = job_type # The type of the job # # Private members @@ -540,6 +541,7 @@ class Job(): return self._parent_complete(returncode == 0, self._result) + self._scheduler.job_completed(self) # _parent_process_envelope() # diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 648bebbff..434ca8745 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -78,7 +78,6 @@ class Queue(): # # Public members # - self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation self.failed_elements = [] # List of failed elements, for the frontend self.processed_elements = [] # List of processed elements, for the frontend self.skipped_elements = [] # List of skipped elements, for the frontend @@ -224,6 +223,7 @@ class Queue(): def process_ready(self): scheduler = self._scheduler unready = [] + ready = [] while self._wait_queue and scheduler.get_job_token(self.queue_type): element = self._wait_queue.popleft() @@ -242,20 +242,24 @@ class Queue(): logfile = self._element_log_path(element) self.prepare(element) - job = ElementJob(scheduler, self.action_name, + job = ElementJob(scheduler, self.queue_type, + self.action_name, logfile, element=element, action_cb=self.process, complete_cb=self._job_done, max_retries=self._max_retries) - scheduler.job_starting(job, element) + 
ready.append(job) - job.spawn() - self.active_jobs.append(job) + # Notify the frontend + if self._scheduler._job_start_callback: + self._scheduler._job_start_callback(element, self.action_name) # These were not ready but were in the beginning, give em # first priority again next time around self._wait_queue.extendleft(unready) + return ready + ##################################################### # Private Methods # ##################################################### @@ -299,9 +303,6 @@ class Queue(): # def _job_done(self, job, element, success, result): - # Remove from our jobs - self.active_jobs.remove(job) - # Update workspaces in the main task before calling any queue implementation self._update_workspaces(element, job) @@ -347,13 +348,11 @@ class Queue(): self.failed_elements.append(element) # Give the token for this job back to the scheduler - # immediately before invoking another round of scheduling self._scheduler.put_job_token(self.queue_type) # Notify frontend - self._scheduler.job_completed(self, job, element, success) - - self._scheduler.sched() + if self._scheduler._job_complete_callback: + self._scheduler._job_complete_callback(element, self, job.action_name, success) # Convenience wrapper for Queue implementations to send # a message for the element they are processing diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 61cfc1190..8187c7a44 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -71,8 +71,10 @@ class Scheduler(): # # Public members # - self.queues = None # Exposed for the frontend to print summaries + self.active_jobs = [] # Jobs currently being run in the scheduler self.context = context # The Context object shared with Queues + self.queues = None + self.queue_runner = None # The QueueRunner that delivers jobs to schedule self.terminated = False # Whether the scheduler was asked to terminate or has terminated self.suspended = False # Whether the 
scheduler is currently suspended @@ -83,7 +85,6 @@ class Scheduler(): # # Private members # - self._runners = [] self._interrupt_callback = interrupt_callback self._ticker_callback = ticker_callback self._job_start_callback = job_start_callback @@ -116,8 +117,8 @@ class Scheduler(): # def run(self, queues): + self.queue_runner = QueueRunner(self, queues) # Hold on to the queues to process - self._runners.append(QueueRunner(self, queues)) self.queues = queues # Ensure that we have a fresh new event loop, in case we want @@ -224,17 +225,38 @@ class Scheduler(): # and process anything that is ready. # def sched(self): - for runner in self._runners: - runner.schedule_jobs() + jobs = self.queue_runner.schedule_jobs() + self.run_jobs(jobs) # If nothings ticking, time to bail out - ticking = 0 - for queue in self.queues: - ticking += len(queue.active_jobs) - - if ticking == 0: + if not self.active_jobs: self.loop.stop() + # run_jobs(): + # + # Execute jobs and track them. + # + # Args: + # jobs (typing.Iterable[jobs]) - A set of jobs to run + # + def run_jobs(self, jobs): + for job in jobs: + job.spawn() + self.active_jobs.append(job) + + # job_completed(): + # + # Called when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # success (bool): Whether the Job completed with a success status + # + def job_completed(self, job): + self.active_jobs.remove(job) + self.sched() + # get_job_token(): # # Used by the Queue object to obtain a token for @@ -266,30 +288,6 @@ class Scheduler(): def put_job_token(self, queue_type): self._job_tokens[queue_type] += 1 - # job_starting(): - # - # Called by the Queue when starting a Job - # - # Args: - # job (Job): The starting Job - # - def job_starting(self, job, element): - if self._job_start_callback: - self._job_start_callback(element, job.action_name) - - # job_completed(): - # - # Called by the Queue when a Job completes - # - # Args: - # queue (Queue): The Queue 
holding a complete job - # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status - # - def job_completed(self, queue, job, element, success): - if self._job_complete_callback: - self._job_complete_callback(element, queue, job.action_name, success) - ####################################################### # Local Private Methods # ####################################################### @@ -302,9 +300,8 @@ class Scheduler(): if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - for queue in self.queues: - for job in queue.active_jobs: - job.suspend() + for job in self.active_jobs: + job.suspend() # _resume_jobs() # @@ -312,9 +309,8 @@ class Scheduler(): # def _resume_jobs(self): if self.suspended: - for queue in self.queues: - for job in queue.active_jobs: - job.resume() + for job in self.active_jobs: + job.resume() self.suspended = False self._starttime += (datetime.datetime.now() - self._suspendtime) self._suspendtime = None @@ -377,17 +373,15 @@ class Scheduler(): wait_limit = 20.0 # First tell all jobs to terminate - for queue in self.queues: - for job in queue.active_jobs: - job.terminate() + for job in self.active_jobs: + job.terminate() # Now wait for them to really terminate - for queue in self.queues: - for job in queue.active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() + for job in self.active_jobs: + elapsed = datetime.datetime.now() - wait_start + timeout = max(wait_limit - elapsed.total_seconds(), 0.0) + if not job.terminate_wait(timeout): + job.kill() self.loop.stop() |