diff options
Diffstat (limited to 'buildstream/_scheduler/queues/queue.py')
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 127 |
1 files changed, 49 insertions, 78 deletions
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index ff38cb512..ec1e81350 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -72,8 +72,9 @@ class Queue(): # Private members # self._scheduler = scheduler - self._wait_queue = deque() - self._done_queue = deque() + self._resources = scheduler.resources # Shared resource pool + self._wait_queue = deque() # Ready / Waiting elements + self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 # Assert the subclass has setup class data @@ -115,16 +116,6 @@ class Queue(): def status(self, element): return QueueStatus.READY - # prepare() - # - # Abstract method for handling job preparation in the main process. - # - # Args: - # element (Element): The element which is scheduled - # - def prepare(self, element): - pass - # done() # # Abstract method for handling a successful job completion. @@ -153,26 +144,18 @@ class Queue(): if not elts: return - # Note: The internal lists work with jobs. This is not - # reflected in any external methods (except - # pop/peek_ready_jobs). - def create_job(element): - logfile = self._element_log_path(element) - return ElementJob(self._scheduler, self.action_name, - logfile, element=element, queue=self, - resources=self.resources, - action_cb=self.process, - complete_cb=self._job_done, - max_retries=self._max_retries) - - # Place skipped elements directly on the done queue - jobs = [create_job(elt) for elt in elts] - skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP] - wait = [job for job in jobs if job not in skip] - - self._wait_queue.extend(wait) - self._done_queue.extend(skip) - self.skipped_elements.extend(skip) + # Place skipped elements on the done queue right away. + # + # The remaining ready and waiting elements must remain in the + # same queue, and ready status must be determined at the moment + # which the scheduler is asking for the next job. + # + skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] + wait = [elt for elt in elts if elt not in skip] + + self.skipped_elements.extend(skip) # Public record of skipped elements + self._done_queue.extend(skip) # Elements to be processed + self._wait_queue.extend(wait) # Elements eligible to be dequeued # dequeue() # @@ -184,69 +167,59 @@ class Queue(): # def dequeue(self): while self._done_queue: - yield self._done_queue.popleft().element + yield self._done_queue.popleft() # dequeue_ready() # - # Reports whether there are any elements to dequeue + # Reports whether any elements can be promoted to other queues # # Returns: - # (bool): Whether there are elements to dequeue + # (bool): Whether there are elements ready # def dequeue_ready(self): return any(self._done_queue) - # pop_ready_jobs() - # - # Returns: - # ([Job]): A list of jobs to run + # harvest_jobs() # # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and processing them if necessary. - # - # This will have different results for elements depending - # on the Queue.status() implementation. - # - # o Elements which are QueueStatus.WAIT will not be effected + # into the dequeue pool, and creating as many jobs for which resources + # can be reserved. # - # o Elements which are QueueStatus.SKIP will move directly - # to the dequeue pool - # - # o For Elements which are QueueStatus.READY a Job will be - # created and returned to the caller, given that the scheduler - # allows the Queue enough resources for the given job + # Returns: + # ([Job]): A list of jobs which can be run now # - def pop_ready_jobs(self): + def harvest_jobs(self): unready = [] ready = [] while self._wait_queue: - job = self._wait_queue.popleft() - element = job.element + if not self._resources.reserve(self.resources, peek=True): + break + element = self._wait_queue.popleft() status = self.status(element) + if status == QueueStatus.WAIT: - unready.append(job) - continue + unready.append(element) elif status == QueueStatus.SKIP: - self._done_queue.append(job) + self._done_queue.append(element) self.skipped_elements.append(element) - continue - - self.prepare(element) - ready.append(job) + else: + reserved = self._resources.reserve(self.resources) + assert reserved + ready.append(element) - # These were not ready but were in the beginning, give em - # first priority again next time around self._wait_queue.extendleft(unready) - return ready - - def peek_ready_jobs(self): - def ready(job): - return self.status(job.element) == QueueStatus.READY - - yield from (job for job in self._wait_queue if ready(job)) + return [ + ElementJob(self._scheduler, self.action_name, + self._element_log_path(element), + element=element, queue=self, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + for element in ready + ] ##################################################### # Private Methods # @@ -292,6 +265,10 @@ class Queue(): # def _job_done(self, job, element, status, result): + # Now release the resources we reserved + # + self._resources.release(self.resources) + # Update values that need to be synchronized in the main task # before calling any queue implementation self._update_workspaces(element, job) @@ -324,14 +301,8 @@ class Queue(): detail=traceback.format_exc()) self.failed_elements.append(element) else: - # - # No exception occured in post processing - # - - # Only place in the output done queue if the job - # was considered successful - if success: - self._done_queue.append(job) + # All elements get placed on the done queue for later processing. + self._done_queue.append(element) # These lists are for bookkeeping purposes for the UI and logging. if status == JobStatus.SKIPPED: |