diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 18:15:07 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 18:59:03 +0900 |
commit | d835c37f2514767b64f6fa5eee0821ec1943a7fd (patch) | |
tree | 22d11a25ac1bd6374fa62545395b836c5a9bfd80 /buildstream/_scheduler/jobs/job.py | |
parent | a950a985e0fa05cf4f7b1b1bbc1de6db338e821c (diff) | |
download | buildstream-d835c37f2514767b64f6fa5eee0821ec1943a7fd.tar.gz |
Refactor: Use new logging mechanism from Contexttristan/local-cache-expiry
o element.py / plugin.py: Removed supporting logging code, and derive
the log handle from Context.
o _scheduler/scheduler.py, _scheduler/queues/queue.py: Adapt to new Job initializer API for the logfile
o _scheduler/jobs/job.py: Run job activities within the new context manager
which turns on logging for a job. Also removed a lot
of custom logging abstract methods which are unneeded.
o _scheduler/jobs: Job implementations need not implement so much custom logging.
Diffstat (limited to 'buildstream/_scheduler/jobs/job.py')
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 113 |
1 files changed, 30 insertions, 83 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index dec3a0c11..6d4b685af 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -27,13 +27,12 @@ import datetime import traceback import asyncio import multiprocessing -from contextlib import contextmanager import psutil # BuildStream toplevel imports from ..._exceptions import ImplError, BstError, set_last_task_error -from ..._message import MessageType, unconditional_messages +from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -113,6 +112,7 @@ class Job(): self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs self._logfile = logfile + self._task_id = None # spawn() # @@ -247,6 +247,24 @@ class Job(): os.kill(self._process.pid, signal.SIGCONT) self._suspended = False + # set_task_id() + # + # This is called by Job subclasses to set a plugin ID + # associated with the task at large (if any element is related + # to the task). + # + # The task ID helps keep messages in the frontend coherent + # in the case that multiple plugins log in the context of + # a single task (e.g. running integration commands should appear + # in the frontend for the element being built, not the element + # running the integration commands). + # + # Args: + # (int): The plugin identifier for this task + # + def set_task_id(self, task_id): + self._task_id = task_id + ####################################################### # Abstract Methods # ####################################################### @@ -277,28 +295,10 @@ class Job(): raise ImplError("Job '{kind}' does not implement child_process()" .format(kind=type(self).__name__)) - # child_logging_enabled() - # - # Start the log for this job. This function will be given a - # template string for the path to a log file - this will contain - # "{pid}", which should be replaced with the current process' - # PID. (i.e., call something like `logfile.format(pid=os.getpid())`). - # - # Args: - # logfile (str): A template string that points to the logfile - # that should be used - replace {pid} first. - # - # Yields: - # (str) The path to the logfile with {pid} replaced. - # - @contextmanager - def child_logging_enabled(self, logfile): - raise ImplError("Job '{kind}' does not implement child_logging_enabled()" - .format(kind=type(self).__name__)) - # message(): # - # Sends a message to the frontend + # Logs a message, this will be logged in the task's logfile and + # conditionally also be sent to the frontend. # # Args: # message_type (MessageType): The type of message to send @@ -306,8 +306,9 @@ class Job(): # kwargs: Remaining Message() constructor arguments # def message(self, message_type, message, **kwargs): - raise ImplError("Job '{kind}' does not implement message()" - .format(kind=type(self).__name__)) + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message(Message(None, message_type, message, **args)) # child_process_data() # @@ -323,24 +324,6 @@ class Job(): def child_process_data(self): return {} - # child_log() - # - # Log a message returned by the frontend's main message handler - # and return it to the main process. - # - # This method is also expected to add process-specific information - # to the message (notably, action_name and task_id). - # - # Arguments: - # message (str): The message to log - # - # Returns: - # message (Message): A message object - # - def child_log(self, message): - raise ImplError("Job '{kind}' does not implement child_log()" - .format(kind=type(self).__name__)) - ####################################################### # Local Private Methods # ####################################################### @@ -353,42 +336,6 @@ class Job(): # ####################################################### - # decorate_message() - # - # Format a message so that it can be used nicely for logging - # purposes. This will prepend a time code and add other - # information to help determine what happened. - # - # Args: - # message (Message) - The message to create a text from. - # name (str) - A name for the executing context. - # - # Returns: - # (str) The text to log. - # - def decorate_message(self, message, name): - INDENT = " " - EMPTYTIME = "--:--:--" - template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" - - detail = '' - if message.detail is not None: - template += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - return template.format(timecode=timecode, - type=message.message_type.upper(), - name=name, - message=message.message, - detail=detail) - # _child_action() # # Perform the action in the child process, this calls the action_cb. @@ -398,8 +345,6 @@ class Job(): # def _child_action(self, queue): - logfile = self._logfile - # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process os.setsid() @@ -434,7 +379,9 @@ class Job(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - self.child_logging_enabled(logfile) as filename: + self._scheduler.context.recorded_messages(self._logfile) as filename: + + self.message(MessageType.START, self.action_name, logfile=filename) try: # Try the task action @@ -545,8 +492,8 @@ class Job(): # def _child_message_handler(self, message, context): - # Log first - message = self.child_log(message) + message.action_name = self.action_name + message.task_id = self._task_id if message.message_type == MessageType.FAIL and self._tries <= self._max_retries: # Job will be retried, display failures as warnings in the frontend |