summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/jobs/job.py
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-07-18 18:15:07 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-07-18 18:59:03 +0900
commitd835c37f2514767b64f6fa5eee0821ec1943a7fd (patch)
tree22d11a25ac1bd6374fa62545395b836c5a9bfd80 /buildstream/_scheduler/jobs/job.py
parenta950a985e0fa05cf4f7b1b1bbc1de6db338e821c (diff)
downloadbuildstream-d835c37f2514767b64f6fa5eee0821ec1943a7fd.tar.gz
Refactor: Use new logging mechanism from Contexttristan/local-cache-expiry
o element.py / plugin.py: Removed supporting logging code, and derive the log handle from Context. o _scheduler/scheduler.py, _scheduler/queues/queue.py: Adapt to new Job initializer API for the logfile o _scheduler/jobs/job.py: Run job activities within the new context manager which turns on logging for a job. Also removed a lot of custom logging abstract methods which are unneeded. o _scheduler/jobs: Job implementations need not implement so much custom logging.
Diffstat (limited to 'buildstream/_scheduler/jobs/job.py')
-rw-r--r--buildstream/_scheduler/jobs/job.py113
1 files changed, 30 insertions, 83 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index dec3a0c11..6d4b685af 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -27,13 +27,12 @@ import datetime
import traceback
import asyncio
import multiprocessing
-from contextlib import contextmanager
import psutil
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error
-from ..._message import MessageType, unconditional_messages
+from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
@@ -113,6 +112,7 @@ class Job():
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
self._logfile = logfile
+ self._task_id = None
# spawn()
#
@@ -247,6 +247,24 @@ class Job():
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
+ # set_task_id()
+ #
+ # This is called by Job subclasses to set a plugin ID
+ # associated with the task at large (if any element is related
+ # to the task).
+ #
+ # The task ID helps keep messages in the frontend coherent
+ # in the case that multiple plugins log in the context of
+ # a single task (e.g. running integration commands should appear
+ # in the frontend for the element being built, not the element
+ # running the integration commands).
+ #
+ # Args:
+ # (int): The plugin identifier for this task
+ #
+ def set_task_id(self, task_id):
+ self._task_id = task_id
+
#######################################################
# Abstract Methods #
#######################################################
@@ -277,28 +295,10 @@ class Job():
raise ImplError("Job '{kind}' does not implement child_process()"
.format(kind=type(self).__name__))
- # child_logging_enabled()
- #
- # Start the log for this job. This function will be given a
- # template string for the path to a log file - this will contain
- # "{pid}", which should be replaced with the current process'
- # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
- #
- # Args:
- # logfile (str): A template string that points to the logfile
- # that should be used - replace {pid} first.
- #
- # Yields:
- # (str) The path to the logfile with {pid} replaced.
- #
- @contextmanager
- def child_logging_enabled(self, logfile):
- raise ImplError("Job '{kind}' does not implement child_logging_enabled()"
- .format(kind=type(self).__name__))
-
# message():
#
- # Sends a message to the frontend
+ # Logs a message, this will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
#
# Args:
# message_type (MessageType): The type of message to send
@@ -306,8 +306,9 @@ class Job():
# kwargs: Remaining Message() constructor arguments
#
def message(self, message_type, message, **kwargs):
- raise ImplError("Job '{kind}' does not implement message()"
- .format(kind=type(self).__name__))
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(Message(None, message_type, message, **args))
# child_process_data()
#
@@ -323,24 +324,6 @@ class Job():
def child_process_data(self):
return {}
- # child_log()
- #
- # Log a message returned by the frontend's main message handler
- # and return it to the main process.
- #
- # This method is also expected to add process-specific information
- # to the message (notably, action_name and task_id).
- #
- # Arguments:
- # message (str): The message to log
- #
- # Returns:
- # message (Message): A message object
- #
- def child_log(self, message):
- raise ImplError("Job '{kind}' does not implement child_log()"
- .format(kind=type(self).__name__))
-
#######################################################
# Local Private Methods #
#######################################################
@@ -353,42 +336,6 @@ class Job():
#
#######################################################
- # decorate_message()
- #
- # Format a message so that it can be used nicely for logging
- # purposes. This will prepend a time code and add other
- # information to help determine what happened.
- #
- # Args:
- # message (Message) - The message to create a text from.
- # name (str) - A name for the executing context.
- #
- # Returns:
- # (str) The text to log.
- #
- def decorate_message(self, message, name):
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
-
- detail = ''
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- return template.format(timecode=timecode,
- type=message.message_type.upper(),
- name=name,
- message=message.message,
- detail=detail)
-
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -398,8 +345,6 @@ class Job():
#
def _child_action(self, queue):
- logfile = self._logfile
-
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
os.setsid()
@@ -434,7 +379,9 @@ class Job():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- self.child_logging_enabled(logfile) as filename:
+ self._scheduler.context.recorded_messages(self._logfile) as filename:
+
+ self.message(MessageType.START, self.action_name, logfile=filename)
try:
# Try the task action
@@ -545,8 +492,8 @@ class Job():
#
def _child_message_handler(self, message, context):
- # Log first
- message = self.child_log(message)
+ message.action_name = self.action_name
+ message.task_id = self._task_id
if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
# Job will be retried, display failures as warnings in the frontend