summaryrefslogtreecommitdiff
path: root/src/buildstream/_messenger.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_messenger.py')
-rw-r--r--src/buildstream/_messenger.py285
1 files changed, 285 insertions, 0 deletions
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
new file mode 100644
index 000000000..7dec93994
--- /dev/null
+++ b/src/buildstream/_messenger.py
@@ -0,0 +1,285 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Angelos Evripiotis <jevripiotis@bloomberg.net>
+
+import os
+import datetime
+from contextlib import contextmanager
+
+from . import _signals
+from . import utils
+from ._exceptions import BstError
+from ._message import Message, MessageType
+from .plugin import Plugin
+
+
+class Messenger():
+
+ def __init__(self):
+ self._message_handler = None
+ self._silence_scope_depth = 0
+ self._log_handle = None
+ self._log_filename = None
+
+ # set_message_handler()
+ #
+ # Sets the handler for any status messages propagated through
+ # the context.
+ #
+ # The handler should have the signature:
+ #
+ # def handler(
+ # message: _message.Message, # The message to send.
+ # is_silenced: bool, # Whether messages are currently being silenced.
+ # ) -> None
+ #
+ def set_message_handler(self, handler):
+ self._message_handler = handler
+
+ # _silent_messages():
+ #
+ # Returns:
+ # (bool): Whether messages are currently being silenced
+ #
+ def _silent_messages(self):
+ return self._silence_scope_depth > 0
+
+ # message():
+ #
+ # Proxies a message back to the caller, this is the central
+ # point through which all messages pass.
+ #
+ # Args:
+ # message: A Message object
+ #
+ def message(self, message):
+
+ # If we are recording messages, dump a copy into the open log file.
+ self._record_message(message)
+
+ # Send it off to the log handler (can be the frontend,
+ # or it can be the child task which will propagate
+ # to the frontend)
+ assert self._message_handler
+
+ self._message_handler(message, is_silenced=self._silent_messages())
+
+ # silence()
+ #
+ # A context manager to silence messages, this behaves in
+ # the same way as the `silent_nested` argument of the
+ # timed_activity() context manager: all but
+ # _message.unconditional_messages will be silenced.
+ #
+ # Args:
+ # actually_silence (bool): Whether to actually do the silencing, if
+ # False then this context manager does not
+ # affect anything.
+ #
+ @contextmanager
+ def silence(self, *, actually_silence=True):
+ if not actually_silence:
+ yield
+ return
+
+ self._silence_scope_depth += 1
+ try:
+ yield
+ finally:
+ assert self._silence_scope_depth > 0
+ self._silence_scope_depth -= 1
+
+ # timed_activity()
+ #
+ # Context manager for performing timed activities and logging those
+ #
+ # Args:
+ # activity_name (str): The name of the activity
+ # context (Context): The invocation context object
+ # unique_id (int): Optionally, the unique id of the plugin related to the message
+ # detail (str): An optional detailed message, can be multiline output
+ # silent_nested (bool): If True, all but _message.unconditional_messages are silenced
+ #
+ @contextmanager
+ def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
+
+ starttime = datetime.datetime.now()
+ stopped_time = None
+
+ def stop_time():
+ nonlocal stopped_time
+ stopped_time = datetime.datetime.now()
+
+ def resume_time():
+ nonlocal stopped_time
+ nonlocal starttime
+ sleep_time = datetime.datetime.now() - stopped_time
+ starttime += sleep_time
+
+ with _signals.suspendable(stop_time, resume_time):
+ try:
+ # Push activity depth for status messages
+ message = Message(unique_id, MessageType.START, activity_name, detail=detail)
+ self.message(message)
+ with self.silence(actually_silence=silent_nested):
+ yield
+
+ except BstError:
+ # Note the failure in status messages and reraise, the scheduler
+ # expects an error when there is an error.
+ elapsed = datetime.datetime.now() - starttime
+ message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+ self.message(message)
+ raise
+
+ elapsed = datetime.datetime.now() - starttime
+ message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
+ self.message(message)
+
+ # recorded_messages()
+ #
+ # Records all messages in a log file while the context manager
+ # is active.
+ #
+ # In addition to automatically writing all messages to the
+ # specified logging file, an open file handle for process stdout
+ # and stderr will be available via the Messenger.get_log_handle() API,
+ # and the full logfile path will be available via the
+ # Messenger.get_log_filename() API.
+ #
+ # Args:
+ # filename (str): A logging directory relative filename,
+ # the pid and .log extension will be automatically
+ # appended
+ #
+ # logdir (str) : The path to the log file directory.
+ #
+ # Yields:
+ # (str): The fully qualified log filename
+ #
+ @contextmanager
+ def recorded_messages(self, filename, logdir):
+
+ # We dont allow recursing in this context manager, and
+ # we also do not allow it in the main process.
+ assert self._log_handle is None
+ assert self._log_filename is None
+ assert not utils._is_main_process()
+
+ # Create the fully qualified logfile in the log directory,
+ # appending the pid and .log extension at the end.
+ self._log_filename = os.path.join(logdir,
+ '{}.{}.log'.format(filename, os.getpid()))
+
+ # Ensure the directory exists first
+ directory = os.path.dirname(self._log_filename)
+ os.makedirs(directory, exist_ok=True)
+
+ with open(self._log_filename, 'a') as logfile:
+
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently had something happening in the I/O stack
+ # then trying to reenter the I/O stack will fire a runtime error.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ logfile.write('\n\nForcefully terminated\n')
+ logfile.flush()
+ except RuntimeError:
+ os.fsync(logfile.fileno())
+
+ self._log_handle = logfile
+ with _signals.terminator(flush_log):
+ yield self._log_filename
+
+ self._log_handle = None
+ self._log_filename = None
+
+ # get_log_handle()
+ #
+ # Fetches the active log handle, this will return the active
+ # log file handle when the Messenger.recorded_messages() context
+ # manager is active
+ #
+ # Returns:
+ # (file): The active logging file handle, or None
+ #
+ def get_log_handle(self):
+ return self._log_handle
+
+ # get_log_filename()
+ #
+ # Fetches the active log filename, this will return the active
+ # log filename when the Messenger.recorded_messages() context
+ # manager is active
+ #
+ # Returns:
+ # (str): The active logging filename, or None
+ #
+ def get_log_filename(self):
+ return self._log_filename
+
+ # _record_message()
+ #
+ # Records the message if recording is enabled
+ #
+ # Args:
+ # message (Message): The message to record
+ #
+ def _record_message(self, message):
+
+ if self._log_handle is None:
+ return
+
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7}"
+
+ # If this message is associated with a plugin, print what
+ # we know about the plugin.
+ plugin_name = ""
+ if message.unique_id:
+ template += " {plugin}"
+ plugin = Plugin._lookup(message.unique_id)
+ plugin_name = plugin.name
+
+ template += ": {message}"
+
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ text = template.format(timecode=timecode,
+ plugin=plugin_name,
+ type=message.message_type.upper(),
+ message=message.message,
+ detail=detail)
+
+ # Write to the open log file
+ self._log_handle.write('{}\n'.format(text))
+ self._log_handle.flush()