diff options
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_scheduler/queue.py | 56 |
1 files changed, 46 insertions, 10 deletions
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py index 9f97e7b0c..f3e0bd7f3 100644 --- a/buildstream/_scheduler/queue.py +++ b/buildstream/_scheduler/queue.py @@ -22,10 +22,15 @@ # System imports from collections import deque from enum import Enum +import traceback # Local imports from .job import Job +# BuildStream toplevel imports +from .._exceptions import BstError, _set_last_task_error +from .._message import Message, MessageType + # Indicates the kind of activity # @@ -209,19 +214,43 @@ class Queue(): # Give the result of the job to the Queue implementor, # and determine if it should be considered as processed # or skipped. - if self.done(element, job.result, returncode): - skip = False + try: + processed = self.done(element, job.result, returncode) + + except BstError as e: + + # Report error and mark as failed + # + self.message(element, MessageType.ERROR, "Post processing error", detail=str(e)) + self.failed_elements.append(element) + + # Treat this as a task error as it's related to a task + # even though it did not occur in the task context + # + # This just allows us stronger testing capability + # + _set_last_task_error(e.domain, e.reason) + + except Exception as e: # pylint: disable=broad-except + + # Report unhandled exceptions and mark as failed + # + self.message(element, MessageType.BUG, + "Unhandled exception in post processing", + detail=traceback.format_exc()) + self.failed_elements.append(element) else: - skip = True - if returncode == 0: - self.done_queue.append(element) - if skip: - self.skipped_elements.append(element) + # No exception occured, handle the success/failure state in the normal way + # + if returncode == 0: + self.done_queue.append(element) + if processed: + self.processed_elements.append(element) + else: + self.skipped_elements.append(element) else: - self.processed_elements.append(element) - else: - self.failed_elements.append(element) + self.failed_elements.append(element) # Give the token for this job back to the scheduler # immediately before invoking another round of scheduling @@ -231,3 +260,10 @@ class Queue(): self.scheduler.job_completed(self, job, returncode == 0) self.scheduler.sched() + + # Convenience wrapper for Queue implementations to send + # a message for the element they are processing + def message(self, element, message_type, brief, **kwargs): + context = element._get_context() + message = Message(element._get_unique_id(), message_type, brief, **kwargs) + context._message(message) |