diff options
24 files changed, 365 insertions, 338 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py index ec80e6417..02adb3404 100644 --- a/src/buildstream/_artifact.py +++ b/src/buildstream/_artifact.py @@ -162,7 +162,7 @@ class Artifact(): new_build.was_workspaced = bool(e._get_workspace()) # Store log file - log_filename = context.get_log_filename() + log_filename = context.messenger.get_log_filename() if log_filename: digest = self._cas.add_object(path=log_filename) element._build_log_path = self._cas.objpath(digest) diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 49930d4ca..a29973158 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -244,7 +244,7 @@ class BaseCache(): # def _message(self, message_type, message, **kwargs): args = dict(kwargs) - self.context.message( + self.context.messenger.message( Message(None, message_type, message, **args)) # _set_remotes(): @@ -272,7 +272,7 @@ class BaseCache(): def remote_failed(url, error): self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error)) - with self.context.timed_activity("Initializing remote caches", silent_nested=True): + with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True): self.initialize_remotes(on_failure=remote_failed) # _list_refs_mtimes() diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 53f6029db..771e31208 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -1052,7 +1052,7 @@ class CASQuota: self._cache_lower_threshold = None # The target cache size for a cleanup self.available_space = None - self._message = context.message + self._message = context.messenger.message self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method self._list_refs_callbacks = [] # Callbacks to all refs diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index c97acc70e..52e4c3db9 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -19,22 +19,17 @@ import os import shutil -import datetime -from collections import deque -from contextlib import contextmanager from . import utils from . import _cachekey -from . import _signals from . import _site from . import _yaml -from ._exceptions import LoadError, LoadErrorReason, BstError -from ._message import Message, MessageType +from ._exceptions import LoadError, LoadErrorReason +from ._messenger import Messenger from ._profile import Topics, PROFILER from ._artifactcache import ArtifactCache from ._sourcecache import SourceCache from ._cas import CASCache, CASQuota, CASCacheUsage from ._workspaces import Workspaces, WorkspaceProjectCache -from .plugin import Plugin from .sandbox import SandboxRemote @@ -149,18 +144,16 @@ class Context(): # Make sure the XDG vars are set in the environment before loading anything self._init_xdg() + self.messenger = Messenger() + # Private variables self._cache_key = None - self._message_handler = None - self._message_depth = deque() self._artifactcache = None self._sourcecache = None self._projects = [] self._project_overrides = _yaml.new_empty_node() self._workspaces = None self._workspace_project_cache = WorkspaceProjectCache() - self._log_handle = None - self._log_filename = None self._cascache = None self._casquota = None @@ -440,197 +433,6 @@ class Context(): return self._cache_key - # set_message_handler() - # - # Sets the handler for any status messages propagated through - # the context. - # - # The message handler should have the same signature as - # the message() method - def set_message_handler(self, handler): - self._message_handler = handler - - # silent_messages(): - # - # Returns: - # (bool): Whether messages are currently being silenced - # - def silent_messages(self): - for silent in self._message_depth: - if silent: - return True - return False - - # message(): - # - # Proxies a message back to the caller, this is the central - # point through which all messages pass. - # - # Args: - # message: A Message object - # - def message(self, message): - - # Tag message only once - if message.depth is None: - message.depth = len(list(self._message_depth)) - - # If we are recording messages, dump a copy into the open log file. - self._record_message(message) - - # Send it off to the log handler (can be the frontend, - # or it can be the child task which will propagate - # to the frontend) - assert self._message_handler - - self._message_handler(message, context=self) - - # silence() - # - # A context manager to silence messages, this behaves in - # the same way as the `silent_nested` argument of the - # Context._timed_activity() context manager: especially - # important messages will not be silenced. - # - @contextmanager - def silence(self): - self._push_message_depth(True) - try: - yield - finally: - self._pop_message_depth() - - # timed_activity() - # - # Context manager for performing timed activities and logging those - # - # Args: - # context (Context): The invocation context object - # activity_name (str): The name of the activity - # detail (str): An optional detailed message, can be multiline output - # silent_nested (bool): If specified, nested messages will be silenced - # - @contextmanager - def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False): - - starttime = datetime.datetime.now() - stopped_time = None - - def stop_time(): - nonlocal stopped_time - stopped_time = datetime.datetime.now() - - def resume_time(): - nonlocal stopped_time - nonlocal starttime - sleep_time = datetime.datetime.now() - stopped_time - starttime += sleep_time - - with _signals.suspendable(stop_time, resume_time): - try: - # Push activity depth for status messages - message = Message(unique_id, MessageType.START, activity_name, detail=detail) - self.message(message) - self._push_message_depth(silent_nested) - yield - - except BstError: - # Note the failure in status messages and reraise, the scheduler - # expects an error when there is an error. - elapsed = datetime.datetime.now() - starttime - message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed) - self._pop_message_depth() - self.message(message) - raise - - elapsed = datetime.datetime.now() - starttime - message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed) - self._pop_message_depth() - self.message(message) - - # recorded_messages() - # - # Records all messages in a log file while the context manager - # is active. - # - # In addition to automatically writing all messages to the - # specified logging file, an open file handle for process stdout - # and stderr will be available via the Context.get_log_handle() API, - # and the full logfile path will be available via the - # Context.get_log_filename() API. - # - # Args: - # filename (str): A logging directory relative filename, - # the pid and .log extension will be automatically - # appended - # - # Yields: - # (str): The fully qualified log filename - # - @contextmanager - def recorded_messages(self, filename): - - # We dont allow recursing in this context manager, and - # we also do not allow it in the main process. - assert self._log_handle is None - assert self._log_filename is None - assert not utils._is_main_process() - - # Create the fully qualified logfile in the log directory, - # appending the pid and .log extension at the end. - self._log_filename = os.path.join(self.logdir, - '{}.{}.log'.format(filename, os.getpid())) - - # Ensure the directory exists first - directory = os.path.dirname(self._log_filename) - os.makedirs(directory, exist_ok=True) - - with open(self._log_filename, 'a') as logfile: - - # Write one last line to the log and flush it to disk - def flush_log(): - - # If the process currently had something happening in the I/O stack - # then trying to reenter the I/O stack will fire a runtime error. - # - # So just try to flush as well as we can at SIGTERM time - try: - logfile.write('\n\nForcefully terminated\n') - logfile.flush() - except RuntimeError: - os.fsync(logfile.fileno()) - - self._log_handle = logfile - with _signals.terminator(flush_log): - yield self._log_filename - - self._log_handle = None - self._log_filename = None - - # get_log_handle() - # - # Fetches the active log handle, this will return the active - # log file handle when the Context.recorded_messages() context - # manager is active - # - # Returns: - # (file): The active logging file handle, or None - # - def get_log_handle(self): - return self._log_handle - - # get_log_filename() - # - # Fetches the active log filename, this will return the active - # log filename when the Context.recorded_messages() context - # manager is active - # - # Returns: - # (str): The active logging filename, or None - # - def get_log_filename(self): - return self._log_filename - # set_artifact_directories_optional() # # This indicates that the current context (command or configuration) @@ -650,66 +452,6 @@ class Context(): def set_artifact_files_optional(self): self.require_artifact_files = False - # _record_message() - # - # Records the message if recording is enabled - # - # Args: - # message (Message): The message to record - # - def _record_message(self, message): - - if self._log_handle is None: - return - - INDENT = " " - EMPTYTIME = "--:--:--" - template = "[{timecode: <8}] {type: <7}" - - # If this message is associated with a plugin, print what - # we know about the plugin. - plugin_name = "" - if message.unique_id: - template += " {plugin}" - plugin = Plugin._lookup(message.unique_id) - plugin_name = plugin.name - - template += ": {message}" - - detail = '' - if message.detail is not None: - template += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - text = template.format(timecode=timecode, - plugin=plugin_name, - type=message.message_type.upper(), - message=message.message, - detail=detail) - - # Write to the open log file - self._log_handle.write('{}\n'.format(text)) - self._log_handle.flush() - - # _push_message_depth() / _pop_message_depth() - # - # For status messages, send the depth of timed - # activities inside a given task through the message - # - def _push_message_depth(self, silent_nested): - self._message_depth.appendleft(silent_nested) - - def _pop_message_depth(self): - assert self._message_depth - self._message_depth.popleft() - # Force the resolved XDG variables into the environment, # this is so that they can be used directly to specify # preferred locations of things from user configuration diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index dafde153c..9550fea40 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -213,7 +213,7 @@ class App(): indent=INDENT) # Propagate pipeline feedback to the user - self.context.set_message_handler(self._message_handler) + self.context.messenger.set_message_handler(self._message_handler) # Preflight the artifact cache after initializing logging, # this can cause messages to be emitted. @@ -459,7 +459,7 @@ class App(): # def _message(self, message_type, message, **kwargs): args = dict(kwargs) - self.context.message( + self.context.messenger.message( Message(None, message_type, message, **args)) # Exception handler @@ -695,11 +695,11 @@ class App(): # # Handle messages from the pipeline # - def _message_handler(self, message, context): + def _message_handler(self, message, is_silenced): # Drop status messages from the UI if not verbose, we'll still see # info messages and status messages will still go to the log files. - if not context.log_verbose and message.message_type == MessageType.STATUS: + if not self.context.log_verbose and message.message_type == MessageType.STATUS: return # Hold on to the failure messages @@ -707,7 +707,7 @@ class App(): self._fail_messages[message.unique_id] = message # Send to frontend if appropriate - if self.context.silent_messages() and (message.message_type not in unconditional_messages): + if is_silenced and (message.message_type not in unconditional_messages): return if self._status: diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py index 5f98b127c..b221c48d0 100644 --- a/src/buildstream/_loader/loader.py +++ b/src/buildstream/_loader/loader.py @@ -721,7 +721,7 @@ class Loader(): raise LoadError(warning_token, brief) message = Message(None, MessageType.WARN, brief) - self._context.message(message) + self._context.messenger.message(message) # Print warning messages if any of the specified elements have invalid names. # diff --git a/src/buildstream/_message.py b/src/buildstream/_message.py index c2cdb8277..7f1a939d2 100644 --- a/src/buildstream/_message.py +++ b/src/buildstream/_message.py @@ -59,7 +59,6 @@ class Message(): detail=None, action_name=None, elapsed=None, - depth=None, logfile=None, sandbox=None, scheduler=False): @@ -68,7 +67,6 @@ class Message(): self.detail = detail # An additional detail string self.action_name = action_name # Name of the task queue (fetch, refresh, build, etc) self.elapsed = elapsed # The elapsed time, in timed messages - self.depth = depth # The depth of a timed message self.logfile = logfile # The log file path where commands took place self.sandbox = sandbox # The error that caused this message used a sandbox self.pid = os.getpid() # The process pid diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py new file mode 100644 index 000000000..7dec93994 --- /dev/null +++ b/src/buildstream/_messenger.py @@ -0,0 +1,285 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Angelos Evripiotis <jevripiotis@bloomberg.net> + +import os +import datetime +from contextlib import contextmanager + +from . import _signals +from . import utils +from ._exceptions import BstError +from ._message import Message, MessageType +from .plugin import Plugin + + +class Messenger(): + + def __init__(self): + self._message_handler = None + self._silence_scope_depth = 0 + self._log_handle = None + self._log_filename = None + + # set_message_handler() + # + # Sets the handler for any status messages propagated through + # the context. + # + # The handler should have the signature: + # + # def handler( + # message: _message.Message, # The message to send. + # is_silenced: bool, # Whether messages are currently being silenced. + # ) -> None + # + def set_message_handler(self, handler): + self._message_handler = handler + + # _silent_messages(): + # + # Returns: + # (bool): Whether messages are currently being silenced + # + def _silent_messages(self): + return self._silence_scope_depth > 0 + + # message(): + # + # Proxies a message back to the caller, this is the central + # point through which all messages pass. + # + # Args: + # message: A Message object + # + def message(self, message): + + # If we are recording messages, dump a copy into the open log file. + self._record_message(message) + + # Send it off to the log handler (can be the frontend, + # or it can be the child task which will propagate + # to the frontend) + assert self._message_handler + + self._message_handler(message, is_silenced=self._silent_messages()) + + # silence() + # + # A context manager to silence messages, this behaves in + # the same way as the `silent_nested` argument of the + # timed_activity() context manager: all but + # _message.unconditional_messages will be silenced. + # + # Args: + # actually_silence (bool): Whether to actually do the silencing, if + # False then this context manager does not + # affect anything. + # + @contextmanager + def silence(self, *, actually_silence=True): + if not actually_silence: + yield + return + + self._silence_scope_depth += 1 + try: + yield + finally: + assert self._silence_scope_depth > 0 + self._silence_scope_depth -= 1 + + # timed_activity() + # + # Context manager for performing timed activities and logging those + # + # Args: + # activity_name (str): The name of the activity + # context (Context): The invocation context object + # unique_id (int): Optionally, the unique id of the plugin related to the message + # detail (str): An optional detailed message, can be multiline output + # silent_nested (bool): If True, all but _message.unconditional_messages are silenced + # + @contextmanager + def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False): + + starttime = datetime.datetime.now() + stopped_time = None + + def stop_time(): + nonlocal stopped_time + stopped_time = datetime.datetime.now() + + def resume_time(): + nonlocal stopped_time + nonlocal starttime + sleep_time = datetime.datetime.now() - stopped_time + starttime += sleep_time + + with _signals.suspendable(stop_time, resume_time): + try: + # Push activity depth for status messages + message = Message(unique_id, MessageType.START, activity_name, detail=detail) + self.message(message) + with self.silence(actually_silence=silent_nested): + yield + + except BstError: + # Note the failure in status messages and reraise, the scheduler + # expects an error when there is an error. + elapsed = datetime.datetime.now() - starttime + message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed) + self.message(message) + raise + + elapsed = datetime.datetime.now() - starttime + message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed) + self.message(message) + + # recorded_messages() + # + # Records all messages in a log file while the context manager + # is active. + # + # In addition to automatically writing all messages to the + # specified logging file, an open file handle for process stdout + # and stderr will be available via the Messenger.get_log_handle() API, + # and the full logfile path will be available via the + # Messenger.get_log_filename() API. + # + # Args: + # filename (str): A logging directory relative filename, + # the pid and .log extension will be automatically + # appended + # + # logdir (str) : The path to the log file directory. + # + # Yields: + # (str): The fully qualified log filename + # + @contextmanager + def recorded_messages(self, filename, logdir): + + # We dont allow recursing in this context manager, and + # we also do not allow it in the main process. + assert self._log_handle is None + assert self._log_filename is None + assert not utils._is_main_process() + + # Create the fully qualified logfile in the log directory, + # appending the pid and .log extension at the end. + self._log_filename = os.path.join(logdir, + '{}.{}.log'.format(filename, os.getpid())) + + # Ensure the directory exists first + directory = os.path.dirname(self._log_filename) + os.makedirs(directory, exist_ok=True) + + with open(self._log_filename, 'a') as logfile: + + # Write one last line to the log and flush it to disk + def flush_log(): + + # If the process currently had something happening in the I/O stack + # then trying to reenter the I/O stack will fire a runtime error. + # + # So just try to flush as well as we can at SIGTERM time + try: + logfile.write('\n\nForcefully terminated\n') + logfile.flush() + except RuntimeError: + os.fsync(logfile.fileno()) + + self._log_handle = logfile + with _signals.terminator(flush_log): + yield self._log_filename + + self._log_handle = None + self._log_filename = None + + # get_log_handle() + # + # Fetches the active log handle, this will return the active + # log file handle when the Messenger.recorded_messages() context + # manager is active + # + # Returns: + # (file): The active logging file handle, or None + # + def get_log_handle(self): + return self._log_handle + + # get_log_filename() + # + # Fetches the active log filename, this will return the active + # log filename when the Messenger.recorded_messages() context + # manager is active + # + # Returns: + # (str): The active logging filename, or None + # + def get_log_filename(self): + return self._log_filename + + # _record_message() + # + # Records the message if recording is enabled + # + # Args: + # message (Message): The message to record + # + def _record_message(self, message): + + if self._log_handle is None: + return + + INDENT = " " + EMPTYTIME = "--:--:--" + template = "[{timecode: <8}] {type: <7}" + + # If this message is associated with a plugin, print what + # we know about the plugin. + plugin_name = "" + if message.unique_id: + template += " {plugin}" + plugin = Plugin._lookup(message.unique_id) + plugin_name = plugin.name + + template += ": {message}" + + detail = '' + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip('\n') + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + text = template.format(timecode=timecode, + plugin=plugin_name, + type=message.message_type.upper(), + message=message.message, + detail=detail) + + # Write to the open log file + self._log_handle.write('{}\n'.format(text)) + self._log_handle.flush() diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index 0758cf5ff..4352df56c 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -123,7 +123,7 @@ class Pipeline(): # targets (list of Element): The list of toplevel element targets # def resolve_elements(self, targets): - with self._context.timed_activity("Resolving cached state", silent_nested=True): + with self._context.messenger.timed_activity("Resolving cached state", silent_nested=True): for element in self.dependencies(targets, Scope.ALL): # Preflight @@ -355,7 +355,7 @@ class Pipeline(): def assert_consistent(self, elements): inconsistent = [] inconsistent_workspaced = [] - with self._context.timed_activity("Checking sources"): + with self._context.messenger.timed_activity("Checking sources"): for element in elements: if element._get_consistency() == Consistency.INCONSISTENT: if element._get_workspace(): @@ -391,7 +391,7 @@ class Pipeline(): # def assert_sources_cached(self, elements): uncached = [] - with self._context.timed_activity("Checking sources"): + with self._context.messenger.timed_activity("Checking sources"): for element in elements: if element._get_consistency() < Consistency.CACHED and \ not element._source_cached(): @@ -466,7 +466,7 @@ class Pipeline(): # def _message(self, message_type, message, **kwargs): args = dict(kwargs) - self._context.message( + self._context.messenger.message( Message(None, message_type, message, **args)) diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py index 114d25054..5f433c090 100644 --- a/src/buildstream/_project.py +++ b/src/buildstream/_project.py @@ -445,10 +445,10 @@ class Project(): # (list): A list of loaded Element # def load_elements(self, targets, *, rewritable=False): - with self._context.timed_activity("Loading elements", silent_nested=True): + with self._context.messenger.timed_activity("Loading elements", silent_nested=True): meta_elements = self.loader.load(targets, rewritable=rewritable, ticker=None) - with self._context.timed_activity("Resolving elements"): + with self._context.messenger.timed_activity("Resolving elements"): elements = [ Element._new_from_meta(meta) for meta in meta_elements @@ -466,7 +466,7 @@ class Project(): for source, ref in redundant_refs ] detail += "\n".join(lines) - self._context.message( + self._context.messenger.message( Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail)) return elements @@ -694,7 +694,7 @@ class Project(): # Deprecation check if fail_on_overlap is not None: - self._context.message( + self._context.messenger.message( Message( None, MessageType.WARN, diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 51531de64..1c2726b5f 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -162,8 +162,9 @@ class Job(): self._parent_start_listening() child_job = self.create_child_job( # pylint: disable=assignment-from-no-return - self._scheduler.context, self.action_name, + self._scheduler.context.messenger, + self._scheduler.context.logdir, self._logfile, self._max_retries, self._tries, @@ -347,7 +348,7 @@ class Job(): if "unique_id" in kwargs: unique_id = kwargs["unique_id"] del kwargs["unique_id"] - self._scheduler.context.message( + self._scheduler.context.messenger.message( Message(unique_id, message_type, message, **kwargs)) ####################################################### @@ -470,7 +471,7 @@ class Job(): if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope.message) + self._scheduler.context.messenger.message(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state @@ -562,11 +563,12 @@ class Job(): class ChildJob(): def __init__( - self, context, action_name, logfile, max_retries, tries, message_unique_id, task_id): + self, action_name, messenger, logdir, logfile, max_retries, tries, message_unique_id, task_id): self.action_name = action_name - self._context = context + self._messenger = messenger + self._logdir = logdir self._logfile = logfile self._max_retries = max_retries self._tries = tries @@ -592,7 +594,7 @@ class ChildJob(): if "unique_id" in kwargs: unique_id = kwargs["unique_id"] del kwargs["unique_id"] - self._context.message( + self._messenger.message( Message(unique_id, message_type, message, **kwargs)) # send_message() @@ -673,7 +675,7 @@ class ChildJob(): # Set the global message handler in this child # process to forward messages to the parent process self._queue = queue - self._context.set_message_handler(self._child_message_handler) + self._messenger.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() stopped_time = None @@ -690,7 +692,7 @@ class ChildJob(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - self._context.recorded_messages(self._logfile) as filename: + self._messenger.recorded_messages(self._logfile, self._logdir) as filename: self.message(MessageType.START, self.action_name, logfile=filename) @@ -828,16 +830,16 @@ class ChildJob(): # the message back to the parent process for further propagation. # # Args: - # message (Message): The message to log - # context (Context): The context object delegating this message + # message (Message): The message to log + # is_silenced (bool) : Whether messages are silenced # - def _child_message_handler(self, message, context): + def _child_message_handler(self, message, is_silenced): message.action_name = self.action_name message.task_id = self._task_id # Send to frontend if appropriate - if context.silent_messages() and (message.message_type not in unconditional_messages): + if is_silenced and (message.message_type not in unconditional_messages): return if message.message_type == MessageType.LOG: diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index f2cefd5d2..49a5381c1 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -332,7 +332,7 @@ class Queue(): def _message(self, element, message_type, brief, **kwargs): context = element._get_context() message = Message(element._unique_id, message_type, brief, **kwargs) - context.message(message) + context.messenger.message(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 195be55ba..c6d748f91 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -617,7 +617,7 @@ class Stream(): # Prune the artifact cache if ref_removed and not no_prune: - with self._context.timed_activity("Pruning artifact cache"): + with self._context.messenger.timed_activity("Pruning artifact cache"): self._artifacts.prune() if not ref_removed: @@ -789,8 +789,8 @@ class Stream(): # Remove workspace directory if prompted if remove_dir: - with self._context.timed_activity("Removing workspace directory {}" - .format(workspace.get_absolute_path())): + with self._context.messenger.timed_activity("Removing workspace directory {}" + .format(workspace.get_absolute_path())): try: shutil.rmtree(workspace.get_absolute_path()) except OSError as e: @@ -1195,7 +1195,7 @@ class Stream(): # def _message(self, message_type, message, **kwargs): args = dict(kwargs) - self._context.message( + self._context.messenger.message( Message(None, message_type, message, **args)) # _add_queue() @@ -1434,7 +1434,7 @@ class Stream(): # Collect the sources in the given sandbox into a tarfile def _collect_sources(self, directory, tar_name, element_name, compression): - with self._context.timed_activity("Creating tarball {}".format(tar_name)): + with self._context.messenger.timed_activity("Creating tarball {}".format(tar_name)): if compression == "none": permissions = "w:" else: diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index a12ff61ec..de969c267 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -657,10 +657,10 @@ class Plugin(): # This will raise SourceError on its own self.call(... command which takes time ...) """ - with self.__context.timed_activity(activity_name, - unique_id=self._unique_id, - detail=detail, - silent_nested=silent_nested): + with self.__context.messenger.timed_activity(activity_name, + unique_id=self._unique_id, + detail=detail, + silent_nested=silent_nested): yield def call(self, *popenargs, fail=None, fail_temporarily=False, **kwargs): @@ -798,7 +798,7 @@ class Plugin(): # @contextmanager def _output_file(self): - log = self.__context.get_log_handle() + log = self.__context.messenger.get_log_handle() if log is None: with open(os.devnull, "w") as output: yield output @@ -870,7 +870,7 @@ class Plugin(): def __message(self, message_type, brief, **kwargs): message = Message(self._unique_id, message_type, brief, **kwargs) - self.__context.message(message) + self.__context.messenger.message(message) def __note_command(self, output, *popenargs, **kwargs): workdir = kwargs.get('cwd', os.getcwd()) diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index d90b164bc..075a69a2b 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -106,7 +106,7 @@ class SandboxRemote(Sandbox): self.operation_name = None def info(self, msg): - self._get_context().message(Message(None, MessageType.INFO, msg)) + self._get_context().messenger.message(Message(None, MessageType.INFO, msg)) @staticmethod def specs_from_config_node(config_node, basedir=None): @@ -226,8 +226,8 @@ class SandboxRemote(Sandbox): # Set up signal handler to trigger cancel_operation on SIGTERM operation = None - with self._get_context().timed_activity("Waiting for the remote build to complete"), \ - _signals.terminator(partial(self.cancel_operation, channel)): + with self._get_context().messenger.timed_activity("Waiting for the remote build to complete"), \ + _signals.terminator(partial(self.cancel_operation, channel)): operation = __run_remote_command(stub, execute_request=request) if operation is None: return None diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py index c96ccb57b..a651fb783 100644 --- a/src/buildstream/sandbox/sandbox.py +++ b/src/buildstream/sandbox/sandbox.py @@ -628,7 +628,7 @@ class _SandboxBatch(): def execute_group(self, group): if group.label: context = self.sandbox._get_context() - cm = context.timed_activity(group.label, unique_id=self.sandbox._get_plugin_id()) + cm = context.messenger.timed_activity(group.label, unique_id=self.sandbox._get_plugin_id()) else: cm = contextlib.suppress() @@ -640,7 +640,7 @@ class _SandboxBatch(): context = self.sandbox._get_context() message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS, 'Running command', detail=command.label) - context.message(message) + context.messenger.message(message) exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env) if exitcode != 0: diff --git a/src/buildstream/source.py b/src/buildstream/source.py index 9fc9cf17d..b5c8f9a63 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -698,7 +698,7 @@ class Source(Plugin): # Source consistency interrogations are silent. context = self._get_context() - with context.silence(): + with context.messenger.silence(): self.__consistency = self.get_consistency() # pylint: disable=assignment-from-no-return # Give the Source an opportunity to validate the cached @@ -1150,7 +1150,7 @@ class Source(Plugin): # Silence the STATUS messages which might happen as a result # of checking the source fetchers. - with context.silence(): + with context.messenger.silence(): source_fetchers = self.get_source_fetchers() # Use the source fetchers if they are provided @@ -1165,7 +1165,7 @@ class Source(Plugin): while True: - with context.silence(): + with context.messenger.silence(): try: fetcher = next(source_fetchers) except StopIteration: diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index 0b57a6c9a..a4ea74633 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -24,7 +24,7 @@ DATA_DIR = os.path.join( # Handle messages from the pipeline -def message_handler(message, context): +def message_handler(message, is_silenced): pass @@ -95,7 +95,7 @@ def test_pull(cli, tmpdir, datafiles): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project project = Project(project_dir, context) @@ -136,7 +136,7 @@ def _test_pull(user_config_file, project_dir, cache_dir, context.cachedir = cache_dir context.casdir = os.path.join(cache_dir, 'cas') context.tmpdir = os.path.join(cache_dir, 'tmp') - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project manually project = Project(project_dir, context) @@ -198,7 +198,7 @@ def test_pull_tree(cli, tmpdir, datafiles): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project and CAS cache project = Project(project_dir, context) @@ -273,7 +273,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project manually project = Project(project_dir, context) @@ -308,7 +308,7 @@ def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project manually project = Project(project_dir, context) diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index 862393f35..a54c1df09 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -23,7 +23,7 @@ DATA_DIR = os.path.join( # Handle messages from the pipeline -def message_handler(message, context): +def message_handler(message, is_silenced): pass @@ -71,7 +71,7 @@ def test_push(cli, tmpdir, datafiles): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project manually project = Project(project_dir, context) @@ -108,7 +108,7 @@ def _test_push(user_config_file, project_dir, element_name, queue): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project manually project = Project(project_dir, context) @@ -194,7 +194,7 @@ def _test_push_message(user_config_file, project_dir, queue): # Fake minimal context context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) # Load the project manually project = Project(project_dir, context) diff --git a/tests/internals/loader.py b/tests/internals/loader.py index 006b5787f..a4ebdb9ac 100644 --- a/tests/internals/loader.py +++ b/tests/internals/loader.py @@ -13,14 +13,14 @@ DATA_DIR = os.path.join( ) -def dummy_handler(message, context): +def dummy_handler(message, is_silenced): pass def make_loader(basedir): context = Context() context.load(config=os.devnull) - context.set_message_handler(dummy_handler) + context.messenger.set_message_handler(dummy_handler) project = Project(basedir, context) return project.loader diff --git a/tests/internals/pluginloading.py b/tests/internals/pluginloading.py index 2d997077a..9093680f4 100644 --- a/tests/internals/pluginloading.py +++ b/tests/internals/pluginloading.py @@ -19,10 +19,10 @@ def create_pipeline(tmpdir, basedir, target): context.casdir = os.path.join(str(tmpdir), 'cas') project = Project(basedir, context) - def dummy_handler(message, context): + def dummy_handler(message, is_silenced): pass - context.set_message_handler(dummy_handler) + context.messenger.set_message_handler(dummy_handler) pipeline = Pipeline(context, project, None) targets, = pipeline.load([(target,)]) diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py index cd8a3e989..3fc9d96a6 100644 --- a/tests/sourcecache/fetch.py +++ b/tests/sourcecache/fetch.py @@ -34,7 +34,7 @@ from tests.testutils import create_artifact_share DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project") -def message_handler(message, context): +def message_handler(message, is_silenced): pass @@ -71,7 +71,7 @@ def test_source_fetch(cli, tmpdir, datafiles): context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) project = Project(project_dir, context) project.ensure_fully_loaded() @@ -146,7 +146,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles): context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) project = Project(project_dir, context) project.ensure_fully_loaded() @@ -204,7 +204,7 @@ def test_pull_fail(cli, tmpdir, datafiles): # get the source object context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) project = Project(project_dir, context) project.ensure_fully_loaded() diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py index 4c652d21e..6282b6e60 100644 --- a/tests/sourcecache/push.py +++ b/tests/sourcecache/push.py @@ -35,7 +35,7 @@ from tests.testutils import create_artifact_share DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project") -def message_handler(message, context): +def message_handler(message, is_silenced): pass @@ -72,7 +72,7 @@ def test_source_push(cli, tmpdir, datafiles): # get the source object context = Context() context.load(config=user_config_file) - context.set_message_handler(message_handler) + context.messenger.set_message_handler(message_handler) project = Project(project_dir, context) project.ensure_fully_loaded() diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py index 318285292..9dc431bda 100644 --- a/tests/sourcecache/staging.py +++ b/tests/sourcecache/staging.py @@ -35,7 +35,7 @@ from tests.testutils.element_generators import create_element_size DATA_DIR = os.path.dirname(os.path.realpath(__file__)) -def dummy_message_handler(message, context): +def dummy_message_handler(message, is_silenced): pass @@ -63,7 +63,7 @@ def test_source_staged(tmpdir, cli, datafiles): project = Project(project_dir, context) project.ensure_fully_loaded() context.cachedir = cachedir - context.set_message_handler(dummy_message_handler) + context.messenger.set_message_handler(dummy_message_handler) sourcecache = context.sourcecache cas = context.get_cascache() @@ -106,7 +106,7 @@ def test_source_fetch(tmpdir, cli, datafiles): project = Project(project_dir, context) project.ensure_fully_loaded() context.cachedir = cachedir - context.set_message_handler(dummy_message_handler) + context.messenger.set_message_handler(dummy_message_handler) cas = context.get_cascache() res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"]) @@ -148,7 +148,7 @@ def test_staged_source_build(tmpdir, datafiles, cli): project = Project(project_dir, context) project.ensure_fully_loaded() context.cachedir = cachedir - context.set_message_handler(dummy_message_handler) + context.messenger.set_message_handler(dummy_message_handler) element = project.load_elements(["import-dev.bst"])[0] |