summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/queues/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_scheduler/queues/queue.py')
-rw-r--r--buildstream/_scheduler/queues/queue.py127
1 files changed, 49 insertions, 78 deletions
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index ff38cb512..ec1e81350 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -72,8 +72,9 @@ class Queue():
# Private members
#
self._scheduler = scheduler
- self._wait_queue = deque()
- self._done_queue = deque()
+ self._resources = scheduler.resources # Shared resource pool
+ self._wait_queue = deque() # Ready / Waiting elements
+ self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
# Assert the subclass has setup class data
@@ -115,16 +116,6 @@ class Queue():
def status(self, element):
return QueueStatus.READY
- # prepare()
- #
- # Abstract method for handling job preparation in the main process.
- #
- # Args:
- # element (Element): The element which is scheduled
- #
- def prepare(self, element):
- pass
-
# done()
#
# Abstract method for handling a successful job completion.
@@ -153,26 +144,18 @@ class Queue():
if not elts:
return
- # Note: The internal lists work with jobs. This is not
- # reflected in any external methods (except
- # pop/peek_ready_jobs).
- def create_job(element):
- logfile = self._element_log_path(element)
- return ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- resources=self.resources,
- action_cb=self.process,
- complete_cb=self._job_done,
- max_retries=self._max_retries)
-
- # Place skipped elements directly on the done queue
- jobs = [create_job(elt) for elt in elts]
- skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
- wait = [job for job in jobs if job not in skip]
-
- self._wait_queue.extend(wait)
- self._done_queue.extend(skip)
- self.skipped_elements.extend(skip)
+ # Place skipped elements on the done queue right away.
+ #
+ # The remaining ready and waiting elements must remain in the
+ # same queue, and ready status must be determined at the moment
+ # which the scheduler is asking for the next job.
+ #
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
+ wait = [elt for elt in elts if elt not in skip]
+
+ self.skipped_elements.extend(skip) # Public record of skipped elements
+ self._done_queue.extend(skip) # Elements to be processed
+ self._wait_queue.extend(wait) # Elements eligible to be dequeued
# dequeue()
#
@@ -184,69 +167,59 @@ class Queue():
#
def dequeue(self):
while self._done_queue:
- yield self._done_queue.popleft().element
+ yield self._done_queue.popleft()
# dequeue_ready()
#
- # Reports whether there are any elements to dequeue
+ # Reports whether any elements can be promoted to other queues
#
# Returns:
- # (bool): Whether there are elements to dequeue
+ # (bool): Whether there are elements ready
#
def dequeue_ready(self):
return any(self._done_queue)
- # pop_ready_jobs()
- #
- # Returns:
- # ([Job]): A list of jobs to run
+ # harvest_jobs()
#
# Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and processing them if necessary.
- #
- # This will have different results for elements depending
- # on the Queue.status() implementation.
- #
- # o Elements which are QueueStatus.WAIT will not be effected
+ # into the dequeue pool, and creating as many jobs for which resources
+ # can be reserved.
#
- # o Elements which are QueueStatus.SKIP will move directly
- # to the dequeue pool
- #
- # o For Elements which are QueueStatus.READY a Job will be
- # created and returned to the caller, given that the scheduler
- # allows the Queue enough resources for the given job
+ # Returns:
+ # ([Job]): A list of jobs which can be run now
#
- def pop_ready_jobs(self):
+ def harvest_jobs(self):
unready = []
ready = []
while self._wait_queue:
- job = self._wait_queue.popleft()
- element = job.element
+ if not self._resources.reserve(self.resources, peek=True):
+ break
+ element = self._wait_queue.popleft()
status = self.status(element)
+
if status == QueueStatus.WAIT:
- unready.append(job)
- continue
+ unready.append(element)
elif status == QueueStatus.SKIP:
- self._done_queue.append(job)
+ self._done_queue.append(element)
self.skipped_elements.append(element)
- continue
-
- self.prepare(element)
- ready.append(job)
+ else:
+ reserved = self._resources.reserve(self.resources)
+ assert reserved
+ ready.append(element)
- # These were not ready but were in the beginning, give em
- # first priority again next time around
self._wait_queue.extendleft(unready)
- return ready
-
- def peek_ready_jobs(self):
- def ready(job):
- return self.status(job.element) == QueueStatus.READY
-
- yield from (job for job in self._wait_queue if ready(job))
+ return [
+ ElementJob(self._scheduler, self.action_name,
+ self._element_log_path(element),
+ element=element, queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+ for element in ready
+ ]
#####################################################
# Private Methods #
@@ -292,6 +265,10 @@ class Queue():
#
def _job_done(self, job, element, status, result):
+ # Now release the resources we reserved
+ #
+ self._resources.release(self.resources)
+
# Update values that need to be synchronized in the main task
# before calling any queue implementation
self._update_workspaces(element, job)
@@ -324,14 +301,8 @@ class Queue():
detail=traceback.format_exc())
self.failed_elements.append(element)
else:
- #
- # No exception occured in post processing
- #
-
- # Only place in the output done queue if the job
- # was considered successful
- if success:
- self._done_queue.append(job)
+ # All elements get placed on the done queue for later processing.
+ self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED: