summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-09-05 17:30:35 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-09-10 10:12:30 +0100
commit0becc13bdb8cf3e6a0bc271ce4fa118f52dba50e (patch)
tree8f33cb7f1f03c48ac57f43a622f2aa5bf3f55555
parent50a1464ee9185ee737fe20455d9b73ca598575cc (diff)
downloadbuildstream-tpollard/notificationhandler.tar.gz
scheduler.py: Notification for Message() propagationtpollard/notificationhandler
Add a notification for MESSAGE. Instead of scheduler's Queues and Jobs directly calling the message handler that App has assigned to Context, the Message() is now sent over the notification handler where it is then given to Messenger's handler.
-rw-r--r--src/buildstream/_scheduler/jobs/job.py6
-rw-r--r--src/buildstream/_scheduler/queues/queue.py3
-rw-r--r--src/buildstream/_scheduler/scheduler.py16
-rw-r--r--src/buildstream/_stream.py4
4 files changed, 22 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 9af08df92..913e27ea2 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -393,8 +393,8 @@ class Job():
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- self._scheduler.context.messenger.message(
- Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
+ message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
+ self._scheduler.notify_messenger(message)
# get_element()
#
@@ -536,7 +536,7 @@ class Job():
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.context.messenger.message(envelope.message)
+ self._scheduler.notify_messenger(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 745b59417..6c6dfdc4f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -345,9 +345,8 @@ class Queue():
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
- context = element._get_context()
message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
- context.messenger.message(message)
+ self._scheduler.notify_messenger(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index b892296f5..d0a189545 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -61,6 +61,7 @@ class NotificationType(FastEnum):
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
RETRY = "retry"
+ MESSAGE = "message"
# Notification()
@@ -80,13 +81,15 @@ class Notification():
job_action=None,
job_status=None,
time=None,
- element=None):
+ element=None,
+ message=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
self.job_status = job_status
self.time = time
self.element = element
+ self.message = message
# Scheduler()
@@ -301,6 +304,17 @@ class Scheduler():
self._notify(notification)
self._sched()
+ # notify_messenger()
+ #
+ # Send message over notification queue to Messenger callback
+ #
+ # Args:
+ # message (Message): A Message() to be sent to the frontend message
+ # handler, as assigned by context's messenger.
+ #
+ def notify_messenger(self, message):
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+
#######################################################
# Local Private Methods #
#######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6d8d918dd..293ba051d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1653,7 +1653,9 @@ class Stream():
assert self._notification_queue
notification = self._notification_queue.pop()
- if notification.notification_type == NotificationType.INTERRUPT:
+ if notification.notification_type == NotificationType.MESSAGE:
+ self._context.messenger.message(notification.message)
+ elif notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK:
self._ticker_callback()