summaryrefslogtreecommitdiff
path: root/buildstream
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream')
-rw-r--r--buildstream/_scheduler/queue.py56
1 files changed, 46 insertions, 10 deletions
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index 9f97e7b0c..f3e0bd7f3 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -22,10 +22,15 @@
# System imports
from collections import deque
from enum import Enum
+import traceback
# Local imports
from .job import Job
+# BuildStream toplevel imports
+from .._exceptions import BstError, _set_last_task_error
+from .._message import Message, MessageType
+
# Indicates the kind of activity
#
@@ -209,19 +214,43 @@ class Queue():
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed
# or skipped.
- if self.done(element, job.result, returncode):
- skip = False
+ try:
+ processed = self.done(element, job.result, returncode)
+
+ except BstError as e:
+
+ # Report error and mark as failed
+ #
+ self.message(element, MessageType.ERROR, "Post processing error", detail=str(e))
+ self.failed_elements.append(element)
+
+ # Treat this as a task error as it's related to a task
+ # even though it did not occur in the task context
+ #
+ # This just allows us stronger testing capability
+ #
+ _set_last_task_error(e.domain, e.reason)
+
+ except Exception as e: # pylint: disable=broad-except
+
+ # Report unhandled exceptions and mark as failed
+ #
+ self.message(element, MessageType.BUG,
+ "Unhandled exception in post processing",
+ detail=traceback.format_exc())
+ self.failed_elements.append(element)
else:
- skip = True
- if returncode == 0:
- self.done_queue.append(element)
- if skip:
- self.skipped_elements.append(element)
+ # No exception occured, handle the success/failure state in the normal way
+ #
+ if returncode == 0:
+ self.done_queue.append(element)
+ if processed:
+ self.processed_elements.append(element)
+ else:
+ self.skipped_elements.append(element)
else:
- self.processed_elements.append(element)
- else:
- self.failed_elements.append(element)
+ self.failed_elements.append(element)
# Give the token for this job back to the scheduler
# immediately before invoking another round of scheduling
@@ -231,3 +260,10 @@ class Queue():
self.scheduler.job_completed(self, job, returncode == 0)
self.scheduler.sched()
+
+ # Convenience wrapper for Queue implementations to send
+ # a message for the element they are processing
+ def message(self, element, message_type, brief, **kwargs):
+ context = element._get_context()
+ message = Message(element._get_unique_id(), message_type, brief, **kwargs)
+ context._message(message)