summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/scheduler.py
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2018-07-16 15:31:55 +0100
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-07-18 14:45:59 +0900
commit249256346613dea52ad32d945dbd303f54bd245e (patch)
tree2a977a806bdc0cd813b50248fb3717d7f841ba88 /buildstream/_scheduler/scheduler.py
parent4f9da15df4691fe582c0d05ccc61a7927dee55ca (diff)
downloadbuildstream-249256346613dea52ad32d945dbd303f54bd245e.tar.gz
Make Jobs abstract and element-independent
Diffstat (limited to 'buildstream/_scheduler/scheduler.py')
-rw-r--r--buildstream/_scheduler/scheduler.py213
1 files changed, 102 insertions, 111 deletions
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 7bfbc958e..bc182db32 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -21,12 +21,13 @@
# System imports
import os
import asyncio
+from itertools import chain
import signal
import datetime
from contextlib import contextmanager
# Local imports
-from .queues import QueueType
+from .resources import Resources
# A decent return code for Scheduler.run()
@@ -69,6 +70,8 @@ class Scheduler():
#
# Public members
#
+ self.active_jobs = [] # Jobs currently being run in the scheduler
+ self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -90,13 +93,9 @@ class Scheduler():
self._suspendtime = None
self._queue_jobs = True # Whether we should continue to queue jobs
- # Initialize task tokens with the number allowed by
- # the user configuration
- self._job_tokens = {
- QueueType.FETCH: context.sched_fetchers,
- QueueType.BUILD: context.sched_builders,
- QueueType.PUSH: context.sched_pushers
- }
+ self._resources = Resources(context.sched_builders,
+ context.sched_fetchers,
+ context.sched_pushers)
# run()
#
@@ -129,7 +128,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
- self.sched()
+ self._schedule_queue_jobs()
self.loop.run_forever()
self.loop.close()
@@ -209,18 +208,74 @@ class Scheduler():
starttime = timenow
return timenow - starttime
- # sched()
+ # schedule_jobs()
+ #
+ # Args:
+ # jobs ([Job]): A list of jobs to schedule
+ #
+ # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
+ # run as soon any other queueing jobs finish, provided sufficient
+ # resources are available for them to run
+ #
+ def schedule_jobs(self, jobs):
+ for job in jobs:
+ self.waiting_jobs.append(job)
+
+ # job_completed():
+ #
+ # Called when a Job completes
+ #
+ # Args:
+ # queue (Queue): The Queue holding a complete job
+ # job (Job): The completed Job
+ # success (bool): Whether the Job completed with a success status
+ #
+ def job_completed(self, job, success):
+ self._resources.clear_job_resources(job)
+ self.active_jobs.remove(job)
+ self._job_complete_callback(job, success)
+ self._schedule_queue_jobs()
+ self._sched()
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
+ # _sched()
#
# The main driving function of the scheduler, it will be called
- # automatically when Scheduler.run() is called initially, and needs
- # to be called whenever a job can potentially be scheduled, usually
- # when a Queue completes handling of a job.
+ # automatically when Scheduler.run() is called initially,
+ #
+ def _sched(self):
+ for job in self.waiting_jobs:
+ self._resources.reserve_exclusive_resources(job)
+
+ for job in self.waiting_jobs:
+ if not self._resources.reserve_job_resources(job):
+ continue
+
+ job.spawn()
+ self.waiting_jobs.remove(job)
+ self.active_jobs.append(job)
+
+ if self._job_start_callback:
+ self._job_start_callback(job)
+
+ # If nothings ticking, time to bail out
+ if not self.active_jobs and not self.waiting_jobs:
+ self.loop.stop()
+
+ # _schedule_queue_jobs()
#
- # This will process the Queues and pull elements through the Queues
+ # Ask the queues what jobs they want to schedule and schedule
+ # them. This is done here so we can ask for new jobs when jobs
+ # from previous queues become available.
+ #
+ # This will process the Queues, pull elements through the Queues
# and process anything that is ready.
#
- def sched(self):
-
+ def _schedule_queue_jobs(self):
+ ready = []
process_queues = True
while self._queue_jobs and process_queues:
@@ -233,90 +288,29 @@ class Scheduler():
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
- elements = list(elements)
# Kickoff whatever processes can be processed at this time
#
- # We start by queuing from the last queue first, because we want to
- # give priority to queues later in the scheduling process in the case
- # that multiple queues share the same token type.
+ # We start by queuing from the last queue first, because
+ # we want to give priority to queues later in the
+ # scheduling process in the case that multiple queues
+ # share the same token type.
#
- # This avoids starvation situations where we dont move on to fetch
- # tasks for elements which failed to pull, and thus need all the pulls
- # to complete before ever starting a build
- for queue in reversed(self.queues):
- queue.process_ready()
-
- # process_ready() may have skipped jobs, adding them to the done_queue.
- # Pull these skipped elements forward to the next queue and process them.
+ # This avoids starvation situations where we dont move on
+ # to fetch tasks for elements which failed to pull, and
+ # thus need all the pulls to complete before ever starting
+ # a build
+ ready.extend(chain.from_iterable(
+ queue.pop_ready_jobs() for queue in reversed(self.queues)
+ ))
+
+ # pop_ready_jobs() may have skipped jobs, adding them to
+ # the done_queue. Pull these skipped elements forward to
+ # the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # If nothings ticking, time to bail out
- ticking = 0
- for queue in self.queues:
- ticking += len(queue.active_jobs)
-
- if ticking == 0:
- self.loop.stop()
-
- # get_job_token():
- #
- # Used by the Queue object to obtain a token for
- # processing a Job, if a Queue does not receive a token
- # then it must wait until a later time in order to
- # process pending jobs.
- #
- # Args:
- # queue_type (QueueType): The type of token to obtain
- #
- # Returns:
- # (bool): Whether a token was handed out or not
- #
- def get_job_token(self, queue_type):
- if self._job_tokens[queue_type] > 0:
- self._job_tokens[queue_type] -= 1
- return True
- return False
-
- # put_job_token():
- #
- # Return a job token to the scheduler. Tokens previously
- # received with get_job_token() must be returned to
- # the scheduler once the associated job is complete.
- #
- # Args:
- # queue_type (QueueType): The type of token to obtain
- #
- def put_job_token(self, queue_type):
- self._job_tokens[queue_type] += 1
-
- # job_starting():
- #
- # Called by the Queue when starting a Job
- #
- # Args:
- # job (Job): The starting Job
- #
- def job_starting(self, job):
- if self._job_start_callback:
- self._job_start_callback(job.element, job.action_name)
-
- # job_completed():
- #
- # Called by the Queue when a Job completes
- #
- # Args:
- # queue (Queue): The Queue holding a complete job
- # job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
- #
- def job_completed(self, queue, job, success):
- if self._job_complete_callback:
- self._job_complete_callback(job.element, queue, job.action_name, success)
-
- #######################################################
- # Local Private Methods #
- #######################################################
+ self.schedule_jobs(ready)
+ self._sched()
# _suspend_jobs()
#
@@ -326,9 +320,8 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- for queue in self.queues:
- for job in queue.active_jobs:
- job.suspend()
+ for job in self.active_jobs:
+ job.suspend()
# _resume_jobs()
#
@@ -336,9 +329,8 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
- for queue in self.queues:
- for job in queue.active_jobs:
- job.resume()
+ for job in self.active_jobs:
+ job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
self._suspendtime = None
@@ -401,19 +393,18 @@ class Scheduler():
wait_limit = 20.0
# First tell all jobs to terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- job.terminate()
+ for job in self.active_jobs:
+ job.terminate()
# Now wait for them to really terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
-
- self.loop.stop()
+ for job in self.active_jobs:
+ elapsed = datetime.datetime.now() - wait_start
+ timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+ if not job.terminate_wait(timeout):
+ job.kill()
+
+ # Clear out the waiting jobs
+ self.waiting_jobs = []
# Regular timeout for driving status in the UI
def _tick(self):