diff options
Diffstat (limited to 'src/buildstream/_context.py')
-rw-r--r-- | src/buildstream/_context.py | 266 |
1 files changed, 4 insertions, 262 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index c97acc70e..52e4c3db9 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -19,22 +19,17 @@ import os import shutil -import datetime -from collections import deque -from contextlib import contextmanager from . import utils from . import _cachekey -from . import _signals from . import _site from . import _yaml -from ._exceptions import LoadError, LoadErrorReason, BstError -from ._message import Message, MessageType +from ._exceptions import LoadError, LoadErrorReason +from ._messenger import Messenger from ._profile import Topics, PROFILER from ._artifactcache import ArtifactCache from ._sourcecache import SourceCache from ._cas import CASCache, CASQuota, CASCacheUsage from ._workspaces import Workspaces, WorkspaceProjectCache -from .plugin import Plugin from .sandbox import SandboxRemote @@ -149,18 +144,16 @@ class Context(): # Make sure the XDG vars are set in the environment before loading anything self._init_xdg() + self.messenger = Messenger() + # Private variables self._cache_key = None - self._message_handler = None - self._message_depth = deque() self._artifactcache = None self._sourcecache = None self._projects = [] self._project_overrides = _yaml.new_empty_node() self._workspaces = None self._workspace_project_cache = WorkspaceProjectCache() - self._log_handle = None - self._log_filename = None self._cascache = None self._casquota = None @@ -440,197 +433,6 @@ class Context(): return self._cache_key - # set_message_handler() - # - # Sets the handler for any status messages propagated through - # the context. - # - # The message handler should have the same signature as - # the message() method - def set_message_handler(self, handler): - self._message_handler = handler - - # silent_messages(): - # - # Returns: - # (bool): Whether messages are currently being silenced - # - def silent_messages(self): - for silent in self._message_depth: - if silent: - return True - return False - - # message(): - # - # Proxies a message back to the caller, this is the central - # point through which all messages pass. - # - # Args: - # message: A Message object - # - def message(self, message): - - # Tag message only once - if message.depth is None: - message.depth = len(list(self._message_depth)) - - # If we are recording messages, dump a copy into the open log file. - self._record_message(message) - - # Send it off to the log handler (can be the frontend, - # or it can be the child task which will propagate - # to the frontend) - assert self._message_handler - - self._message_handler(message, context=self) - - # silence() - # - # A context manager to silence messages, this behaves in - # the same way as the `silent_nested` argument of the - # Context._timed_activity() context manager: especially - # important messages will not be silenced. - # - @contextmanager - def silence(self): - self._push_message_depth(True) - try: - yield - finally: - self._pop_message_depth() - - # timed_activity() - # - # Context manager for performing timed activities and logging those - # - # Args: - # context (Context): The invocation context object - # activity_name (str): The name of the activity - # detail (str): An optional detailed message, can be multiline output - # silent_nested (bool): If specified, nested messages will be silenced - # - @contextmanager - def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False): - - starttime = datetime.datetime.now() - stopped_time = None - - def stop_time(): - nonlocal stopped_time - stopped_time = datetime.datetime.now() - - def resume_time(): - nonlocal stopped_time - nonlocal starttime - sleep_time = datetime.datetime.now() - stopped_time - starttime += sleep_time - - with _signals.suspendable(stop_time, resume_time): - try: - # Push activity depth for status messages - message = Message(unique_id, MessageType.START, activity_name, detail=detail) - self.message(message) - self._push_message_depth(silent_nested) - yield - - except BstError: - # Note the failure in status messages and reraise, the scheduler - # expects an error when there is an error. - elapsed = datetime.datetime.now() - starttime - message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed) - self._pop_message_depth() - self.message(message) - raise - - elapsed = datetime.datetime.now() - starttime - message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed) - self._pop_message_depth() - self.message(message) - - # recorded_messages() - # - # Records all messages in a log file while the context manager - # is active. - # - # In addition to automatically writing all messages to the - # specified logging file, an open file handle for process stdout - # and stderr will be available via the Context.get_log_handle() API, - # and the full logfile path will be available via the - # Context.get_log_filename() API. - # - # Args: - # filename (str): A logging directory relative filename, - # the pid and .log extension will be automatically - # appended - # - # Yields: - # (str): The fully qualified log filename - # - @contextmanager - def recorded_messages(self, filename): - - # We dont allow recursing in this context manager, and - # we also do not allow it in the main process. - assert self._log_handle is None - assert self._log_filename is None - assert not utils._is_main_process() - - # Create the fully qualified logfile in the log directory, - # appending the pid and .log extension at the end. - self._log_filename = os.path.join(self.logdir, - '{}.{}.log'.format(filename, os.getpid())) - - # Ensure the directory exists first - directory = os.path.dirname(self._log_filename) - os.makedirs(directory, exist_ok=True) - - with open(self._log_filename, 'a') as logfile: - - # Write one last line to the log and flush it to disk - def flush_log(): - - # If the process currently had something happening in the I/O stack - # then trying to reenter the I/O stack will fire a runtime error. - # - # So just try to flush as well as we can at SIGTERM time - try: - logfile.write('\n\nForcefully terminated\n') - logfile.flush() - except RuntimeError: - os.fsync(logfile.fileno()) - - self._log_handle = logfile - with _signals.terminator(flush_log): - yield self._log_filename - - self._log_handle = None - self._log_filename = None - - # get_log_handle() - # - # Fetches the active log handle, this will return the active - # log file handle when the Context.recorded_messages() context - # manager is active - # - # Returns: - # (file): The active logging file handle, or None - # - def get_log_handle(self): - return self._log_handle - - # get_log_filename() - # - # Fetches the active log filename, this will return the active - # log filename when the Context.recorded_messages() context - # manager is active - # - # Returns: - # (str): The active logging filename, or None - # - def get_log_filename(self): - return self._log_filename - # set_artifact_directories_optional() # # This indicates that the current context (command or configuration) @@ -650,66 +452,6 @@ class Context(): def set_artifact_files_optional(self): self.require_artifact_files = False - # _record_message() - # - # Records the message if recording is enabled - # - # Args: - # message (Message): The message to record - # - def _record_message(self, message): - - if self._log_handle is None: - return - - INDENT = " " - EMPTYTIME = "--:--:--" - template = "[{timecode: <8}] {type: <7}" - - # If this message is associated with a plugin, print what - # we know about the plugin. - plugin_name = "" - if message.unique_id: - template += " {plugin}" - plugin = Plugin._lookup(message.unique_id) - plugin_name = plugin.name - - template += ": {message}" - - detail = '' - if message.detail is not None: - template += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - text = template.format(timecode=timecode, - plugin=plugin_name, - type=message.message_type.upper(), - message=message.message, - detail=detail) - - # Write to the open log file - self._log_handle.write('{}\n'.format(text)) - self._log_handle.flush() - - # _push_message_depth() / _pop_message_depth() - # - # For status messages, send the depth of timed - # activities inside a given task through the message - # - def _push_message_depth(self, silent_nested): - self._message_depth.appendleft(silent_nested) - - def _pop_message_depth(self): - assert self._message_depth - self._message_depth.popleft() - # Force the resolved XDG variables into the environment, # this is so that they can be used directly to specify # preferred locations of things from user configuration |