summaryrefslogtreecommitdiff
path: root/src/buildstream/_context.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_context.py')
-rw-r--r--src/buildstream/_context.py266
1 files changed, 4 insertions, 262 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c97acc70e..52e4c3db9 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -19,22 +19,17 @@
import os
import shutil
-import datetime
-from collections import deque
-from contextlib import contextmanager
from . import utils
from . import _cachekey
-from . import _signals
from . import _site
from . import _yaml
-from ._exceptions import LoadError, LoadErrorReason, BstError
-from ._message import Message, MessageType
+from ._exceptions import LoadError, LoadErrorReason
+from ._messenger import Messenger
from ._profile import Topics, PROFILER
from ._artifactcache import ArtifactCache
from ._sourcecache import SourceCache
from ._cas import CASCache, CASQuota, CASCacheUsage
from ._workspaces import Workspaces, WorkspaceProjectCache
-from .plugin import Plugin
from .sandbox import SandboxRemote
@@ -149,18 +144,16 @@ class Context():
# Make sure the XDG vars are set in the environment before loading anything
self._init_xdg()
+ self.messenger = Messenger()
+
# Private variables
self._cache_key = None
- self._message_handler = None
- self._message_depth = deque()
self._artifactcache = None
self._sourcecache = None
self._projects = []
self._project_overrides = _yaml.new_empty_node()
self._workspaces = None
self._workspace_project_cache = WorkspaceProjectCache()
- self._log_handle = None
- self._log_filename = None
self._cascache = None
self._casquota = None
@@ -440,197 +433,6 @@ class Context():
return self._cache_key
- # set_message_handler()
- #
- # Sets the handler for any status messages propagated through
- # the context.
- #
- # The message handler should have the same signature as
- # the message() method
- def set_message_handler(self, handler):
- self._message_handler = handler
-
- # silent_messages():
- #
- # Returns:
- # (bool): Whether messages are currently being silenced
- #
- def silent_messages(self):
- for silent in self._message_depth:
- if silent:
- return True
- return False
-
- # message():
- #
- # Proxies a message back to the caller, this is the central
- # point through which all messages pass.
- #
- # Args:
- # message: A Message object
- #
- def message(self, message):
-
- # Tag message only once
- if message.depth is None:
- message.depth = len(list(self._message_depth))
-
- # If we are recording messages, dump a copy into the open log file.
- self._record_message(message)
-
- # Send it off to the log handler (can be the frontend,
- # or it can be the child task which will propagate
- # to the frontend)
- assert self._message_handler
-
- self._message_handler(message, context=self)
-
- # silence()
- #
- # A context manager to silence messages, this behaves in
- # the same way as the `silent_nested` argument of the
- # Context._timed_activity() context manager: especially
- # important messages will not be silenced.
- #
- @contextmanager
- def silence(self):
- self._push_message_depth(True)
- try:
- yield
- finally:
- self._pop_message_depth()
-
- # timed_activity()
- #
- # Context manager for performing timed activities and logging those
- #
- # Args:
- # context (Context): The invocation context object
- # activity_name (str): The name of the activity
- # detail (str): An optional detailed message, can be multiline output
- # silent_nested (bool): If specified, nested messages will be silenced
- #
- @contextmanager
- def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
-
- starttime = datetime.datetime.now()
- stopped_time = None
-
- def stop_time():
- nonlocal stopped_time
- stopped_time = datetime.datetime.now()
-
- def resume_time():
- nonlocal stopped_time
- nonlocal starttime
- sleep_time = datetime.datetime.now() - stopped_time
- starttime += sleep_time
-
- with _signals.suspendable(stop_time, resume_time):
- try:
- # Push activity depth for status messages
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
- self.message(message)
- self._push_message_depth(silent_nested)
- yield
-
- except BstError:
- # Note the failure in status messages and reraise, the scheduler
- # expects an error when there is an error.
- elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
- self._pop_message_depth()
- self.message(message)
- raise
-
- elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
- self._pop_message_depth()
- self.message(message)
-
- # recorded_messages()
- #
- # Records all messages in a log file while the context manager
- # is active.
- #
- # In addition to automatically writing all messages to the
- # specified logging file, an open file handle for process stdout
- # and stderr will be available via the Context.get_log_handle() API,
- # and the full logfile path will be available via the
- # Context.get_log_filename() API.
- #
- # Args:
- # filename (str): A logging directory relative filename,
- # the pid and .log extension will be automatically
- # appended
- #
- # Yields:
- # (str): The fully qualified log filename
- #
- @contextmanager
- def recorded_messages(self, filename):
-
- # We dont allow recursing in this context manager, and
- # we also do not allow it in the main process.
- assert self._log_handle is None
- assert self._log_filename is None
- assert not utils._is_main_process()
-
- # Create the fully qualified logfile in the log directory,
- # appending the pid and .log extension at the end.
- self._log_filename = os.path.join(self.logdir,
- '{}.{}.log'.format(filename, os.getpid()))
-
- # Ensure the directory exists first
- directory = os.path.dirname(self._log_filename)
- os.makedirs(directory, exist_ok=True)
-
- with open(self._log_filename, 'a') as logfile:
-
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- logfile.write('\n\nForcefully terminated\n')
- logfile.flush()
- except RuntimeError:
- os.fsync(logfile.fileno())
-
- self._log_handle = logfile
- with _signals.terminator(flush_log):
- yield self._log_filename
-
- self._log_handle = None
- self._log_filename = None
-
- # get_log_handle()
- #
- # Fetches the active log handle, this will return the active
- # log file handle when the Context.recorded_messages() context
- # manager is active
- #
- # Returns:
- # (file): The active logging file handle, or None
- #
- def get_log_handle(self):
- return self._log_handle
-
- # get_log_filename()
- #
- # Fetches the active log filename, this will return the active
- # log filename when the Context.recorded_messages() context
- # manager is active
- #
- # Returns:
- # (str): The active logging filename, or None
- #
- def get_log_filename(self):
- return self._log_filename
-
# set_artifact_directories_optional()
#
# This indicates that the current context (command or configuration)
@@ -650,66 +452,6 @@ class Context():
def set_artifact_files_optional(self):
self.require_artifact_files = False
- # _record_message()
- #
- # Records the message if recording is enabled
- #
- # Args:
- # message (Message): The message to record
- #
- def _record_message(self, message):
-
- if self._log_handle is None:
- return
-
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7}"
-
- # If this message is associated with a plugin, print what
- # we know about the plugin.
- plugin_name = ""
- if message.unique_id:
- template += " {plugin}"
- plugin = Plugin._lookup(message.unique_id)
- plugin_name = plugin.name
-
- template += ": {message}"
-
- detail = ''
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- text = template.format(timecode=timecode,
- plugin=plugin_name,
- type=message.message_type.upper(),
- message=message.message,
- detail=detail)
-
- # Write to the open log file
- self._log_handle.write('{}\n'.format(text))
- self._log_handle.flush()
-
- # _push_message_depth() / _pop_message_depth()
- #
- # For status messages, send the depth of timed
- # activities inside a given task through the message
- #
- def _push_message_depth(self, silent_nested):
- self._message_depth.appendleft(silent_nested)
-
- def _pop_message_depth(self):
- assert self._message_depth
- self._message_depth.popleft()
-
# Force the resolved XDG variables into the environment,
# this is so that they can be used directly to specify
# preferred locations of things from user configuration