summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/jobs/job.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_scheduler/jobs/job.py')
-rw-r--r--buildstream/_scheduler/jobs/job.py90
1 files changed, 50 insertions, 40 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 60ae0d001..ce5fa4522 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -32,7 +32,7 @@ import psutil
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import MessageType, unconditional_messages
from ... import _signals, utils
# Return code values shutdown of job handling child processes
@@ -110,6 +110,7 @@ class Job():
# Private members
#
self._scheduler = scheduler # The scheduler
+ self._context = scheduler.context # The context, used primarily for UI messaging.
self._queue = None # A message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
@@ -184,7 +185,7 @@ class Job():
# First resume the job if it's suspended
self.resume(silent=True)
- self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
+ self._status("{} terminating".format(self.action_name))
# Make sure there is no garbage on the queue
self._parent_stop_listening()
@@ -217,8 +218,8 @@ class Job():
def kill(self):
# Force kill
- self.message(MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name))
+ self._warn("{} did not terminate gracefully, killing"
+ .format(self.action_name))
try:
utils._kill_process_tree(self._process.pid)
@@ -233,8 +234,7 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self.message(MessageType.STATUS,
- "{} suspending".format(self.action_name))
+ self._status("{} suspending".format(self.action_name))
try:
# Use SIGTSTP so that child processes may handle and propagate
@@ -258,8 +258,7 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS,
- "{} resuming".format(self.action_name))
+ self._status("{} resuming".format(self.action_name))
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
@@ -324,21 +323,6 @@ class Job():
raise ImplError("Job '{kind}' does not implement child_process()"
.format(kind=type(self).__name__))
- # message():
- #
- # Logs a message, this will be logged in the task's logfile and
- # conditionally also be sent to the frontend.
- #
- # Args:
- # message_type (MessageType): The type of message to send
- # message (str): The message
- # kwargs: Remaining Message() constructor arguments
- #
- def message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(Message(None, message_type, message, **args))
-
# child_process_data()
#
# Abstract method to retrieve additional data that should be
@@ -365,6 +349,32 @@ class Job():
#
#######################################################
+ def _debug(self, text, **kwargs):
+ self._context.debug(text, task_id=self._task_id, **kwargs)
+
+ def _status(self, text, **kwargs):
+ self._context.status(text, task_id=self._task_id, **kwargs)
+
+ def _info(self, text, **kwargs):
+ self._context.info(text, task_id=self._task_id, **kwargs)
+
+ def _warn(self, text, **kwargs):
+ self._context.warn(text, task_id=self._task_id, **kwargs)
+
+ def _error(self, text, **kwargs):
+ self._context.error(text, task_id=self._task_id, **kwargs)
+
+ def _log(self, text, **kwargs):
+ self._context.log(text, task_id=self._task_id, **kwargs)
+
+ # _fail()
+ #
+ # Only exists for sub classes to override and add kwargs to.
+ #
+ def _fail(self, text, **kwargs):
+ self._context.message(text, task_id=self._task_id,
+ msg_type=MessageType.FAIL, **kwargs)
+
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -391,7 +401,7 @@ class Job():
# Set the global message handler in this child
# process to forward messages to the parent process
self._queue = queue
- self._scheduler.context.set_message_handler(self._child_message_handler)
+ self._context.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
stopped_time = None
@@ -408,17 +418,17 @@ class Job():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- self._scheduler.context.recorded_messages(self._logfile) as filename:
+ self._context.recorded_messages(self._logfile) as filename:
- self.message(MessageType.START, self.action_name, logfile=filename)
+ self._context.message(self.action_name, logfile=filename,
+ msg_type=MessageType.START, task_id=self._task_id)
try:
# Try the task action
result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - starttime
- self.message(MessageType.SKIPPED, str(e),
- elapsed=elapsed, logfile=filename)
+ self._context.skipped(e, elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
self._child_shutdown(RC_SKIPPED)
@@ -427,13 +437,11 @@ class Job():
self._retry_flag = e.temporary
if self._retry_flag and (self._tries <= self._max_retries):
- self.message(MessageType.FAIL,
- "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed, logfile=filename)
+ self._fail("Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed, logfile=filename)
else:
- self.message(MessageType.FAIL, str(e),
- elapsed=elapsed, detail=e.detail,
- logfile=filename, sandbox=e.sandbox)
+ self._fail(e, elapsed=elapsed, detail=e.detail,
+ logfile=filename, sandbox=e.sandbox)
self._queue.put(Envelope('child_data', self.child_process_data()))
@@ -453,9 +461,9 @@ class Job():
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self.message(MessageType.BUG, self.action_name,
- elapsed=elapsed, detail=detail,
- logfile=filename)
+ self._context.message(self.action_name, elapsed=elapsed,
+ detail=detail, logfile=filename,
+ task_id=self._task_id, msg_type=MessageType.BUG)
# Unhandled exceptions should permenantly fail
self._child_shutdown(RC_PERM_FAIL)
@@ -465,8 +473,10 @@ class Job():
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
- logfile=filename)
+ self._context.message(self.action_name,
+ elapsed=elapsed, logfile=filename,
+ msg_type=MessageType.SUCCESS,
+ task_id=self._task_id)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
@@ -603,7 +613,7 @@ class Job():
if envelope._message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope._message)
+ self._context._send_message(envelope._message)
elif envelope._message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state