diff options
author | Benjamin Schubert <contact@benschubert.me> | 2020-07-04 16:11:17 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-07-06 18:07:53 +0000 |
commit | e16433a8b72a81d2afd11dd4d2d58f24512e259d (patch) | |
tree | 1f84b92733a6039f7f96f2e121c5ae6b12e3a57d | |
parent | e79f4a019d1d4c23d442f61144e6ac5177eb36b2 (diff) | |
download | buildstream-e16433a8b72a81d2afd11dd4d2d58f24512e259d.tar.gz |
scheduler.py: Remove 'Message' notification type, use the messenger
The messenger should be the one receiving messages directly, we don't
need this indirection
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 16 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 6 |
4 files changed, 9 insertions, 22 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 78a375fec..7ea87dc62 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -128,6 +128,7 @@ class Job: # Private members # self._scheduler = scheduler # The scheduler + self._messenger = self._scheduler.context.messenger self._pipe_r = None # The read end of a pipe for message passing self._process = None # The Process object self._listening = False # Whether the parent is currently listening @@ -163,7 +164,7 @@ class Job: child_job = self.create_child_job( # pylint: disable=assignment-from-no-return self.action_name, - self._scheduler.context.messenger, + self._messenger, self._scheduler.context.logdir, self._logfile, self._max_retries, @@ -314,7 +315,7 @@ class Job: if element_key is None: element_key = self._message_element_key message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) - self._scheduler.notify_messenger(message) + self._messenger.message(message) # get_element() # @@ -470,7 +471,7 @@ class Job: if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. - self._scheduler.notify_messenger(envelope.message) + self._messenger.message(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 986ac6c0a..9e444b393 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -352,7 +352,7 @@ class Queue: # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) - self._scheduler.notify_messenger(message) + self._scheduler.context.messenger.message(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 66174ad19..14d9c53d5 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -67,7 +67,6 @@ class NotificationType(FastEnum): UNSUSPEND = "unsuspend" SUSPENDED = "suspended" RETRY = "retry" - MESSAGE = "message" # Notification() @@ -334,17 +333,6 @@ class Scheduler: self._notify(notification) self._sched() - # notify_messenger() - # - # Send message over notification queue to Messenger callback - # - # Args: - # message (Message): A Message() to be sent to the frontend message - # handler, as assigned by context's messenger. - # - def notify_messenger(self, message): - self._notify(Notification(NotificationType.MESSAGE, message=message)) - ####################################################### # Local Private Methods # ####################################################### @@ -362,7 +350,7 @@ class Scheduler: # def _abort_on_casd_failure(self, pid, returncode): message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self.context.messenger.message(message) self._casd_process.returncode = returncode self.terminate_jobs() @@ -435,7 +423,7 @@ class Scheduler: # Make sure fork is allowed before starting jobs if not self.context.prepare_fork(): message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self.context.messenger.message(message) self.terminate_jobs() return diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 989a00db7..64e4d09ec 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1658,9 +1658,7 @@ class Stream: assert self._notification_queue notification = self._notification_queue.pop() - if notification.notification_type == NotificationType.MESSAGE: - self._context.messenger.message(notification.message) - elif notification.notification_type == NotificationType.INTERRUPT: + if notification.notification_type == NotificationType.INTERRUPT: self._interrupt_callback() elif notification.notification_type == NotificationType.TICK: self._ticker_callback() @@ -1679,7 +1677,7 @@ class Stream: elif notification.notification_type == NotificationType.SUSPENDED: self._scheduler_suspended = not self._scheduler_suspended else: - raise StreamError("Unrecognised notification type received") + raise StreamError("Unrecognised notification type received: {}".format(notification.notification_type)) def _notify(self, notification): # Stream to scheduler notifcations on left side |