diff options
Diffstat (limited to 'buildstream/_scheduler/jobs/job.py')
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 90 |
1 files changed, 50 insertions, 40 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 60ae0d001..ce5fa4522 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -32,7 +32,7 @@ import psutil # BuildStream toplevel imports from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob -from ..._message import Message, MessageType, unconditional_messages +from ..._message import MessageType, unconditional_messages from ... import _signals, utils # Return code values shutdown of job handling child processes @@ -110,6 +110,7 @@ class Job(): # Private members # self._scheduler = scheduler # The scheduler + self._context = scheduler.context # The context, used primarily for UI messaging. self._queue = None # A message passing queue self._process = None # The Process object self._watcher = None # Child process watcher @@ -184,7 +185,7 @@ class Job(): # First resume the job if it's suspended self.resume(silent=True) - self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) + self._status("{} terminating".format(self.action_name)) # Make sure there is no garbage on the queue self._parent_stop_listening() @@ -217,8 +218,8 @@ class Job(): def kill(self): # Force kill - self.message(MessageType.WARN, - "{} did not terminate gracefully, killing".format(self.action_name)) + self._warn("{} did not terminate gracefully, killing" + .format(self.action_name)) try: utils._kill_process_tree(self._process.pid) @@ -233,8 +234,7 @@ class Job(): # def suspend(self): if not self._suspended: - self.message(MessageType.STATUS, - "{} suspending".format(self.action_name)) + self._status("{} suspending".format(self.action_name)) try: # Use SIGTSTP so that child processes may handle and propagate @@ -258,8 +258,7 @@ class Job(): def resume(self, silent=False): if self._suspended: if not silent and not self._scheduler.terminated: - self.message(MessageType.STATUS, - "{} resuming".format(self.action_name)) + self._status("{} resuming".format(self.action_name)) os.kill(self._process.pid, signal.SIGCONT) self._suspended = False @@ -324,21 +323,6 @@ class Job(): raise ImplError("Job '{kind}' does not implement child_process()" .format(kind=type(self).__name__)) - # message(): - # - # Logs a message, this will be logged in the task's logfile and - # conditionally also be sent to the frontend. - # - # Args: - # message_type (MessageType): The type of message to send - # message (str): The message - # kwargs: Remaining Message() constructor arguments - # - def message(self, message_type, message, **kwargs): - args = dict(kwargs) - args['scheduler'] = True - self._scheduler.context.message(Message(None, message_type, message, **args)) - # child_process_data() # # Abstract method to retrieve additional data that should be @@ -365,6 +349,32 @@ class Job(): # ####################################################### + def _debug(self, text, **kwargs): + self._context.debug(text, task_id=self._task_id, **kwargs) + + def _status(self, text, **kwargs): + self._context.status(text, task_id=self._task_id, **kwargs) + + def _info(self, text, **kwargs): + self._context.info(text, task_id=self._task_id, **kwargs) + + def _warn(self, text, **kwargs): + self._context.warn(text, task_id=self._task_id, **kwargs) + + def _error(self, text, **kwargs): + self._context.error(text, task_id=self._task_id, **kwargs) + + def _log(self, text, **kwargs): + self._context.log(text, task_id=self._task_id, **kwargs) + + # _fail() + # + # Only exists for sub classes to override and add kwargs to. + # + def _fail(self, text, **kwargs): + self._context.message(text, task_id=self._task_id, + msg_type=MessageType.FAIL, **kwargs) + # _child_action() # # Perform the action in the child process, this calls the action_cb. @@ -391,7 +401,7 @@ class Job(): # Set the global message handler in this child # process to forward messages to the parent process self._queue = queue - self._scheduler.context.set_message_handler(self._child_message_handler) + self._context.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() stopped_time = None @@ -408,17 +418,17 @@ class Job(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - self._scheduler.context.recorded_messages(self._logfile) as filename: + self._context.recorded_messages(self._logfile) as filename: - self.message(MessageType.START, self.action_name, logfile=filename) + self._context.message(self.action_name, logfile=filename, + msg_type=MessageType.START, task_id=self._task_id) try: # Try the task action result = self.child_process() # pylint: disable=assignment-from-no-return except SkipJob as e: elapsed = datetime.datetime.now() - starttime - self.message(MessageType.SKIPPED, str(e), - elapsed=elapsed, logfile=filename) + self._context.skipped(e, elapsed=elapsed, logfile=filename) # Alert parent of skip by return code self._child_shutdown(RC_SKIPPED) @@ -427,13 +437,11 @@ class Job(): self._retry_flag = e.temporary if self._retry_flag and (self._tries <= self._max_retries): - self.message(MessageType.FAIL, - "Try #{} failed, retrying".format(self._tries), - elapsed=elapsed, logfile=filename) + self._fail("Try #{} failed, retrying".format(self._tries), + elapsed=elapsed, logfile=filename) else: - self.message(MessageType.FAIL, str(e), - elapsed=elapsed, detail=e.detail, - logfile=filename, sandbox=e.sandbox) + self._fail(e, elapsed=elapsed, detail=e.detail, + logfile=filename, sandbox=e.sandbox) self._queue.put(Envelope('child_data', self.child_process_data())) @@ -453,9 +461,9 @@ class Job(): elapsed = datetime.datetime.now() - starttime detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) - self.message(MessageType.BUG, self.action_name, - elapsed=elapsed, detail=detail, - logfile=filename) + self._context.message(self.action_name, elapsed=elapsed, + detail=detail, logfile=filename, + task_id=self._task_id, msg_type=MessageType.BUG) # Unhandled exceptions should permenantly fail self._child_shutdown(RC_PERM_FAIL) @@ -465,8 +473,10 @@ class Job(): self._child_send_result(result) elapsed = datetime.datetime.now() - starttime - self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, - logfile=filename) + self._context.message(self.action_name, + elapsed=elapsed, logfile=filename, + msg_type=MessageType.SUCCESS, + task_id=self._task_id) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process @@ -603,7 +613,7 @@ class Job(): if envelope._message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope._message) + self._context._send_message(envelope._message) elif envelope._message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state |