summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_scheduler/jobs/job.py6
-rw-r--r--src/buildstream/_scheduler/queues/queue.py3
-rw-r--r--src/buildstream/_scheduler/scheduler.py16
-rw-r--r--src/buildstream/_stream.py4
4 files changed, 22 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 9af08df92..913e27ea2 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -393,8 +393,8 @@ class Job():
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- self._scheduler.context.messenger.message(
- Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
+ message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
+ self._scheduler.notify_messenger(message)
# get_element()
#
@@ -536,7 +536,7 @@ class Job():
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.context.messenger.message(envelope.message)
+ self._scheduler.notify_messenger(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 745b59417..6c6dfdc4f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -345,9 +345,8 @@ class Queue():
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
- context = element._get_context()
message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
- context.messenger.message(message)
+ self._scheduler.notify_messenger(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index b892296f5..d0a189545 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -61,6 +61,7 @@ class NotificationType(FastEnum):
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
RETRY = "retry"
+ MESSAGE = "message"
# Notification()
@@ -80,13 +81,15 @@ class Notification():
job_action=None,
job_status=None,
time=None,
- element=None):
+ element=None,
+ message=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
self.job_status = job_status
self.time = time
self.element = element
+ self.message = message
# Scheduler()
@@ -301,6 +304,17 @@ class Scheduler():
self._notify(notification)
self._sched()
+ # notify_messenger()
+ #
+ # Send message over notification queue to Messenger callback
+ #
+ # Args:
+ # message (Message): A Message() to be sent to the frontend message
+ # handler, as assigned by context's messenger.
+ #
+ def notify_messenger(self, message):
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+
#######################################################
# Local Private Methods #
#######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6d8d918dd..293ba051d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1653,7 +1653,9 @@ class Stream():
assert self._notification_queue
notification = self._notification_queue.pop()
- if notification.notification_type == NotificationType.INTERRUPT:
+ if notification.notification_type == NotificationType.MESSAGE:
+ self._context.messenger.message(notification.message)
+ elif notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK:
self._ticker_callback()