summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2018-07-23 18:36:41 +0100
committerTristan Maat <tristan.maat@codethink.co.uk>2018-07-23 18:37:14 +0100
commit957d3acb9c4ecc73aa4a9b154c0e87138d3b3d7e (patch)
tree343455a7db42663e6324c628ea23097f322d41d5
parent4cda0705622faa053833f04991e2fa062be55457 (diff)
downloadbuildstream-tlater/message-helpers.tar.gz
job.py: Add new message helperstlater/message-helpers
-rw-r--r--buildstream/_scheduler/job.py85
1 files changed, 54 insertions, 31 deletions
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py
index c501eb067..266dd1fb5 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/job.py
@@ -173,8 +173,7 @@ class Job():
# First resume the job if it's suspended
self.resume(silent=True)
- self._message("{} terminating".format(self.action_name),
- MessageType.STATUS, self.element)
+ self._status("{} terminating".format(self.action_name), self.element)
# Make sure there is no garbage on the queue
self._parent_stop_listening()
@@ -205,9 +204,8 @@ class Job():
def kill(self):
# Force kill
- self._message("{} did not terminate gracefully, killing"
- .format(self.action_name), self.element,
- MessageType.WARN)
+ self._warn("{} did not terminate gracefully, killing"
+ .format(self.action_name), self.element)
utils._kill_process_tree(self._process.pid)
# suspend()
@@ -216,8 +214,7 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self._message("{} suspending".format(self.action_name),
- self.element, MessageType.STATUS)
+ self._status("{} suspending".format(self.action_name), self.element)
try:
# Use SIGTSTP so that child processes may handle and propagate
@@ -241,8 +238,7 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent:
- self._message("{} resuming".format(self.action_name),
- self.element, MessageType.STATUS)
+ self._status("{} resuming".format(self.action_name), self.element)
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
@@ -269,16 +265,45 @@ class Job():
# message (str): The message
# kwargs: Remaining Message() constructor arguments
#
- def _message(self, message, message_type, plugin, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.msg(
- message,
+ def _message(self, text, plugin, *, msg_type=None, **kwargs):
+ self._scheduler.context._message(
+ text,
plugin=plugin._get_unique_id(),
- msg_type=message_type,
- **args
+ msg_type=msg_type,
+ scheduler=True,
+ **kwargs
)
+ def _start(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.START, **kwargs)
+
+ def _success(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.SUCCESS, **kwargs)
+
+ def _failure(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.FAIL, **kwargs)
+
+ def _debug(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.DEBUG, **kwargs)
+
+ def _status(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.STATUS, **kwargs)
+
+ def _info(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.INFO, **kwargs)
+
+ def _warn(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.WARN, **kwargs)
+
+ def _error(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.ERROR, **kwargs)
+
+ def _bug(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.BUG, **kwargs)
+
+ def _log(self, *args, **kwargs):
+ self._message(*args, msg_type=MessageType.LOG, **kwargs)
+
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -326,16 +351,15 @@ class Job():
with _signals.suspendable(stop_time, resume_time), \
element._logging_enabled(self.action_name) as filename:
- self._message(self.action_name, MessageType.START, element, logfile=filename)
+ self._start(self.action_name, element, logfile=filename)
# Print the element's environment at the beginning of any element's log file.
#
# This should probably be omitted for non-build tasks but it's harmless here
elt_env = element.get_environment()
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self._message("Build environment for element {}".format(element.name),
- MessageType.LOG, element,
- detail=env_dump, logfile=filename)
+ self._log("Build environment for element {}".format(element.name),
+ element, detail=env_dump, logfile=filename)
try:
# Try the task action
@@ -344,12 +368,12 @@ class Job():
elapsed = datetime.datetime.now() - starttime
if self._tries <= self._max_retries:
- self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed)
+ self._failure("Try #{} failed, retrying".format(self._tries),
+ element, elapsed=elapsed)
else:
- self._message(element, MessageType.FAIL, str(e),
- elapsed=elapsed, detail=e.detail,
- logfile=filename, sandbox=e.sandbox)
+ self._failure(str(e), element, elapsed=elapsed,
+ detail=e.detail, logfile=filename,
+ sandbox=e.sandbox)
# Report changes in the workspace, even if there was a handled failure
self._child_send_workspace()
@@ -366,9 +390,8 @@ class Job():
#
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self._message(element, MessageType.BUG, self.action_name,
- elapsed=elapsed, detail=detail,
- logfile=filename)
+ self._bug(self.action_name, element, elapsed=elapsed,
+ detail=detail, logfile=filename)
self._child_shutdown(1)
else:
@@ -377,8 +400,8 @@ class Job():
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
- logfile=filename)
+ self._success(self.action_name, element,
+ elapsed=elapsed, logfile=filename)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
@@ -562,7 +585,7 @@ class Job():
if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope.message)
+ self._scheduler.context._send_message(envelope.message)
elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state