author     Tristan Maat <tristan.maat@codethink.co.uk>  2018-04-26 15:48:29 +0100
committer  Tristan Maat <tristan.maat@codethink.co.uk>  2018-05-04 13:45:11 +0100
commit     92a6e3464fdb29181556c2f12d0cc6cc99774ff4 (patch)
tree       1a441bbf966ae44517d0c95bf8647938aba1b1bb
parent     8e59ca0b205b5f84e1e9a88cd169bd4168a53ded (diff)
download   buildstream-372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes.tar.gz

buildstream/_scheduler/*.py: Make job submission a queue job (branch: 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes)
-rw-r--r--  buildstream/_scheduler/jobs/job.py       6
-rw-r--r--  buildstream/_scheduler/queues/queue.py  23
-rw-r--r--  buildstream/_scheduler/scheduler.py     92
3 files changed, 58 insertions, 63 deletions
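The subject line describes the shape of the change: queues stop spawning jobs themselves and instead hand ready jobs back, while the scheduler tracks and runs them centrally. The following is a minimal, self-contained sketch of that flow, not BuildStream's real classes: the Fake* names are stand-ins, and QueueRunner.schedule_jobs(), which the scheduler.py hunk references but this diff does not show, is assumed to simply gather each queue's process_ready() results.

class FakeJob:
    def __init__(self, name):
        self.name = name

    def spawn(self):
        # The real Job forks a child process and reports back asynchronously;
        # here we only record that it started.
        print("spawned", self.name)


class FakeQueue:
    def __init__(self, elements):
        self._wait_queue = list(elements)

    def process_ready(self):
        # Queues no longer spawn jobs themselves; they return the ready ones
        ready = [FakeJob(element) for element in self._wait_queue]
        self._wait_queue.clear()
        return ready


class FakeScheduler:
    def __init__(self, queues):
        self.active_jobs = []      # Central job tracking (previously per-queue)
        self.queues = queues

    def sched(self):
        # QueueRunner.schedule_jobs() is assumed to gather jobs like this
        jobs = [job for queue in self.queues for job in queue.process_ready()]
        self.run_jobs(jobs)
        if not self.active_jobs:
            print("nothing ticking, stopping the loop")

    def run_jobs(self, jobs):
        for job in jobs:
            job.spawn()
            self.active_jobs.append(job)

    def job_completed(self, job):
        # Invoked from the Job's completion path, as job.py now does
        self.active_jobs.remove(job)
        self.sched()


scheduler = FakeScheduler([FakeQueue(["a.bst", "b.bst"])])
scheduler.sched()
# Simulate the event loop delivering job completions
while scheduler.active_jobs:
    scheduler.job_completed(scheduler.active_jobs[0])

Because completion now notifies the scheduler directly, each finished job immediately triggers another scheduling round instead of the queue calling sched() itself.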
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 6728ae0e6..9335cf90e 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -67,13 +67,14 @@ class Process(multiprocessing.Process):
#
class Job():
- def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+ def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0):
#
# Public members
#
self.action_name = action_name # The action name for the Queue
- self.child_data = None
+ self.child_data = None # Data to be sent to the main process
+ self.job_type = job_type # The type of the job
#
# Private members
@@ -540,6 +541,7 @@ class Job():
return
self._parent_complete(returncode == 0, self._result)
+ self._scheduler.job_completed(self)
# _parent_process_envelope()
#
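For orientation, a tiny sketch of the parent-side ordering this hunk introduces: the queue's completion callback (token return, frontend notification) runs via _parent_complete() before the scheduler is asked to schedule more work. SketchJob, SketchScheduler and parent_child_completed() are illustrative names, not the real Job API.

class SketchScheduler:
    def job_completed(self, job):
        print("scheduler notified after queue bookkeeping for", job.action_name)


class SketchJob:
    def __init__(self, scheduler, job_type, action_name, complete_cb):
        self._scheduler = scheduler
        self.job_type = job_type        # New: the queue type that owns this job
        self.action_name = action_name
        self._complete_cb = complete_cb

    def parent_child_completed(self, success, result):
        # Queue bookkeeping (token return, frontend notification) runs first...
        self._complete_cb(self, success, result)
        # ...and only then is the scheduler asked to schedule more work
        self._scheduler.job_completed(self)


job = SketchJob(SketchScheduler(), "build", "Build",
                complete_cb=lambda j, success, result: print("queue bookkeeping:", success))
job.parent_child_completed(True, None)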
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 648bebbff..434ca8745 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -78,7 +78,6 @@ class Queue():
#
# Public members
#
- self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation
self.failed_elements = [] # List of failed elements, for the frontend
self.processed_elements = [] # List of processed elements, for the frontend
self.skipped_elements = [] # List of skipped elements, for the frontend
@@ -224,6 +223,7 @@ class Queue():
def process_ready(self):
scheduler = self._scheduler
unready = []
+ ready = []
while self._wait_queue and scheduler.get_job_token(self.queue_type):
element = self._wait_queue.popleft()
@@ -242,20 +242,24 @@ class Queue():
logfile = self._element_log_path(element)
self.prepare(element)
- job = ElementJob(scheduler, self.action_name,
+ job = ElementJob(scheduler, self.queue_type,
+ self.action_name,
logfile, element=element,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
- scheduler.job_starting(job, element)
+ ready.append(job)
- job.spawn()
- self.active_jobs.append(job)
+ # Notify the frontend
+ if self._scheduler._job_start_callback:
+ self._scheduler._job_start_callback(element, self.action_name)
# These were not ready but were in the beginning, give em
# first priority again next time around
self._wait_queue.extendleft(unready)
+ return ready
+
#####################################################
# Private Methods #
#####################################################
@@ -299,9 +303,6 @@ class Queue():
#
def _job_done(self, job, element, success, result):
- # Remove from our jobs
- self.active_jobs.remove(job)
-
# Update workspaces in the main task before calling any queue implementation
self._update_workspaces(element, job)
@@ -347,13 +348,11 @@ class Queue():
self.failed_elements.append(element)
# Give the token for this job back to the scheduler
- # immediately before invoking another round of scheduling
self._scheduler.put_job_token(self.queue_type)
# Notify frontend
- self._scheduler.job_completed(self, job, element, success)
-
- self._scheduler.sched()
+ if self._scheduler._job_complete_callback:
+ self._scheduler._job_complete_callback(element, self, job.action_name, success)
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
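The reshaped process_ready() collects jobs instead of spawning them, gated by scheduler tokens, and pushes not-yet-ready elements back to the front of the wait queue. A simplified sketch of that pattern follows: a plain counter stands in for the scheduler's per-queue-type tokens, strings stand in for elements, and ready()/prepare()/ElementJob are omitted.

from collections import deque

class TokenBucket:
    # Stand-in for the scheduler's per-queue-type job tokens
    def __init__(self, tokens):
        self._tokens = tokens

    def get_job_token(self):
        if self._tokens > 0:
            self._tokens -= 1
            return True
        return False


def process_ready(wait_queue, tokens, is_ready):
    ready = []
    unready = []

    # Only pull elements while the scheduler still grants tokens
    while wait_queue and tokens.get_job_token():
        element = wait_queue.popleft()
        if not is_ready(element):
            unready.append(element)
            continue
        ready.append(element)   # The real queue builds an ElementJob here

    # Not-ready elements go back to the front so they get first priority
    # next time around (extendleft prepends, reversing their relative order)
    wait_queue.extendleft(unready)
    return ready


queue = deque(["a.bst", "b.bst", "c.bst"])
print(process_ready(queue, TokenBucket(2), lambda e: e != "b.bst"))   # ['a.bst']
print(queue)                                                          # deque(['b.bst', 'c.bst'])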
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 61cfc1190..8187c7a44 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -71,8 +71,10 @@ class Scheduler():
#
# Public members
#
- self.queues = None # Exposed for the frontend to print summaries
+ self.active_jobs = [] # Jobs currently being run in the scheduler
self.context = context # The Context object shared with Queues
+ self.queues = None
+ self.queue_runner = None # The QueueRunner that delivers jobs to schedule
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
self.suspended = False # Whether the scheduler is currently suspended
@@ -83,7 +85,6 @@ class Scheduler():
#
# Private members
#
- self._runners = []
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
self._job_start_callback = job_start_callback
@@ -116,8 +117,8 @@ class Scheduler():
#
def run(self, queues):
+ self.queue_runner = QueueRunner(self, queues)
# Hold on to the queues to process
- self._runners.append(QueueRunner(self, queues))
self.queues = queues
# Ensure that we have a fresh new event loop, in case we want
@@ -224,17 +225,38 @@ class Scheduler():
# and process anything that is ready.
#
def sched(self):
- for runner in self._runners:
- runner.schedule_jobs()
+ jobs = self.queue_runner.schedule_jobs()
+ self.run_jobs(jobs)
# If nothing is ticking, time to bail out
- ticking = 0
- for queue in self.queues:
- ticking += len(queue.active_jobs)
-
- if ticking == 0:
+ if not self.active_jobs:
self.loop.stop()
+ # run_jobs():
+ #
+ # Execute jobs and track them.
+ #
+ # Args:
+    # jobs (typing.Iterable[Job]): The jobs to run
+ #
+ def run_jobs(self, jobs):
+ for job in jobs:
+ job.spawn()
+ self.active_jobs.append(job)
+
+ # job_completed():
+ #
+ # Called when a Job completes
+ #
+    # Args:
+    # job (Job): The completed Job
+    #
+ def job_completed(self, job):
+ self.active_jobs.remove(job)
+ self.sched()
+
# get_job_token():
#
# Used by the Queue object to obtain a token for
@@ -266,30 +288,6 @@ class Scheduler():
def put_job_token(self, queue_type):
self._job_tokens[queue_type] += 1
- # job_starting():
- #
- # Called by the Queue when starting a Job
- #
- # Args:
- # job (Job): The starting Job
- #
- def job_starting(self, job, element):
- if self._job_start_callback:
- self._job_start_callback(element, job.action_name)
-
- # job_completed():
- #
- # Called by the Queue when a Job completes
- #
- # Args:
- # queue (Queue): The Queue holding a complete job
- # job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
- #
- def job_completed(self, queue, job, element, success):
- if self._job_complete_callback:
- self._job_complete_callback(element, queue, job.action_name, success)
-
#######################################################
# Local Private Methods #
#######################################################
@@ -302,9 +300,8 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- for queue in self.queues:
- for job in queue.active_jobs:
- job.suspend()
+ for job in self.active_jobs:
+ job.suspend()
# _resume_jobs()
#
@@ -312,9 +309,8 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
- for queue in self.queues:
- for job in queue.active_jobs:
- job.resume()
+ for job in self.active_jobs:
+ job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
self._suspendtime = None
@@ -377,17 +373,15 @@ class Scheduler():
wait_limit = 20.0
# First tell all jobs to terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- job.terminate()
+ for job in self.active_jobs:
+ job.terminate()
# Now wait for them to really terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
+ for job in self.active_jobs:
+ elapsed = datetime.datetime.now() - wait_start
+ timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+ if not job.terminate_wait(timeout):
+ job.kill()
self.loop.stop()
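Suspend, resume, and termination now all iterate a single scheduler-owned active_jobs list. The termination loop is worth seeing in isolation: every job is asked to terminate first, then each is waited on against one shared 20-second budget, and stragglers are killed. A minimal sketch, with FakeJob standing in for the real process-backed Job:

import datetime

class FakeJob:
    # Stand-in for the real process-backed Job
    def terminate(self):
        print("terminate requested")

    def terminate_wait(self, timeout):
        # The real Job joins its child process, returning False on timeout
        return timeout > 0.0

    def kill(self):
        print("killed")


def terminate_jobs(active_jobs, wait_limit=20.0):
    wait_start = datetime.datetime.now()

    # First ask every active job to terminate...
    for job in active_jobs:
        job.terminate()

    # ...then wait on each against a single shared time budget,
    # killing any job that fails to exit before the budget runs out
    for job in active_jobs:
        elapsed = datetime.datetime.now() - wait_start
        timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
        if not job.terminate_wait(timeout):
            job.kill()


terminate_jobs([FakeJob(), FakeJob()])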