summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2019-01-14 14:22:37 -0500
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2019-01-16 14:28:14 -0500
commit6991d6d6f8519cb8a6188238d1735dca8536da81 (patch)
treed5bd7bc70ebf79e36567d503ae4225961382d459
parente385660cea331d688b64ae2d1a65e58718774643 (diff)
downloadbuildstream-6991d6d6f8519cb8a6188238d1735dca8536da81.tar.gz
_scheduler: Refactor of queues and resources.
This branch makes the following changes: * jobs/job.py: No longer stores any interested resource list Jobs are ephemeral again, they only ever exist while they are running. * queues/queue.py: Revert to only handling lists of elements Elements pass through the queues, Queue.harvest_jobs() replaces Queue.pop_ready_jobs() and now the Queue stops creating jobs as soon as there are not enough resources for the job. Also removed unused `prepare()` abstract method. * queues/buildqueue.py: Adapt the part where we launch a job This part needs to be reworked anyway, just touch it up for now so that it doesnt break with the surrounding changes. * jobs/{cachesize,cleanup}job.py: Expose uniform complete callback Allows the scheduler to manage resource deallocation for these two job completions as a custom thing, at the same phase that the Queues take care of their own resource deallocation. * resources.py: No longer has knowledge of the job Since jobs are ephemeral, they are not a suitable place to store the resource identifiers, these must be provided by the callers wherever needed. Now the main Resources object is owned by the Scheduler but shared with Queues, each take care of managing the resources of the jobs they create through the same resource API. * scheduler.py: Reverted to only creating jobs on demand This changes the flow of the scheduler such that whenever jobs complete, the queues are interrogated for as many jobs which can run at the moment but not more; and this completely removes the waiting list. For the internal cache management jobs, we handle this with a little state instead of having a waiting list and only launch when the resources permit it. By abolishing the scheduler waiting list and creating jobs on demand, we fix the order of element processing and consequently fix issue #712.
-rw-r--r--buildstream/_scheduler/jobs/cachesizejob.py4
-rw-r--r--buildstream/_scheduler/jobs/cleanupjob.py6
-rw-r--r--buildstream/_scheduler/jobs/job.py25
-rw-r--r--buildstream/_scheduler/queues/buildqueue.py3
-rw-r--r--buildstream/_scheduler/queues/queue.py125
-rw-r--r--buildstream/_scheduler/resources.py113
-rw-r--r--buildstream/_scheduler/scheduler.py250
7 files changed, 287 insertions, 239 deletions
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index 6e4698af9..a96b92353 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -34,8 +34,8 @@ class CacheSizeJob(Job):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
- if self._complete_cb:
- self._complete_cb(result)
+ if self._complete_cb:
+ self._complete_cb(status, result)
def child_process_data(self):
return {}
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index e579e9718..b378b3dab 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -20,8 +20,9 @@ from .job import Job, JobStatus
class CleanupJob(Job):
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self._complete_cb = complete_cb
context = self._scheduler.context
self._artifacts = context.artifactcache
@@ -32,3 +33,6 @@ class CleanupJob(Job):
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
+
+ if self._complete_cb:
+ self._complete_cb(status, result)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 837469a39..91ed187b0 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -85,28 +85,11 @@ class Process(multiprocessing.Process):
# action_name (str): The queue action name
# logfile (str): A template string that points to the logfile
# that should be used - should contain {pid}.
-# resources (iter(ResourceType)) - A set of resources this job
-# wants to use.
-# exclusive_resources (iter(ResourceType)) - A set of resources
-# this job wants to use
-# exclusively.
# max_retries (int): The maximum number of retries
#
class Job():
- def __init__(self, scheduler, action_name, logfile, *,
- resources=None, exclusive_resources=None, max_retries=0):
-
- if resources is None:
- resources = set()
- else:
- resources = set(resources)
- if exclusive_resources is None:
- exclusive_resources = set()
- else:
- exclusive_resources = set(resources)
-
- assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
@@ -114,12 +97,6 @@ class Job():
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
- # The resources this job wants to access
- self.resources = resources
- # Resources this job needs to access exclusively, i.e., no
- # other job should be allowed to access them
- self.exclusive_resources = exclusive_resources
-
#
# Private members
#
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 9d8e7182b..66ec4c69b 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -57,11 +57,10 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
- resources=self.resources,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
- self._done_queue.append(job)
+ self._done_queue.append(element)
self.failed_elements.append(element)
self._scheduler._job_complete_callback(job, False)
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 84416c8d2..a20f44eea 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -72,8 +72,9 @@ class Queue():
# Private members
#
self._scheduler = scheduler
- self._wait_queue = deque()
- self._done_queue = deque()
+ self._resources = scheduler.resources # Shared resource pool
+ self._wait_queue = deque() # Ready / Waiting elements
+ self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
# Assert the subclass has setup class data
@@ -115,16 +116,6 @@ class Queue():
def status(self, element):
return QueueStatus.READY
- # prepare()
- #
- # Abstract method for handling job preparation in the main process.
- #
- # Args:
- # element (Element): The element which is scheduled
- #
- def prepare(self, element):
- pass
-
# done()
#
# Abstract method for handling a successful job completion.
@@ -153,26 +144,18 @@ class Queue():
if not elts:
return
- # Note: The internal lists work with jobs. This is not
- # reflected in any external methods (except
- # pop/peek_ready_jobs).
- def create_job(element):
- logfile = self._element_log_path(element)
- return ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- resources=self.resources,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
-
- # Place skipped elements directly on the done queue
- jobs = [create_job(elt) for elt in elts]
- skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
- wait = [job for job in jobs if job not in skip]
-
- self.skipped_elements.extend([job.element for job in skip])
- self._wait_queue.extend(wait)
- self._done_queue.extend(skip)
+ # Place skipped elements on the done queue right away.
+ #
+ # The remaining ready and waiting elements must remain in the
+ # same queue, and ready status must be determined at the moment
+ # which the scheduler is asking for the next job.
+ #
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
+ wait = [elt for elt in elts if elt not in skip]
+
+ self.skipped_elements.extend(skip) # Public record of skipped elements
+ self._done_queue.extend(skip) # Elements to be processed
+ self._wait_queue.extend(wait) # Elements eligible to be dequeued
# dequeue()
#
@@ -184,69 +167,59 @@ class Queue():
#
def dequeue(self):
while self._done_queue:
- yield self._done_queue.popleft().element
+ yield self._done_queue.popleft()
# dequeue_ready()
#
- # Reports whether there are any elements to dequeue
+ # Reports whether any elements can be promoted to other queues
#
# Returns:
- # (bool): Whether there are elements to dequeue
+ # (bool): Whether there are elements ready
#
def dequeue_ready(self):
return any(self._done_queue)
- # pop_ready_jobs()
- #
- # Returns:
- # ([Job]): A list of jobs to run
+ # harvest_jobs()
#
# Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and processing them if necessary.
- #
- # This will have different results for elements depending
- # on the Queue.status() implementation.
- #
- # o Elements which are QueueStatus.WAIT will not be affected
+ # into the dequeue pool, and creating as many jobs for which resources
+ # can be reserved.
#
- # o Elements which are QueueStatus.SKIP will move directly
- # to the dequeue pool
- #
- # o For Elements which are QueueStatus.READY a Job will be
- # created and returned to the caller, given that the scheduler
- # allows the Queue enough resources for the given job
+ # Returns:
+ # ([Job]): A list of jobs which can be run now
#
- def pop_ready_jobs(self):
+ def harvest_jobs(self):
unready = []
ready = []
while self._wait_queue:
- job = self._wait_queue.popleft()
- element = job.element
+ if not self._resources.reserve(self.resources, peek=True):
+ break
+ element = self._wait_queue.popleft()
status = self.status(element)
+
if status == QueueStatus.WAIT:
- unready.append(job)
- continue
+ unready.append(element)
elif status == QueueStatus.SKIP:
- self._done_queue.append(job)
+ self._done_queue.append(element)
self.skipped_elements.append(element)
- continue
-
- self.prepare(element)
- ready.append(job)
+ else:
+ reserved = self._resources.reserve(self.resources)
+ assert reserved
+ ready.append(element)
- # These were not ready but were in the beginning, give em
- # first priority again next time around
self._wait_queue.extendleft(unready)
- return ready
-
- def peek_ready_jobs(self):
- def ready(job):
- return self.status(job.element) == QueueStatus.READY
-
- yield from (job for job in self._wait_queue if ready(job))
+ return [
+ ElementJob(self._scheduler, self.action_name,
+ self._element_log_path(element),
+ element=element, queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+ for element in ready
+ ]
#####################################################
# Private Methods #
@@ -292,6 +265,10 @@ class Queue():
#
def _job_done(self, job, element, status, result):
+ # Now release the resources we reserved
+ #
+ self._resources.release(self.resources)
+
# Update values that need to be synchronized in the main task
# before calling any queue implementation
self._update_workspaces(element, job)
@@ -324,12 +301,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
- #
- # No exception occured in post processing
- #
-
- # All jobs get placed on the done queue for later processing.
- self._done_queue.append(job)
+ # All elements get placed on the done queue for later processing.
+ self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED:
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
index fcf10d7bd..f19d66b44 100644
--- a/buildstream/_scheduler/resources.py
+++ b/buildstream/_scheduler/resources.py
@@ -34,28 +34,25 @@ class Resources():
ResourceType.UPLOAD: set()
}
- def clear_job_resources(self, job):
- for resource in job.exclusive_resources:
- self._exclusive_resources[resource].remove(hash(job))
+ # reserve()
+ #
+ # Reserves a set of resources
+ #
+ # Args:
+ # resources (set): A set of ResourceTypes
+ # exclusive (set): Another set of ResourceTypes
+ # peek (bool): Whether to only peek at whether the resource is available
+ #
+ # Returns:
+ # (bool): True if the resources could be reserved
+ #
+ def reserve(self, resources, exclusive=None, *, peek=False):
+ if exclusive is None:
+ exclusive = set()
- for resource in job.resources:
- self._used_resources[resource] -= 1
-
- def reserve_exclusive_resources(self, job):
- exclusive = job.exclusive_resources
-
- # The very first thing we do is to register any exclusive
- # resources this job may want. Even if the job is not yet
- # allowed to run (because another job is holding the resource
- # it wants), we can still set this - it just means that any
- # job *currently* using these resources has to finish first,
- # and no new jobs wanting these can be launched (except other
- # exclusive-access jobs).
- #
- for resource in exclusive:
- self._exclusive_resources[resource].add(hash(job))
+ resources = set(resources)
+ exclusive = set(exclusive)
- def reserve_job_resources(self, job):
# First, we check if the job wants to access a resource that
# another job wants exclusive access to. If so, it cannot be
# scheduled.
@@ -68,7 +65,8 @@ class Resources():
# is currently not possible, but may be worth thinking
# about.
#
- for resource in job.resources - job.exclusive_resources:
+ for resource in resources - exclusive:
+
# If our job wants this resource exclusively, we never
# check this, so we can get away with not (temporarily)
# removing it from the set.
@@ -84,14 +82,14 @@ class Resources():
# at a time, despite being allowed to be part of the exclusive
# set.
#
- for exclusive in job.exclusive_resources:
- if self._used_resources[exclusive] != 0:
+ for resource in exclusive:
+ if self._used_resources[resource] != 0:
return False
# Finally, we check if we have enough of each resource
# available. If we don't have enough, the job cannot be
# scheduled.
- for resource in job.resources:
+ for resource in resources:
if (self._max_resources[resource] > 0 and
self._used_resources[resource] >= self._max_resources[resource]):
return False
@@ -99,7 +97,70 @@ class Resources():
# Now we register the fact that our job is using the resources
# it asked for, and tell the scheduler that it is allowed to
# continue.
- for resource in job.resources:
- self._used_resources[resource] += 1
+ if not peek:
+ for resource in resources:
+ self._used_resources[resource] += 1
return True
+
+ # release()
+ #
+ # Release resources previously reserved with Resources.reserve()
+ #
+ # Args:
+ # resources (set): A set of resources to release
+ #
+ def release(self, resources):
+ for resource in resources:
+ assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
+ self._used_resources[resource] -= 1
+
+ # register_exclusive_interest()
+ #
+ # Inform the resources pool that `source` has an interest in
+ # reserving this resource exclusively.
+ #
+ # The source parameter is used to identify the caller, it
+ # must be ensured to be unique for the time that the
+ # interest is registered.
+ #
+ # This function may be called multiple times, and subsequent
+ # calls will simply have no effect until clear_exclusive_interest()
+ # is used to clear the interest.
+ #
+ # This must be called in advance of reserve()
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (any): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def register_exclusive_interest(self, resources, source):
+
+ # The very first thing we do is to register any exclusive
+ # resources this job may want. Even if the job is not yet
+ # allowed to run (because another job is holding the resource
+ # it wants), we can still set this - it just means that any
+ # job *currently* using these resources has to finish first,
+ # and no new jobs wanting these can be launched (except other
+ # exclusive-access jobs).
+ #
+ for resource in resources:
+ self._exclusive_resources[resource].add(source)
+
+ # unregister_exclusive_interest()
+ #
+ # Clear the exclusive interest in these resources.
+ #
+ # This should be called by the given source which registered
+ # an exclusive interest.
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (str): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def unregister_exclusive_interest(self, resources, source):
+
+ for resource in resources:
+ self._exclusive_resources[resource].remove(source)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 97335de34..9b688d1dd 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import CacheSizeJob, CleanupJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
# A decent return code for Scheduler.run()
@@ -38,14 +38,10 @@ class SchedStatus():
TERMINATED = 1
-# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
-# which we launch dynamically, they have the property of being
-# meaningless to queue if one is already queued, and it also
-# doesnt make sense to run them in parallel
+# Some action names for the internal jobs we launch
#
_ACTION_NAME_CLEANUP = 'cleanup'
_ACTION_NAME_CACHE_SIZE = 'cache_size'
-_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
# Scheduler()
@@ -81,8 +77,6 @@ class Scheduler():
#
# Public members
#
- self.active_jobs = [] # Jobs currently being run in the scheduler
- self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -95,15 +89,23 @@ class Scheduler():
#
# Private members
#
+ self._active_jobs = [] # Jobs currently being run in the scheduler
+ self._starttime = start_time # Initial application start time
+ self._suspendtime = None # Session time compensation for suspended state
+ self._queue_jobs = True # Whether we should continue to queue jobs
+
+ # State of cache management related jobs
+ self._cache_size_scheduled = False # Whether we have a cache size job scheduled
+ self._cache_size_running = None # A running CacheSizeJob, or None
+ self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
+ self._cleanup_running = None # A running CleanupJob, or None
+
+ # Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
self._job_start_callback = job_start_callback
self._job_complete_callback = job_complete_callback
- self._starttime = start_time
- self._suspendtime = None
- self._queue_jobs = True # Whether we should continue to queue jobs
-
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
#
@@ -113,9 +115,9 @@ class Scheduler():
self._exclusive_waiting = set()
self._exclusive_active = set()
- self._resources = Resources(context.sched_builders,
- context.sched_fetchers,
- context.sched_pushers)
+ self.resources = Resources(context.sched_builders,
+ context.sched_fetchers,
+ context.sched_pushers)
# run()
#
@@ -150,7 +152,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
- self._schedule_queue_jobs()
+ self._sched()
self.loop.run_forever()
self.loop.close()
@@ -240,12 +242,14 @@ class Scheduler():
# status (JobStatus): The status of the completed job
#
def job_completed(self, job, status):
- self._resources.clear_job_resources(job)
- self.active_jobs.remove(job)
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
- self._exclusive_active.remove(job.action_name)
+
+ # Remove from the active jobs list
+ self._active_jobs.remove(job)
+
+ # Scheduler owner facing callback
self._job_complete_callback(job, status)
- self._schedule_queue_jobs()
+
+ # Now check for more jobs
self._sched()
# check_cache_size():
@@ -255,78 +259,104 @@ class Scheduler():
# if needed.
#
def check_cache_size(self):
- job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
- 'cache_size/cache_size',
- resources=[ResourceType.CACHE,
- ResourceType.PROCESS],
- complete_cb=self._run_cleanup)
- self._schedule_jobs([job])
+
+ # Here we assume we are called in response to a job
+ # completion callback, or before entering the scheduler.
+ #
+ # As such there is no need to call `_sched()` from here,
+ # and we prefer to run it once at the last moment.
+ #
+ self._cache_size_scheduled = True
#######################################################
# Local Private Methods #
#######################################################
- # _sched()
+ # _spawn_job()
#
- # The main driving function of the scheduler, it will be called
- # automatically when Scheduler.run() is called initially,
+ # Spanws a job
#
- def _sched(self):
- for job in self.waiting_jobs:
- self._resources.reserve_exclusive_resources(job)
+ # Args:
+ # job (Job): The job to spawn
+ #
+ def _spawn_job(self, job):
+ job.spawn()
+ self._active_jobs.append(job)
+ if self._job_start_callback:
+ self._job_start_callback(job)
- for job in self.waiting_jobs:
- if not self._resources.reserve_job_resources(job):
- continue
+ # Callback for the cache size job
+ def _cache_size_job_complete(self, status, cache_size):
- # Postpone these jobs if one is already running
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
- job.action_name in self._exclusive_active:
- continue
+ # Deallocate cache size job resources
+ self._cache_size_running = None
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
- job.spawn()
- self.waiting_jobs.remove(job)
- self.active_jobs.append(job)
+ # Schedule a cleanup job if we've hit the threshold
+ if status != JobStatus.OK:
+ return
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
- self._exclusive_waiting.remove(job.action_name)
- self._exclusive_active.add(job.action_name)
+ context = self.context
+ artifacts = context.artifactcache
- if self._job_start_callback:
- self._job_start_callback(job)
+ if artifacts.has_quota_exceeded():
+ self._cleanup_scheduled = True
- # If nothings ticking, time to bail out
- if not self.active_jobs and not self.waiting_jobs:
- self.loop.stop()
+ # Callback for the cleanup job
+ def _cleanup_job_complete(self, status, cache_size):
- # _schedule_jobs()
- #
- # The main entry point for jobs to be scheduled.
- #
- # This is called either as a result of scanning the queues
- # in _schedule_queue_jobs(), or directly by the Scheduler
- # to insert special jobs like cleanups.
+ # Deallocate cleanup job resources
+ self._cleanup_running = None
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
+
+ # Unregister the exclusive interest when we're done with it
+ if not self._cleanup_scheduled:
+ self.resources.unregister_exclusive_interest(
+ [ResourceType.CACHE], 'cache-cleanup'
+ )
+
+ # _sched_cleanup_job()
#
- # Args:
- # jobs ([Job]): A list of jobs to schedule
+ # Runs a cleanup job if one is scheduled to run now and
+ # sufficient recources are available.
#
- def _schedule_jobs(self, jobs):
- for job in jobs:
+ def _sched_cleanup_job(self):
- # Special treatment of our redundant exclusive jobs
- #
- if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+ if self._cleanup_scheduled and self._cleanup_running is None:
+
+ # Ensure we have an exclusive interest in the resources
+ self.resources.register_exclusive_interest(
+ [ResourceType.CACHE], 'cache-cleanup'
+ )
+
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
+ [ResourceType.CACHE]):
- # Drop the job if one is already queued
- if job.action_name in self._exclusive_waiting:
- continue
+ # Update state and launch
+ self._cleanup_scheduled = False
+ self._cleanup_running = \
+ CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
+ complete_cb=self._cleanup_job_complete)
+ self._spawn_job(self._cleanup_running)
- # Mark this action type as queued
- self._exclusive_waiting.add(job.action_name)
+ # _sched_cache_size_job()
+ #
+ # Runs a cache size job if one is scheduled to run now and
+ # sufficient recources are available.
+ #
+ def _sched_cache_size_job(self):
+
+ if self._cache_size_scheduled and not self._cache_size_running:
- self.waiting_jobs.append(job)
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
+ self._cache_size_scheduled = False
+ self._cache_size_running = \
+ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+ 'cache_size/cache_size',
+ complete_cb=self._cache_size_job_complete)
+ self._spawn_job(self._cache_size_running)
- # _schedule_queue_jobs()
+ # _sched_queue_jobs()
#
# Ask the queues what jobs they want to schedule and schedule
# them. This is done here so we can ask for new jobs when jobs
@@ -335,7 +365,7 @@ class Scheduler():
# This will process the Queues, pull elements through the Queues
# and process anything that is ready.
#
- def _schedule_queue_jobs(self):
+ def _sched_queue_jobs(self):
ready = []
process_queues = True
@@ -344,10 +374,7 @@ class Scheduler():
# Pull elements forward through queues
elements = []
for queue in self.queues:
- # Enqueue elements complete from the last queue
queue.enqueue(elements)
-
- # Dequeue processed elements for the next queue
elements = list(queue.dequeue())
# Kickoff whatever processes can be processed at this time
@@ -362,41 +389,51 @@ class Scheduler():
# thus need all the pulls to complete before ever starting
# a build
ready.extend(chain.from_iterable(
- queue.pop_ready_jobs() for queue in reversed(self.queues)
+ q.harvest_jobs() for q in reversed(self.queues)
))
- # pop_ready_jobs() may have skipped jobs, adding them to
- # the done_queue. Pull these skipped elements forward to
- # the next queue and process them.
+ # harvest_jobs() may have decided to skip some jobs, making
+ # them eligible for promotion to the next queue as a side effect.
+ #
+ # If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
- self._schedule_jobs(ready)
- self._sched()
+ # Spawn the jobs
+ #
+ for job in ready:
+ self._spawn_job(job)
- # _run_cleanup()
- #
- # Schedules the cache cleanup job if the passed size
- # exceeds the cache quota.
+ # _sched()
#
- # Args:
- # cache_size (int): The calculated cache size (ignored)
+ # Run any jobs which are ready to run, or quit the main loop
+ # when nothing is running or is ready to run.
#
- # NOTE: This runs in response to completion of the cache size
- # calculation job lauched by Scheduler.check_cache_size(),
- # which will report the calculated cache size.
+ # This is the main driving function of the scheduler, it is called
+ # initially when we enter Scheduler.run(), and at the end of whenever
+ # any job completes, after any bussiness logic has occurred and before
+ # going back to sleep.
#
- def _run_cleanup(self, cache_size):
- context = self.context
- artifacts = context.artifactcache
+ def _sched(self):
- if not artifacts.has_quota_exceeded():
- return
+ if not self.terminated:
+
+ #
+ # Try the cache management jobs
+ #
+ self._sched_cleanup_job()
+ self._sched_cache_size_job()
+
+ #
+ # Run as many jobs as the queues can handle for the
+ # available resources
+ #
+ self._sched_queue_jobs()
- job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
- resources=[ResourceType.CACHE,
- ResourceType.PROCESS],
- exclusive_resources=[ResourceType.CACHE])
- self._schedule_jobs([job])
+ #
+ # If nothing is ticking then bail out
+ #
+ if not self._active_jobs:
+ self.loop.stop()
# _suspend_jobs()
#
@@ -406,7 +443,7 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- for job in self.active_jobs:
+ for job in self._active_jobs:
job.suspend()
# _resume_jobs()
@@ -415,7 +452,7 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
- for job in self.active_jobs:
+ for job in self._active_jobs:
job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
@@ -488,19 +525,16 @@ class Scheduler():
wait_limit = 20.0
# First tell all jobs to terminate
- for job in self.active_jobs:
+ for job in self._active_jobs:
job.terminate()
# Now wait for them to really terminate
- for job in self.active_jobs:
+ for job in self._active_jobs:
elapsed = datetime.datetime.now() - wait_start
timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
if not job.terminate_wait(timeout):
job.kill()
- # Clear out the waiting jobs
- self.waiting_jobs = []
-
# Regular timeout for driving status in the UI
def _tick(self):
elapsed = self.elapsed_time()