summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/queues/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/queues/queue.py')
-rw-r--r--src/buildstream/_scheduler/queues/queue.py36
1 files changed, 23 insertions, 13 deletions
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index adc1cd467..538b2b9d1 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -63,13 +63,6 @@ class Queue():
def __init__(self, scheduler):
#
- # Public members
- #
- self.failed_elements = [] # List of failed elements, for the frontend
- self.processed_elements_count = 0 # Number of processed elements, for the frontend
- self.skipped_elements_count = 0 # Number of skipped elements, for the frontend
-
- #
# Private members
#
self._scheduler = scheduler
@@ -87,6 +80,17 @@ class Queue():
if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
self._max_retries = scheduler.context.sched_network_retries
+ self._task_group = self._scheduler._state.add_task_group(self.action_name)
+
+ # destroy()
+ #
+ # Explicitly performs all cleanup tasks for this queue
+ #
+ # Note: Doing this explicitly is preferred to a __del__ method because
+ # it is not at the mercy of the garbage collector
+ def destroy(self):
+ self._scheduler._state.remove_task_group(self.action_name)
+
#####################################################
# Abstract Methods for Queue implementations #
#####################################################
@@ -236,6 +240,12 @@ class Queue():
def set_required_element_check(self):
self._required_element_check = True
+ # any_failed_elements()
+ #
+ # Returns whether any elements in this queue have failed their jobs
+ def any_failed_elements(self):
+ return any(self._task_group.failed_tasks)
+
#####################################################
# Private Methods #
#####################################################
@@ -298,7 +308,7 @@ class Queue():
# Report error and mark as failed
#
self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
# Treat this as a task error as it's related to a task
# even though it did not occur in the task context
@@ -314,18 +324,18 @@ class Queue():
self._message(element, MessageType.BUG,
"Unhandled exception in post processing",
detail=traceback.format_exc())
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
else:
# All elements get placed on the done queue for later processing.
self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED or job.get_terminated():
- self.skipped_elements_count += 1
+ self._task_group.add_skipped_task()
elif status == JobStatus.OK:
- self.processed_elements_count += 1
+ self._task_group.add_processed_task()
else:
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
@@ -357,7 +367,7 @@ class Queue():
status = self.status(element)
if status == QueueStatus.SKIP:
# Place skipped elements into the done queue immediately
- self.skipped_elements_count += 1 # Public record of skipped elements
+ self._task_group.add_skipped_task()
self._done_queue.append(element) # Elements to proceed to the next queue
elif status == QueueStatus.READY:
# Push elements which are ready to be processed immediately into the queue