diff options
Diffstat (limited to 'src/buildstream/_scheduler/queues/queue.py')
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index adc1cd467..538b2b9d1 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -63,13 +63,6 @@ class Queue(): def __init__(self, scheduler): # - # Public members - # - self.failed_elements = [] # List of failed elements, for the frontend - self.processed_elements_count = 0 # Number of processed elements, for the frontend - self.skipped_elements_count = 0 # Number of skipped elements, for the frontend - - # # Private members # self._scheduler = scheduler @@ -87,6 +80,17 @@ class Queue(): if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: self._max_retries = scheduler.context.sched_network_retries + self._task_group = self._scheduler._state.add_task_group(self.action_name) + + # destroy() + # + # Explicitly performs all cleanup tasks for this queue + # + # Note: Doing this explicitly is preferred to a __del__ method because + # it is not at the mercy of the garbage collector + def destroy(self): + self._scheduler._state.remove_task_group(self.action_name) + ##################################################### # Abstract Methods for Queue implementations # ##################################################### @@ -236,6 +240,12 @@ class Queue(): def set_required_element_check(self): self._required_element_check = True + # any_failed_elements() + # + # Returns whether any elements in this queue have failed their jobs + def any_failed_elements(self): + return any(self._task_group.failed_tasks) + ##################################################### # Private Methods # ##################################################### @@ -298,7 +308,7 @@ class Queue(): # Report error and mark as failed # self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) # Treat this as a task error as it's related to a task # even though it did not occur in the task context @@ -314,18 +324,18 @@ class Queue(): self._message(element, MessageType.BUG, "Unhandled exception in post processing", detail=traceback.format_exc()) - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) else: # All elements get placed on the done queue for later processing. self._done_queue.append(element) # These lists are for bookkeeping purposes for the UI and logging. if status == JobStatus.SKIPPED or job.get_terminated(): - self.skipped_elements_count += 1 + self._task_group.add_skipped_task() elif status == JobStatus.OK: - self.processed_elements_count += 1 + self._task_group.add_processed_task() else: - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) # Convenience wrapper for Queue implementations to send # a message for the element they are processing @@ -357,7 +367,7 @@ class Queue(): status = self.status(element) if status == QueueStatus.SKIP: # Place skipped elements into the done queue immediately - self.skipped_elements_count += 1 # Public record of skipped elements + self._task_group.add_skipped_task() self._done_queue.append(element) # Elements to proceed to the next queue elif status == QueueStatus.READY: # Push elements which are ready to be processed immediately into the queue |