summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2020-07-04 16:11:17 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2020-07-06 18:07:53 +0000
commite16433a8b72a81d2afd11dd4d2d58f24512e259d (patch)
tree1f84b92733a6039f7f96f2e121c5ae6b12e3a57d
parente79f4a019d1d4c23d442f61144e6ac5177eb36b2 (diff)
downloadbuildstream-e16433a8b72a81d2afd11dd4d2d58f24512e259d.tar.gz
scheduler.py: Remove 'Message' notification type, use the messenger
The messenger should be the one receiving messages directly, we don't need this indirection
-rw-r--r--src/buildstream/_scheduler/jobs/job.py7
-rw-r--r--src/buildstream/_scheduler/queues/queue.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py16
-rw-r--r--src/buildstream/_stream.py6
4 files changed, 9 insertions, 22 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 78a375fec..7ea87dc62 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -128,6 +128,7 @@ class Job:
# Private members
#
self._scheduler = scheduler # The scheduler
+ self._messenger = self._scheduler.context.messenger
self._pipe_r = None # The read end of a pipe for message passing
self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
@@ -163,7 +164,7 @@ class Job:
child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
self.action_name,
- self._scheduler.context.messenger,
+ self._messenger,
self._scheduler.context.logdir,
self._logfile,
self._max_retries,
@@ -314,7 +315,7 @@ class Job:
if element_key is None:
element_key = self._message_element_key
message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
- self._scheduler.notify_messenger(message)
+ self._messenger.message(message)
# get_element()
#
@@ -470,7 +471,7 @@ class Job:
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.notify_messenger(envelope.message)
+ self._messenger.message(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 986ac6c0a..9e444b393 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -352,7 +352,7 @@ class Queue:
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
- self._scheduler.notify_messenger(message)
+ self._scheduler.context.messenger.message(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 66174ad19..14d9c53d5 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -67,7 +67,6 @@ class NotificationType(FastEnum):
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
RETRY = "retry"
- MESSAGE = "message"
# Notification()
@@ -334,17 +333,6 @@ class Scheduler:
self._notify(notification)
self._sched()
- # notify_messenger()
- #
- # Send message over notification queue to Messenger callback
- #
- # Args:
- # message (Message): A Message() to be sent to the frontend message
- # handler, as assigned by context's messenger.
- #
- def notify_messenger(self, message):
- self._notify(Notification(NotificationType.MESSAGE, message=message))
-
#######################################################
# Local Private Methods #
#######################################################
@@ -362,7 +350,7 @@ class Scheduler:
#
def _abort_on_casd_failure(self, pid, returncode):
message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
- self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self.context.messenger.message(message)
self._casd_process.returncode = returncode
self.terminate_jobs()
@@ -435,7 +423,7 @@ class Scheduler:
# Make sure fork is allowed before starting jobs
if not self.context.prepare_fork():
message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
- self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self.context.messenger.message(message)
self.terminate_jobs()
return
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 989a00db7..64e4d09ec 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1658,9 +1658,7 @@ class Stream:
assert self._notification_queue
notification = self._notification_queue.pop()
- if notification.notification_type == NotificationType.MESSAGE:
- self._context.messenger.message(notification.message)
- elif notification.notification_type == NotificationType.INTERRUPT:
+ if notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK:
self._ticker_callback()
@@ -1679,7 +1677,7 @@ class Stream:
elif notification.notification_type == NotificationType.SUSPENDED:
self._scheduler_suspended = not self._scheduler_suspended
else:
- raise StreamError("Unrecognised notification type received")
+ raise StreamError("Unrecognised notification type received: {}".format(notification.notification_type))
def _notify(self, notification):
# Stream to scheduler notifcations on left side