diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 18:15:07 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 18:59:03 +0900 |
commit | d835c37f2514767b64f6fa5eee0821ec1943a7fd (patch) | |
tree | 22d11a25ac1bd6374fa62545395b836c5a9bfd80 | |
parent | a950a985e0fa05cf4f7b1b1bbc1de6db338e821c (diff) | |
download | buildstream-tristan/local-cache-expiry.tar.gz |
Refactor: Use new logging mechanism from Context (branch: tristan/local-cache-expiry)
o element.py / plugin.py: Removed supporting logging code, and derive
the log handle from Context.
o _scheduler/scheduler.py, _scheduler/queues/queue.py: Adapt to new Job initializer API for the logfile
o _scheduler/jobs/job.py: Run job activities within the new context manager
which turns on logging for a job. Also removed a lot
of custom logging abstract methods which are unneeded.
o _scheduler/jobs: Job implementations need not implement so much custom logging.
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 44 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 25 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/elementjob.py | 75 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 113 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 6 | ||||
-rw-r--r-- | buildstream/element.py | 84 | ||||
-rw-r--r-- | buildstream/plugin.py | 17 |
8 files changed, 55 insertions, 318 deletions
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index ffb945e43..f73a09c74 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -16,12 +16,8 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -import os -from contextlib import contextmanager - from .job import Job from ..._platform import Platform -from ..._message import Message, MessageType class CacheSizeJob(Job): @@ -38,45 +34,5 @@ class CacheSizeJob(Job): if self._complete_cb: self._complete_cb(result) - @contextmanager - def child_logging_enabled(self, logfile): - self._logfile = logfile.format(pid=os.getpid()) - yield self._logfile - self._logfile = None - - def message(self, message_type, message, **kwargs): - args = dict(kwargs) - args['scheduler'] = True - self._scheduler.context.message(Message(None, message_type, message, **args)) - - def child_log(self, message): - with open(self._logfile, 'a+') as log: - INDENT = " " - EMPTYTIME = "--:--:--" - - template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" - detail = '' - if message.detail is not None: - template += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - message_text = template.format(timecode=timecode, - type=message.message_type.upper(), - name='cache_size', - message=message.message, - detail=detail) - - log.write('{}\n'.format(message_text)) - log.flush() - - return message - def child_process_data(self): return {} diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index af7ca2b0b..bb78e8751 100644 --- 
a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -16,12 +16,8 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -import os -from contextlib import contextmanager - from .job import Job from ..._platform import Platform -from ..._message import Message class CleanupJob(Job): @@ -38,26 +34,5 @@ class CleanupJob(Job): if self._complete_cb: self._complete_cb() - @contextmanager - def child_logging_enabled(self, logfile): - self._logfile = logfile.format(pid=os.getpid()) - yield self._logfile - self._logfile = None - - def message(self, message_type, message, **kwargs): - args = dict(kwargs) - args['scheduler'] = True - self._scheduler.context.message(Message(None, message_type, message, **args)) - - def child_log(self, message): - message.action_name = self.action_name - - with open(self._logfile, 'a+') as log: - message_text = self.decorate_message(message, '[cleanup]') - log.write('{}\n'.format(message_text)) - log.flush() - - return message - def child_process_data(self): return {} diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 3dfb9aa6e..fcad20ce4 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -16,14 +16,9 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -import os -from contextlib import contextmanager - from ruamel import yaml from ..._message import Message, MessageType -from ...plugin import _plugin_lookup -from ... import _signals from .job import Job @@ -77,44 +72,30 @@ class ElementJob(Job): self._action_cb = action_cb # The action callable function self._complete_cb = complete_cb # The complete callable function + # Set the task wide ID for logging purposes + self.set_task_id(element._get_unique_id()) + @property def element(self): return self._element def child_process(self): + + # Print the element's environment at the beginning of any element's log file. 
+ # + # This should probably be omitted for non-build tasks but it's harmless here + elt_env = self._element.get_environment() + env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) + self.message(MessageType.LOG, + "Build environment for element {}".format(self._element.name), + detail=env_dump) + + # Run the action return self._action_cb(self._element) def parent_complete(self, success, result): self._complete_cb(self, self._element, success, self._result) - @contextmanager - def child_logging_enabled(self, logfile): - self._logfile = logfile.format(pid=os.getpid()) - - with open(self._logfile, 'a') as log: - # Write one last line to the log and flush it to disk - def flush_log(): - - # If the process currently had something happening in the I/O stack - # then trying to reenter the I/O stack will fire a runtime error. - # - # So just try to flush as well as we can at SIGTERM time - try: - # FIXME: Better logging - - log.write('\n\nAction {} for element {} forcefully terminated\n' - .format(self.action_name, self._element.name)) - log.flush() - except RuntimeError: - os.fsync(log.fileno()) - - self._element._set_log_handle(log) - with _signals.terminator(flush_log): - self._print_start_message(self._element, self._logfile) - yield self._logfile - self._element._set_log_handle(None) - self._logfile = None - def message(self, message_type, message, **kwargs): args = dict(kwargs) args['scheduler'] = True @@ -124,34 +105,6 @@ class ElementJob(Job): message, **args)) - def _print_start_message(self, element, logfile): - self.message(MessageType.START, self.action_name, logfile=logfile) - - # Print the element's environment at the beginning of any element's log file. 
- # - # This should probably be omitted for non-build tasks but it's harmless here - elt_env = element.get_environment() - env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) - self.message(MessageType.LOG, - "Build environment for element {}".format(element.name), - detail=env_dump, logfile=logfile) - - def child_log(self, message): - # Tag them on the way out the door... - message.action_name = self.action_name - message.task_id = self._element._get_unique_id() - - # Use the plugin for the task for the output, not a plugin - # which might be acting on behalf of the task - plugin = _plugin_lookup(message.task_id) - - with plugin._output_file() as output: - message_text = self.decorate_message(message, '[{}]'.format(plugin.name)) - output.write('{}\n'.format(message_text)) - output.flush() - - return message - def child_process_data(self): data = {} diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index dec3a0c11..6d4b685af 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -27,13 +27,12 @@ import datetime import traceback import asyncio import multiprocessing -from contextlib import contextmanager import psutil # BuildStream toplevel imports from ..._exceptions import ImplError, BstError, set_last_task_error -from ..._message import MessageType, unconditional_messages +from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -113,6 +112,7 @@ class Job(): self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs self._logfile = logfile + self._task_id = None # spawn() # @@ -247,6 +247,24 @@ class Job(): os.kill(self._process.pid, signal.SIGCONT) self._suspended = False + # set_task_id() + # + # This is called by Job subclasses to set a plugin ID + # associated with the task at large (if any element is related + # to the task). 
+ # + # The task ID helps keep messages in the frontend coherent + # in the case that multiple plugins log in the context of + # a single task (e.g. running integration commands should appear + # in the frontend for the element being built, not the element + # running the integration commands). + # + # Args: + # (int): The plugin identifier for this task + # + def set_task_id(self, task_id): + self._task_id = task_id + ####################################################### # Abstract Methods # ####################################################### @@ -277,28 +295,10 @@ class Job(): raise ImplError("Job '{kind}' does not implement child_process()" .format(kind=type(self).__name__)) - # child_logging_enabled() - # - # Start the log for this job. This function will be given a - # template string for the path to a log file - this will contain - # "{pid}", which should be replaced with the current process' - # PID. (i.e., call something like `logfile.format(pid=os.getpid())`). - # - # Args: - # logfile (str): A template string that points to the logfile - # that should be used - replace {pid} first. - # - # Yields: - # (str) The path to the logfile with {pid} replaced. - # - @contextmanager - def child_logging_enabled(self, logfile): - raise ImplError("Job '{kind}' does not implement child_logging_enabled()" - .format(kind=type(self).__name__)) - # message(): # - # Sends a message to the frontend + # Logs a message, this will be logged in the task's logfile and + # conditionally also be sent to the frontend. 
# # Args: # message_type (MessageType): The type of message to send @@ -306,8 +306,9 @@ class Job(): # kwargs: Remaining Message() constructor arguments # def message(self, message_type, message, **kwargs): - raise ImplError("Job '{kind}' does not implement message()" - .format(kind=type(self).__name__)) + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message(Message(None, message_type, message, **args)) # child_process_data() # @@ -323,24 +324,6 @@ class Job(): def child_process_data(self): return {} - # child_log() - # - # Log a message returned by the frontend's main message handler - # and return it to the main process. - # - # This method is also expected to add process-specific information - # to the message (notably, action_name and task_id). - # - # Arguments: - # message (str): The message to log - # - # Returns: - # message (Message): A message object - # - def child_log(self, message): - raise ImplError("Job '{kind}' does not implement child_log()" - .format(kind=type(self).__name__)) - ####################################################### # Local Private Methods # ####################################################### @@ -353,42 +336,6 @@ class Job(): # ####################################################### - # decorate_message() - # - # Format a message so that it can be used nicely for logging - # purposes. This will prepend a time code and add other - # information to help determine what happened. - # - # Args: - # message (Message) - The message to create a text from. - # name (str) - A name for the executing context. - # - # Returns: - # (str) The text to log. 
- # - def decorate_message(self, message, name): - INDENT = " " - EMPTYTIME = "--:--:--" - template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" - - detail = '' - if message.detail is not None: - template += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - return template.format(timecode=timecode, - type=message.message_type.upper(), - name=name, - message=message.message, - detail=detail) - # _child_action() # # Perform the action in the child process, this calls the action_cb. @@ -398,8 +345,6 @@ class Job(): # def _child_action(self, queue): - logfile = self._logfile - # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process os.setsid() @@ -434,7 +379,9 @@ class Job(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - self.child_logging_enabled(logfile) as filename: + self._scheduler.context.recorded_messages(self._logfile) as filename: + + self.message(MessageType.START, self.action_name, logfile=filename) try: # Try the task action @@ -545,8 +492,8 @@ class Job(): # def _child_message_handler(self, message, context): - # Log first - message = self.child_log(message) + message.action_name = self.action_name + message.task_id = self._task_id if message.message_type == MessageType.FAIL and self._tries <= self._max_retries: # Job will be retried, display failures as warnings in the frontend diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index ac20d3711..6c1583495 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -353,13 +353,8 
@@ class Queue(): def _element_log_path(self, element): project = element._get_project() - context = element._get_context() - key = element._get_display_key()[1] action = self.action_name.lower() - logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action) - - directory = os.path.join(context.logdir, project.name, element.normal_name) + logfile = "{key}-{action}".format(key=key, action=action) - os.makedirs(directory, exist_ok=True) - return os.path.join(directory, logfile) + return os.path.join(project.name, element.normal_name, logfile) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index aeb32931a..5783e5a67 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -317,8 +317,7 @@ class Scheduler(): if cache_size and cache_size < self.context.cache_quota: return - logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log') - job = CleanupJob(self, 'cleanup', logpath, + job = CleanupJob(self, 'cleanup', 'cleanup', resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE], @@ -326,8 +325,7 @@ class Scheduler(): self.schedule_jobs([job]) def _check_cache_size_real(self): - logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log') - job = CacheSizeJob(self, 'cache_size', logpath, + job = CacheSizeJob(self, 'cache_size', 'cache_size', resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE], diff --git a/buildstream/element.py b/buildstream/element.py index 140c824ec..ca06967a2 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -219,7 +219,6 @@ class Element(Plugin): self.__tracking_scheduled = False # Sources are scheduled to be tracked self.__tracking_done = False # Sources have been tracked self.__pull_done = False # Whether pull was attempted - self.__log_path = None # Path to dedicated log file or None self.__splits = None # Resolved regex objects for computing split 
domains self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap self.__staged_sources_directory = None # Location where Element.stage_sources() was called @@ -1501,8 +1500,9 @@ class Element(Plugin): utils.link_files(collectdir, filesdir) # Copy build log - if self.__log_path: - shutil.copyfile(self.__log_path, os.path.join(logsdir, 'build.log')) + log_filename = context.get_log_filename() + if log_filename: + shutil.copyfile(log_filename, os.path.join(logsdir, 'build.log')) # Store public data _yaml.dump(_yaml.node_sanitize(self.__dynamic_public), os.path.join(metadir, 'public.yaml')) @@ -1837,47 +1837,6 @@ class Element(Plugin): def _subst_string(self, value): return self.__variables.subst(value) - # Run some element methods with logging directed to - # a dedicated log file, here we yield the filename - # we decided on for logging - # - @contextmanager - def _logging_enabled(self, action_name): - self.__log_path = self.__logfile(action_name) - with open(self.__log_path, 'a') as logfile: - - # Write one last line to the log and flush it to disk - def flush_log(): - - # If the process currently had something happening in the I/O stack - # then trying to reenter the I/O stack will fire a runtime error. 
- # - # So just try to flush as well as we can at SIGTERM time - try: - logfile.write('\n\nAction {} for element {} forcefully terminated\n' - .format(action_name, self.name)) - logfile.flush() - except RuntimeError: - os.fsync(logfile.fileno()) - - self._set_log_handle(logfile) - with _signals.terminator(flush_log): - yield self.__log_path - self._set_log_handle(None) - self.__log_path = None - - # Override plugin _set_log_handle(), set it for our sources and dependencies too - # - # A log handle is set once in the context of a child task which will have only - # one log, so it's not harmful to modify the state of dependencies - def _set_log_handle(self, logfile, recurse=True): - super()._set_log_handle(logfile) - for source in self.sources(): - source._set_log_handle(logfile) - if recurse: - for dep in self.dependencies(Scope.ALL): - dep._set_log_handle(logfile, False) - # Returns the element whose sources this element is ultimately derived from. # # This is intended for being used to redirect commands that operate on an @@ -2015,43 +1974,6 @@ class Element(Plugin): if workspace: workspace.prepared = True - # __logfile() - # - # Compose the log file for this action & pid. - # - # Args: - # action_name (str): The action name - # pid (int): Optional pid, current pid is assumed if not provided. 
- # - # Returns: - # (string): The log file full path - # - # Log file format, when there is a cache key, is: - # - # '{logdir}/{project}/{element}/{cachekey}-{action}.{pid}.log' - # - # Otherwise, it is: - # - # '{logdir}/{project}/{element}/{:0<64}-{action}.{pid}.log' - # - # This matches the order in which things are stored in the artifact cache - # - def __logfile(self, action_name, pid=None): - project = self._get_project() - context = self._get_context() - key = self.__get_brief_display_key() - if pid is None: - pid = os.getpid() - - action = action_name.lower() - logfile = "{key}-{action}.{pid}.log".format( - key=key, action=action, pid=pid) - - directory = os.path.join(context.logdir, project.name, self.normal_name) - - os.makedirs(directory, exist_ok=True) - return os.path.join(directory, logfile) - # __assert_cached() # # Raises an error if the artifact is not cached. diff --git a/buildstream/plugin.py b/buildstream/plugin.py index 29fe2cb11..3e1d89052 100644 --- a/buildstream/plugin.py +++ b/buildstream/plugin.py @@ -162,7 +162,6 @@ class Plugin(): self.__provenance = provenance # The Provenance information self.__type_tag = type_tag # The type of plugin (element or source) self.__unique_id = _plugin_register(self) # Unique ID - self.__log = None # The log handle when running a task # Infer the kind identifier modulename = type(self).__module__ @@ -474,6 +473,7 @@ class Plugin(): self.call(... command which takes time ...) """ with self.__context.timed_activity(activity_name, + unique_id=self.__unique_id, detail=detail, silent_nested=silent_nested): yield @@ -589,27 +589,18 @@ class Plugin(): def _get_provenance(self): return self.__provenance - # Accessor for logging handle - # - def _get_log_handle(self, log): - return self.__log - - # Mutator for logging handle - # - def _set_log_handle(self, log): - self.__log = log - # Context manager for getting the open file handle to this # plugin's log. Used in the child context to add stuff to # a log. 
# @contextmanager def _output_file(self): - if not self.__log: + log = self.__context.get_log_handle() + if log is None: with open(os.devnull, "w") as output: yield output else: - yield self.__log + yield log # _preflight(): # Calls preflight() for the plugin, and allows generic preflight |