diff options
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 3 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 16 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 4 |
4 files changed, 22 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 9af08df92..913e27ea2 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -393,8 +393,8 @@ class Job(): element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - self._scheduler.context.messenger.message( - Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)) + message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) + self._scheduler.notify_messenger(message) # get_element() # @@ -536,7 +536,7 @@ class Job(): if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. - self._scheduler.context.messenger.message(envelope.message) + self._scheduler.notify_messenger(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 745b59417..6c6dfdc4f 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -345,9 +345,8 @@ class Queue(): # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): - context = element._get_context() message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) - context.messenger.message(message) + self._scheduler.notify_messenger(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index b892296f5..d0a189545 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -61,6 +61,7 @@ class NotificationType(FastEnum): UNSUSPEND = "unsuspend" SUSPENDED = "suspended" RETRY = "retry" + MESSAGE = "message" # Notification() @@ -80,13 +81,15 @@ class Notification(): job_action=None, job_status=None, time=None, - element=None): + element=None, + message=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action self.job_status = job_status self.time = time self.element = element + self.message = message # Scheduler() @@ -301,6 +304,17 @@ class Scheduler(): self._notify(notification) self._sched() + # notify_messenger() + # + # Send message over notification queue to Messenger callback + # + # Args: + # message (Message): A Message() to be sent to the frontend message + # handler, as assigned by context's messenger. + # + def notify_messenger(self, message): + self._notify(Notification(NotificationType.MESSAGE, message=message)) + ####################################################### # Local Private Methods # ####################################################### diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 6d8d918dd..293ba051d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1653,7 +1653,9 @@ class Stream(): assert self._notification_queue notification = self._notification_queue.pop() - if notification.notification_type == NotificationType.INTERRUPT: + if notification.notification_type == NotificationType.MESSAGE: + self._context.messenger.message(notification.message) + elif notification.notification_type == NotificationType.INTERRUPT: self._interrupt_callback() elif notification.notification_type == NotificationType.TICK: self._ticker_callback() |