diff options
Diffstat (limited to 'src/buildstream/_messenger.py')
-rw-r--r-- | src/buildstream/_messenger.py | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py new file mode 100644 index 000000000..7dec93994 --- /dev/null +++ b/src/buildstream/_messenger.py @@ -0,0 +1,285 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Angelos Evripiotis <jevripiotis@bloomberg.net> + +import os +import datetime +from contextlib import contextmanager + +from . import _signals +from . import utils +from ._exceptions import BstError +from ._message import Message, MessageType +from .plugin import Plugin + + +class Messenger(): + + def __init__(self): + self._message_handler = None + self._silence_scope_depth = 0 + self._log_handle = None + self._log_filename = None + + # set_message_handler() + # + # Sets the handler for any status messages propagated through + # the context. + # + # The handler should have the signature: + # + # def handler( + # message: _message.Message, # The message to send. + # is_silenced: bool, # Whether messages are currently being silenced. + # ) -> None + # + def set_message_handler(self, handler): + self._message_handler = handler + + # _silent_messages(): + # + # Returns: + # (bool): Whether messages are currently being silenced + # + def _silent_messages(self): + return self._silence_scope_depth > 0 + + # message(): + # + # Proxies a message back to the caller, this is the central + # point through which all messages pass. + # + # Args: + # message: A Message object + # + def message(self, message): + + # If we are recording messages, dump a copy into the open log file. + self._record_message(message) + + # Send it off to the log handler (can be the frontend, + # or it can be the child task which will propagate + # to the frontend) + assert self._message_handler + + self._message_handler(message, is_silenced=self._silent_messages()) + + # silence() + # + # A context manager to silence messages, this behaves in + # the same way as the `silent_nested` argument of the + # timed_activity() context manager: all but + # _message.unconditional_messages will be silenced. + # + # Args: + # actually_silence (bool): Whether to actually do the silencing, if + # False then this context manager does not + # affect anything. + # + @contextmanager + def silence(self, *, actually_silence=True): + if not actually_silence: + yield + return + + self._silence_scope_depth += 1 + try: + yield + finally: + assert self._silence_scope_depth > 0 + self._silence_scope_depth -= 1 + + # timed_activity() + # + # Context manager for performing timed activities and logging those + # + # Args: + # activity_name (str): The name of the activity + # context (Context): The invocation context object + # unique_id (int): Optionally, the unique id of the plugin related to the message + # detail (str): An optional detailed message, can be multiline output + # silent_nested (bool): If True, all but _message.unconditional_messages are silenced + # + @contextmanager + def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False): + + starttime = datetime.datetime.now() + stopped_time = None + + def stop_time(): + nonlocal stopped_time + stopped_time = datetime.datetime.now() + + def resume_time(): + nonlocal stopped_time + nonlocal starttime + sleep_time = datetime.datetime.now() - stopped_time + starttime += sleep_time + + with _signals.suspendable(stop_time, resume_time): + try: + # Push activity depth for status messages + message = Message(unique_id, MessageType.START, activity_name, detail=detail) + self.message(message) + with self.silence(actually_silence=silent_nested): + yield + + except BstError: + # Note the failure in status messages and reraise, the scheduler + # expects an error when there is an error. + elapsed = datetime.datetime.now() - starttime + message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed) + self.message(message) + raise + + elapsed = datetime.datetime.now() - starttime + message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed) + self.message(message) + + # recorded_messages() + # + # Records all messages in a log file while the context manager + # is active. + # + # In addition to automatically writing all messages to the + # specified logging file, an open file handle for process stdout + # and stderr will be available via the Messenger.get_log_handle() API, + # and the full logfile path will be available via the + # Messenger.get_log_filename() API. + # + # Args: + # filename (str): A logging directory relative filename, + # the pid and .log extension will be automatically + # appended + # + # logdir (str) : The path to the log file directory. + # + # Yields: + # (str): The fully qualified log filename + # + @contextmanager + def recorded_messages(self, filename, logdir): + + # We dont allow recursing in this context manager, and + # we also do not allow it in the main process. + assert self._log_handle is None + assert self._log_filename is None + assert not utils._is_main_process() + + # Create the fully qualified logfile in the log directory, + # appending the pid and .log extension at the end. + self._log_filename = os.path.join(logdir, + '{}.{}.log'.format(filename, os.getpid())) + + # Ensure the directory exists first + directory = os.path.dirname(self._log_filename) + os.makedirs(directory, exist_ok=True) + + with open(self._log_filename, 'a') as logfile: + + # Write one last line to the log and flush it to disk + def flush_log(): + + # If the process currently had something happening in the I/O stack + # then trying to reenter the I/O stack will fire a runtime error. + # + # So just try to flush as well as we can at SIGTERM time + try: + logfile.write('\n\nForcefully terminated\n') + logfile.flush() + except RuntimeError: + os.fsync(logfile.fileno()) + + self._log_handle = logfile + with _signals.terminator(flush_log): + yield self._log_filename + + self._log_handle = None + self._log_filename = None + + # get_log_handle() + # + # Fetches the active log handle, this will return the active + # log file handle when the Messenger.recorded_messages() context + # manager is active + # + # Returns: + # (file): The active logging file handle, or None + # + def get_log_handle(self): + return self._log_handle + + # get_log_filename() + # + # Fetches the active log filename, this will return the active + # log filename when the Messenger.recorded_messages() context + # manager is active + # + # Returns: + # (str): The active logging filename, or None + # + def get_log_filename(self): + return self._log_filename + + # _record_message() + # + # Records the message if recording is enabled + # + # Args: + # message (Message): The message to record + # + def _record_message(self, message): + + if self._log_handle is None: + return + + INDENT = " " + EMPTYTIME = "--:--:--" + template = "[{timecode: <8}] {type: <7}" + + # If this message is associated with a plugin, print what + # we know about the plugin. + plugin_name = "" + if message.unique_id: + template += " {plugin}" + plugin = Plugin._lookup(message.unique_id) + plugin_name = plugin.name + + template += ": {message}" + + detail = '' + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip('\n') + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + text = template.format(timecode=timecode, + plugin=plugin_name, + type=message.message_type.upper(), + message=message.message, + detail=detail) + + # Write to the open log file + self._log_handle.write('{}\n'.format(text)) + self._log_handle.flush() |