diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-09-05 17:30:35 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-09-10 10:12:30 +0100 |
commit | 0becc13bdb8cf3e6a0bc271ce4fa118f52dba50e (patch) | |
tree | 8f33cb7f1f03c48ac57f43a622f2aa5bf3f55555 | |
parent | 50a1464ee9185ee737fe20455d9b73ca598575cc (diff) | |
download | buildstream-tpollard/notificationhandler.tar.gz |
scheduler.py: Notification for Message() propagationtpollard/notificationhandler
Add a notification for MESSAGE. Instead of scheduler's Queues and
Jobs directly calling the message handler that App has assigned to
Context, the Message() is now sent over the notification handler
where it is then given to Messenger's handler.
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 3 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 16 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 4 |
4 files changed, 22 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 9af08df92..913e27ea2 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -393,8 +393,8 @@ class Job(): element_name = self._message_element_name if element_key is None: element_key = self._message_element_key - self._scheduler.context.messenger.message( - Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)) + message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) + self._scheduler.notify_messenger(message) # get_element() # @@ -536,7 +536,7 @@ class Job(): if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. - self._scheduler.context.messenger.message(envelope.message) + self._scheduler.notify_messenger(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 745b59417..6c6dfdc4f 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -345,9 +345,8 @@ class Queue(): # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): - context = element._get_context() message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) - context.messenger.message(message) + self._scheduler.notify_messenger(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index b892296f5..d0a189545 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -61,6 +61,7 @@ class NotificationType(FastEnum): UNSUSPEND = "unsuspend" SUSPENDED = "suspended" RETRY = "retry" + MESSAGE = "message" # Notification() @@ -80,13 +81,15 @@ class Notification(): job_action=None, job_status=None, time=None, - element=None): + element=None, + message=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action self.job_status = job_status self.time = time self.element = element + self.message = message # Scheduler() @@ -301,6 +304,17 @@ class Scheduler(): self._notify(notification) self._sched() + # notify_messenger() + # + # Send message over notification queue to Messenger callback + # + # Args: + # message (Message): A Message() to be sent to the frontend message + # handler, as assigned by context's messenger. + # + def notify_messenger(self, message): + self._notify(Notification(NotificationType.MESSAGE, message=message)) + ####################################################### # Local Private Methods # ####################################################### diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 6d8d918dd..293ba051d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1653,7 +1653,9 @@ class Stream(): assert self._notification_queue notification = self._notification_queue.pop() - if notification.notification_type == NotificationType.INTERRUPT: + if notification.notification_type == NotificationType.MESSAGE: + self._context.messenger.message(notification.message) + elif notification.notification_type == NotificationType.INTERRUPT: self._interrupt_callback() elif notification.notification_type == NotificationType.TICK: self._ticker_callback() |