summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-07-05 10:20:14 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-05 10:20:14 +0000
commit34763b3736029edc2ceeb8f86389b8e89441e266 (patch)
tree9a670bec10ae0cce46c6243ee5fae2a725ed0ba6
parent40a2ebe05bf2d4a922ac11f453c79a3d74b196ca (diff)
parent68e5e5c1b7d4434a2310044a8e636ed3f23ff3cb (diff)
downloadbuildstream-34763b3736029edc2ceeb8f86389b8e89441e266.tar.gz
Merge branch 'aevri/messenger' into 'master'
Extract 'Messenger' from 'Context' and use directly See merge request BuildStream/buildstream!1445
-rw-r--r--src/buildstream/_artifact.py2
-rw-r--r--src/buildstream/_basecache.py4
-rw-r--r--src/buildstream/_cas/cascache.py2
-rw-r--r--src/buildstream/_context.py266
-rw-r--r--src/buildstream/_frontend/app.py10
-rw-r--r--src/buildstream/_loader/loader.py2
-rw-r--r--src/buildstream/_message.py2
-rw-r--r--src/buildstream/_messenger.py285
-rw-r--r--src/buildstream/_pipeline.py8
-rw-r--r--src/buildstream/_project.py8
-rw-r--r--src/buildstream/_scheduler/jobs/job.py26
-rw-r--r--src/buildstream/_scheduler/queues/queue.py2
-rw-r--r--src/buildstream/_stream.py10
-rw-r--r--src/buildstream/plugin.py12
-rw-r--r--src/buildstream/sandbox/_sandboxremote.py6
-rw-r--r--src/buildstream/sandbox/sandbox.py4
-rw-r--r--src/buildstream/source.py6
-rw-r--r--tests/artifactcache/pull.py12
-rw-r--r--tests/artifactcache/push.py8
-rw-r--r--tests/internals/loader.py4
-rw-r--r--tests/internals/pluginloading.py4
-rw-r--r--tests/sourcecache/fetch.py8
-rw-r--r--tests/sourcecache/push.py4
-rw-r--r--tests/sourcecache/staging.py8
24 files changed, 365 insertions, 338 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index ec80e6417..02adb3404 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -162,7 +162,7 @@ class Artifact():
new_build.was_workspaced = bool(e._get_workspace())
# Store log file
- log_filename = context.get_log_filename()
+ log_filename = context.messenger.get_log_filename()
if log_filename:
digest = self._cas.add_object(path=log_filename)
element._build_log_path = self._cas.objpath(digest)
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 49930d4ca..a29973158 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -244,7 +244,7 @@ class BaseCache():
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self.context.message(
+ self.context.messenger.message(
Message(None, message_type, message, **args))
# _set_remotes():
@@ -272,7 +272,7 @@ class BaseCache():
def remote_failed(url, error):
self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
- with self.context.timed_activity("Initializing remote caches", silent_nested=True):
+ with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
# _list_refs_mtimes()
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 53f6029db..771e31208 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -1052,7 +1052,7 @@ class CASQuota:
self._cache_lower_threshold = None # The target cache size for a cleanup
self.available_space = None
- self._message = context.message
+ self._message = context.messenger.message
self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method
self._list_refs_callbacks = [] # Callbacks to all refs
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c97acc70e..52e4c3db9 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -19,22 +19,17 @@
import os
import shutil
-import datetime
-from collections import deque
-from contextlib import contextmanager
from . import utils
from . import _cachekey
-from . import _signals
from . import _site
from . import _yaml
-from ._exceptions import LoadError, LoadErrorReason, BstError
-from ._message import Message, MessageType
+from ._exceptions import LoadError, LoadErrorReason
+from ._messenger import Messenger
from ._profile import Topics, PROFILER
from ._artifactcache import ArtifactCache
from ._sourcecache import SourceCache
from ._cas import CASCache, CASQuota, CASCacheUsage
from ._workspaces import Workspaces, WorkspaceProjectCache
-from .plugin import Plugin
from .sandbox import SandboxRemote
@@ -149,18 +144,16 @@ class Context():
# Make sure the XDG vars are set in the environment before loading anything
self._init_xdg()
+ self.messenger = Messenger()
+
# Private variables
self._cache_key = None
- self._message_handler = None
- self._message_depth = deque()
self._artifactcache = None
self._sourcecache = None
self._projects = []
self._project_overrides = _yaml.new_empty_node()
self._workspaces = None
self._workspace_project_cache = WorkspaceProjectCache()
- self._log_handle = None
- self._log_filename = None
self._cascache = None
self._casquota = None
@@ -440,197 +433,6 @@ class Context():
return self._cache_key
- # set_message_handler()
- #
- # Sets the handler for any status messages propagated through
- # the context.
- #
- # The message handler should have the same signature as
- # the message() method
- def set_message_handler(self, handler):
- self._message_handler = handler
-
- # silent_messages():
- #
- # Returns:
- # (bool): Whether messages are currently being silenced
- #
- def silent_messages(self):
- for silent in self._message_depth:
- if silent:
- return True
- return False
-
- # message():
- #
- # Proxies a message back to the caller, this is the central
- # point through which all messages pass.
- #
- # Args:
- # message: A Message object
- #
- def message(self, message):
-
- # Tag message only once
- if message.depth is None:
- message.depth = len(list(self._message_depth))
-
- # If we are recording messages, dump a copy into the open log file.
- self._record_message(message)
-
- # Send it off to the log handler (can be the frontend,
- # or it can be the child task which will propagate
- # to the frontend)
- assert self._message_handler
-
- self._message_handler(message, context=self)
-
- # silence()
- #
- # A context manager to silence messages, this behaves in
- # the same way as the `silent_nested` argument of the
- # Context._timed_activity() context manager: especially
- # important messages will not be silenced.
- #
- @contextmanager
- def silence(self):
- self._push_message_depth(True)
- try:
- yield
- finally:
- self._pop_message_depth()
-
- # timed_activity()
- #
- # Context manager for performing timed activities and logging those
- #
- # Args:
- # context (Context): The invocation context object
- # activity_name (str): The name of the activity
- # detail (str): An optional detailed message, can be multiline output
- # silent_nested (bool): If specified, nested messages will be silenced
- #
- @contextmanager
- def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
-
- starttime = datetime.datetime.now()
- stopped_time = None
-
- def stop_time():
- nonlocal stopped_time
- stopped_time = datetime.datetime.now()
-
- def resume_time():
- nonlocal stopped_time
- nonlocal starttime
- sleep_time = datetime.datetime.now() - stopped_time
- starttime += sleep_time
-
- with _signals.suspendable(stop_time, resume_time):
- try:
- # Push activity depth for status messages
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
- self.message(message)
- self._push_message_depth(silent_nested)
- yield
-
- except BstError:
- # Note the failure in status messages and reraise, the scheduler
- # expects an error when there is an error.
- elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
- self._pop_message_depth()
- self.message(message)
- raise
-
- elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
- self._pop_message_depth()
- self.message(message)
-
- # recorded_messages()
- #
- # Records all messages in a log file while the context manager
- # is active.
- #
- # In addition to automatically writing all messages to the
- # specified logging file, an open file handle for process stdout
- # and stderr will be available via the Context.get_log_handle() API,
- # and the full logfile path will be available via the
- # Context.get_log_filename() API.
- #
- # Args:
- # filename (str): A logging directory relative filename,
- # the pid and .log extension will be automatically
- # appended
- #
- # Yields:
- # (str): The fully qualified log filename
- #
- @contextmanager
- def recorded_messages(self, filename):
-
- # We dont allow recursing in this context manager, and
- # we also do not allow it in the main process.
- assert self._log_handle is None
- assert self._log_filename is None
- assert not utils._is_main_process()
-
- # Create the fully qualified logfile in the log directory,
- # appending the pid and .log extension at the end.
- self._log_filename = os.path.join(self.logdir,
- '{}.{}.log'.format(filename, os.getpid()))
-
- # Ensure the directory exists first
- directory = os.path.dirname(self._log_filename)
- os.makedirs(directory, exist_ok=True)
-
- with open(self._log_filename, 'a') as logfile:
-
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- logfile.write('\n\nForcefully terminated\n')
- logfile.flush()
- except RuntimeError:
- os.fsync(logfile.fileno())
-
- self._log_handle = logfile
- with _signals.terminator(flush_log):
- yield self._log_filename
-
- self._log_handle = None
- self._log_filename = None
-
- # get_log_handle()
- #
- # Fetches the active log handle, this will return the active
- # log file handle when the Context.recorded_messages() context
- # manager is active
- #
- # Returns:
- # (file): The active logging file handle, or None
- #
- def get_log_handle(self):
- return self._log_handle
-
- # get_log_filename()
- #
- # Fetches the active log filename, this will return the active
- # log filename when the Context.recorded_messages() context
- # manager is active
- #
- # Returns:
- # (str): The active logging filename, or None
- #
- def get_log_filename(self):
- return self._log_filename
-
# set_artifact_directories_optional()
#
# This indicates that the current context (command or configuration)
@@ -650,66 +452,6 @@ class Context():
def set_artifact_files_optional(self):
self.require_artifact_files = False
- # _record_message()
- #
- # Records the message if recording is enabled
- #
- # Args:
- # message (Message): The message to record
- #
- def _record_message(self, message):
-
- if self._log_handle is None:
- return
-
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7}"
-
- # If this message is associated with a plugin, print what
- # we know about the plugin.
- plugin_name = ""
- if message.unique_id:
- template += " {plugin}"
- plugin = Plugin._lookup(message.unique_id)
- plugin_name = plugin.name
-
- template += ": {message}"
-
- detail = ''
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- text = template.format(timecode=timecode,
- plugin=plugin_name,
- type=message.message_type.upper(),
- message=message.message,
- detail=detail)
-
- # Write to the open log file
- self._log_handle.write('{}\n'.format(text))
- self._log_handle.flush()
-
- # _push_message_depth() / _pop_message_depth()
- #
- # For status messages, send the depth of timed
- # activities inside a given task through the message
- #
- def _push_message_depth(self, silent_nested):
- self._message_depth.appendleft(silent_nested)
-
- def _pop_message_depth(self):
- assert self._message_depth
- self._message_depth.popleft()
-
# Force the resolved XDG variables into the environment,
# this is so that they can be used directly to specify
# preferred locations of things from user configuration
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index dafde153c..9550fea40 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -213,7 +213,7 @@ class App():
indent=INDENT)
# Propagate pipeline feedback to the user
- self.context.set_message_handler(self._message_handler)
+ self.context.messenger.set_message_handler(self._message_handler)
# Preflight the artifact cache after initializing logging,
# this can cause messages to be emitted.
@@ -459,7 +459,7 @@ class App():
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self.context.message(
+ self.context.messenger.message(
Message(None, message_type, message, **args))
# Exception handler
@@ -695,11 +695,11 @@ class App():
#
# Handle messages from the pipeline
#
- def _message_handler(self, message, context):
+ def _message_handler(self, message, is_silenced):
# Drop status messages from the UI if not verbose, we'll still see
# info messages and status messages will still go to the log files.
- if not context.log_verbose and message.message_type == MessageType.STATUS:
+ if not self.context.log_verbose and message.message_type == MessageType.STATUS:
return
# Hold on to the failure messages
@@ -707,7 +707,7 @@ class App():
self._fail_messages[message.unique_id] = message
# Send to frontend if appropriate
- if self.context.silent_messages() and (message.message_type not in unconditional_messages):
+ if is_silenced and (message.message_type not in unconditional_messages):
return
if self._status:
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 5f98b127c..b221c48d0 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -721,7 +721,7 @@ class Loader():
raise LoadError(warning_token, brief)
message = Message(None, MessageType.WARN, brief)
- self._context.message(message)
+ self._context.messenger.message(message)
# Print warning messages if any of the specified elements have invalid names.
#
diff --git a/src/buildstream/_message.py b/src/buildstream/_message.py
index c2cdb8277..7f1a939d2 100644
--- a/src/buildstream/_message.py
+++ b/src/buildstream/_message.py
@@ -59,7 +59,6 @@ class Message():
detail=None,
action_name=None,
elapsed=None,
- depth=None,
logfile=None,
sandbox=None,
scheduler=False):
@@ -68,7 +67,6 @@ class Message():
self.detail = detail # An additional detail string
self.action_name = action_name # Name of the task queue (fetch, refresh, build, etc)
self.elapsed = elapsed # The elapsed time, in timed messages
- self.depth = depth # The depth of a timed message
self.logfile = logfile # The log file path where commands took place
self.sandbox = sandbox # The error that caused this message used a sandbox
self.pid = os.getpid() # The process pid
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
new file mode 100644
index 000000000..7dec93994
--- /dev/null
+++ b/src/buildstream/_messenger.py
@@ -0,0 +1,285 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Angelos Evripiotis <jevripiotis@bloomberg.net>
+
+import os
+import datetime
+from contextlib import contextmanager
+
+from . import _signals
+from . import utils
+from ._exceptions import BstError
+from ._message import Message, MessageType
+from .plugin import Plugin
+
+
+class Messenger():
+
+ def __init__(self):
+ self._message_handler = None
+ self._silence_scope_depth = 0
+ self._log_handle = None
+ self._log_filename = None
+
+ # set_message_handler()
+ #
+ # Sets the handler for any status messages propagated through
+ # the context.
+ #
+ # The handler should have the signature:
+ #
+ # def handler(
+ # message: _message.Message, # The message to send.
+ # is_silenced: bool, # Whether messages are currently being silenced.
+ # ) -> None
+ #
+ def set_message_handler(self, handler):
+ self._message_handler = handler
+
+ # _silent_messages():
+ #
+ # Returns:
+ # (bool): Whether messages are currently being silenced
+ #
+ def _silent_messages(self):
+ return self._silence_scope_depth > 0
+
+ # message():
+ #
+ # Proxies a message back to the caller, this is the central
+ # point through which all messages pass.
+ #
+ # Args:
+ # message: A Message object
+ #
+ def message(self, message):
+
+ # If we are recording messages, dump a copy into the open log file.
+ self._record_message(message)
+
+ # Send it off to the log handler (can be the frontend,
+ # or it can be the child task which will propagate
+ # to the frontend)
+ assert self._message_handler
+
+ self._message_handler(message, is_silenced=self._silent_messages())
+
+ # silence()
+ #
+ # A context manager to silence messages, this behaves in
+ # the same way as the `silent_nested` argument of the
+ # timed_activity() context manager: all but
+ # _message.unconditional_messages will be silenced.
+ #
+ # Args:
+ # actually_silence (bool): Whether to actually do the silencing, if
+ # False then this context manager does not
+ # affect anything.
+ #
+ @contextmanager
+ def silence(self, *, actually_silence=True):
+ if not actually_silence:
+ yield
+ return
+
+ self._silence_scope_depth += 1
+ try:
+ yield
+ finally:
+ assert self._silence_scope_depth > 0
+ self._silence_scope_depth -= 1
+
+ # timed_activity()
+ #
+ # Context manager for performing timed activities and logging those
+ #
+ # Args:
+ # activity_name (str): The name of the activity
+ # context (Context): The invocation context object
+ # unique_id (int): Optionally, the unique id of the plugin related to the message
+ # detail (str): An optional detailed message, can be multiline output
+ # silent_nested (bool): If True, all but _message.unconditional_messages are silenced
+ #
+ @contextmanager
+ def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
+
+ starttime = datetime.datetime.now()
+ stopped_time = None
+
+ def stop_time():
+ nonlocal stopped_time
+ stopped_time = datetime.datetime.now()
+
+ def resume_time():
+ nonlocal stopped_time
+ nonlocal starttime
+ sleep_time = datetime.datetime.now() - stopped_time
+ starttime += sleep_time
+
+ with _signals.suspendable(stop_time, resume_time):
+ try:
+ # Push activity depth for status messages
+ message = Message(unique_id, MessageType.START, activity_name, detail=detail)
+ self.message(message)
+ with self.silence(actually_silence=silent_nested):
+ yield
+
+ except BstError:
+ # Note the failure in status messages and reraise, the scheduler
+ # expects an error when there is an error.
+ elapsed = datetime.datetime.now() - starttime
+ message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+ self.message(message)
+ raise
+
+ elapsed = datetime.datetime.now() - starttime
+ message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
+ self.message(message)
+
+ # recorded_messages()
+ #
+ # Records all messages in a log file while the context manager
+ # is active.
+ #
+ # In addition to automatically writing all messages to the
+ # specified logging file, an open file handle for process stdout
+ # and stderr will be available via the Messenger.get_log_handle() API,
+ # and the full logfile path will be available via the
+ # Messenger.get_log_filename() API.
+ #
+ # Args:
+ # filename (str): A logging directory relative filename,
+ # the pid and .log extension will be automatically
+ # appended
+ #
+ # logdir (str) : The path to the log file directory.
+ #
+ # Yields:
+ # (str): The fully qualified log filename
+ #
+ @contextmanager
+ def recorded_messages(self, filename, logdir):
+
+ # We dont allow recursing in this context manager, and
+ # we also do not allow it in the main process.
+ assert self._log_handle is None
+ assert self._log_filename is None
+ assert not utils._is_main_process()
+
+ # Create the fully qualified logfile in the log directory,
+ # appending the pid and .log extension at the end.
+ self._log_filename = os.path.join(logdir,
+ '{}.{}.log'.format(filename, os.getpid()))
+
+ # Ensure the directory exists first
+ directory = os.path.dirname(self._log_filename)
+ os.makedirs(directory, exist_ok=True)
+
+ with open(self._log_filename, 'a') as logfile:
+
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently had something happening in the I/O stack
+ # then trying to reenter the I/O stack will fire a runtime error.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ logfile.write('\n\nForcefully terminated\n')
+ logfile.flush()
+ except RuntimeError:
+ os.fsync(logfile.fileno())
+
+ self._log_handle = logfile
+ with _signals.terminator(flush_log):
+ yield self._log_filename
+
+ self._log_handle = None
+ self._log_filename = None
+
+ # get_log_handle()
+ #
+ # Fetches the active log handle, this will return the active
+ # log file handle when the Messenger.recorded_messages() context
+ # manager is active
+ #
+ # Returns:
+ # (file): The active logging file handle, or None
+ #
+ def get_log_handle(self):
+ return self._log_handle
+
+ # get_log_filename()
+ #
+ # Fetches the active log filename, this will return the active
+ # log filename when the Messenger.recorded_messages() context
+ # manager is active
+ #
+ # Returns:
+ # (str): The active logging filename, or None
+ #
+ def get_log_filename(self):
+ return self._log_filename
+
+ # _record_message()
+ #
+ # Records the message if recording is enabled
+ #
+ # Args:
+ # message (Message): The message to record
+ #
+ def _record_message(self, message):
+
+ if self._log_handle is None:
+ return
+
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7}"
+
+ # If this message is associated with a plugin, print what
+ # we know about the plugin.
+ plugin_name = ""
+ if message.unique_id:
+ template += " {plugin}"
+ plugin = Plugin._lookup(message.unique_id)
+ plugin_name = plugin.name
+
+ template += ": {message}"
+
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ text = template.format(timecode=timecode,
+ plugin=plugin_name,
+ type=message.message_type.upper(),
+ message=message.message,
+ detail=detail)
+
+ # Write to the open log file
+ self._log_handle.write('{}\n'.format(text))
+ self._log_handle.flush()
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index 0758cf5ff..4352df56c 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -123,7 +123,7 @@ class Pipeline():
# targets (list of Element): The list of toplevel element targets
#
def resolve_elements(self, targets):
- with self._context.timed_activity("Resolving cached state", silent_nested=True):
+ with self._context.messenger.timed_activity("Resolving cached state", silent_nested=True):
for element in self.dependencies(targets, Scope.ALL):
# Preflight
@@ -355,7 +355,7 @@ class Pipeline():
def assert_consistent(self, elements):
inconsistent = []
inconsistent_workspaced = []
- with self._context.timed_activity("Checking sources"):
+ with self._context.messenger.timed_activity("Checking sources"):
for element in elements:
if element._get_consistency() == Consistency.INCONSISTENT:
if element._get_workspace():
@@ -391,7 +391,7 @@ class Pipeline():
#
def assert_sources_cached(self, elements):
uncached = []
- with self._context.timed_activity("Checking sources"):
+ with self._context.messenger.timed_activity("Checking sources"):
for element in elements:
if element._get_consistency() < Consistency.CACHED and \
not element._source_cached():
@@ -466,7 +466,7 @@ class Pipeline():
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self._context.message(
+ self._context.messenger.message(
Message(None, message_type, message, **args))
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 114d25054..5f433c090 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -445,10 +445,10 @@ class Project():
# (list): A list of loaded Element
#
def load_elements(self, targets, *, rewritable=False):
- with self._context.timed_activity("Loading elements", silent_nested=True):
+ with self._context.messenger.timed_activity("Loading elements", silent_nested=True):
meta_elements = self.loader.load(targets, rewritable=rewritable, ticker=None)
- with self._context.timed_activity("Resolving elements"):
+ with self._context.messenger.timed_activity("Resolving elements"):
elements = [
Element._new_from_meta(meta)
for meta in meta_elements
@@ -466,7 +466,7 @@ class Project():
for source, ref in redundant_refs
]
detail += "\n".join(lines)
- self._context.message(
+ self._context.messenger.message(
Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
return elements
@@ -694,7 +694,7 @@ class Project():
# Deprecation check
if fail_on_overlap is not None:
- self._context.message(
+ self._context.messenger.message(
Message(
None,
MessageType.WARN,
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 51531de64..1c2726b5f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -162,8 +162,9 @@ class Job():
self._parent_start_listening()
child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
- self._scheduler.context,
self.action_name,
+ self._scheduler.context.messenger,
+ self._scheduler.context.logdir,
self._logfile,
self._max_retries,
self._tries,
@@ -347,7 +348,7 @@ class Job():
if "unique_id" in kwargs:
unique_id = kwargs["unique_id"]
del kwargs["unique_id"]
- self._scheduler.context.message(
+ self._scheduler.context.messenger.message(
Message(unique_id, message_type, message, **kwargs))
#######################################################
@@ -470,7 +471,7 @@ class Job():
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope.message)
+ self._scheduler.context.messenger.message(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
@@ -562,11 +563,12 @@ class Job():
class ChildJob():
def __init__(
- self, context, action_name, logfile, max_retries, tries, message_unique_id, task_id):
+ self, action_name, messenger, logdir, logfile, max_retries, tries, message_unique_id, task_id):
self.action_name = action_name
- self._context = context
+ self._messenger = messenger
+ self._logdir = logdir
self._logfile = logfile
self._max_retries = max_retries
self._tries = tries
@@ -592,7 +594,7 @@ class ChildJob():
if "unique_id" in kwargs:
unique_id = kwargs["unique_id"]
del kwargs["unique_id"]
- self._context.message(
+ self._messenger.message(
Message(unique_id, message_type, message, **kwargs))
# send_message()
@@ -673,7 +675,7 @@ class ChildJob():
# Set the global message handler in this child
# process to forward messages to the parent process
self._queue = queue
- self._context.set_message_handler(self._child_message_handler)
+ self._messenger.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
stopped_time = None
@@ -690,7 +692,7 @@ class ChildJob():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- self._context.recorded_messages(self._logfile) as filename:
+ self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
@@ -828,16 +830,16 @@ class ChildJob():
# the message back to the parent process for further propagation.
#
# Args:
- # message (Message): The message to log
- # context (Context): The context object delegating this message
+ # message (Message): The message to log
+ # is_silenced (bool) : Whether messages are silenced
#
- def _child_message_handler(self, message, context):
+ def _child_message_handler(self, message, is_silenced):
message.action_name = self.action_name
message.task_id = self._task_id
# Send to frontend if appropriate
- if context.silent_messages() and (message.message_type not in unconditional_messages):
+ if is_silenced and (message.message_type not in unconditional_messages):
return
if message.message_type == MessageType.LOG:
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index f2cefd5d2..49a5381c1 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -332,7 +332,7 @@ class Queue():
def _message(self, element, message_type, brief, **kwargs):
context = element._get_context()
message = Message(element._unique_id, message_type, brief, **kwargs)
- context.message(message)
+ context.messenger.message(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 195be55ba..c6d748f91 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -617,7 +617,7 @@ class Stream():
# Prune the artifact cache
if ref_removed and not no_prune:
- with self._context.timed_activity("Pruning artifact cache"):
+ with self._context.messenger.timed_activity("Pruning artifact cache"):
self._artifacts.prune()
if not ref_removed:
@@ -789,8 +789,8 @@ class Stream():
# Remove workspace directory if prompted
if remove_dir:
- with self._context.timed_activity("Removing workspace directory {}"
- .format(workspace.get_absolute_path())):
+ with self._context.messenger.timed_activity("Removing workspace directory {}"
+ .format(workspace.get_absolute_path())):
try:
shutil.rmtree(workspace.get_absolute_path())
except OSError as e:
@@ -1195,7 +1195,7 @@ class Stream():
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self._context.message(
+ self._context.messenger.message(
Message(None, message_type, message, **args))
# _add_queue()
@@ -1434,7 +1434,7 @@ class Stream():
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
- with self._context.timed_activity("Creating tarball {}".format(tar_name)):
+ with self._context.messenger.timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index a12ff61ec..de969c267 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -657,10 +657,10 @@ class Plugin():
# This will raise SourceError on its own
self.call(... command which takes time ...)
"""
- with self.__context.timed_activity(activity_name,
- unique_id=self._unique_id,
- detail=detail,
- silent_nested=silent_nested):
+ with self.__context.messenger.timed_activity(activity_name,
+ unique_id=self._unique_id,
+ detail=detail,
+ silent_nested=silent_nested):
yield
def call(self, *popenargs, fail=None, fail_temporarily=False, **kwargs):
@@ -798,7 +798,7 @@ class Plugin():
#
@contextmanager
def _output_file(self):
- log = self.__context.get_log_handle()
+ log = self.__context.messenger.get_log_handle()
if log is None:
with open(os.devnull, "w") as output:
yield output
@@ -870,7 +870,7 @@ class Plugin():
def __message(self, message_type, brief, **kwargs):
message = Message(self._unique_id, message_type, brief, **kwargs)
- self.__context.message(message)
+ self.__context.messenger.message(message)
def __note_command(self, output, *popenargs, **kwargs):
workdir = kwargs.get('cwd', os.getcwd())
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index d90b164bc..075a69a2b 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -106,7 +106,7 @@ class SandboxRemote(Sandbox):
self.operation_name = None
def info(self, msg):
- self._get_context().message(Message(None, MessageType.INFO, msg))
+ self._get_context().messenger.message(Message(None, MessageType.INFO, msg))
@staticmethod
def specs_from_config_node(config_node, basedir=None):
@@ -226,8 +226,8 @@ class SandboxRemote(Sandbox):
# Set up signal handler to trigger cancel_operation on SIGTERM
operation = None
- with self._get_context().timed_activity("Waiting for the remote build to complete"), \
- _signals.terminator(partial(self.cancel_operation, channel)):
+ with self._get_context().messenger.timed_activity("Waiting for the remote build to complete"), \
+ _signals.terminator(partial(self.cancel_operation, channel)):
operation = __run_remote_command(stub, execute_request=request)
if operation is None:
return None
diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py
index c96ccb57b..a651fb783 100644
--- a/src/buildstream/sandbox/sandbox.py
+++ b/src/buildstream/sandbox/sandbox.py
@@ -628,7 +628,7 @@ class _SandboxBatch():
def execute_group(self, group):
if group.label:
context = self.sandbox._get_context()
- cm = context.timed_activity(group.label, unique_id=self.sandbox._get_plugin_id())
+ cm = context.messenger.timed_activity(group.label, unique_id=self.sandbox._get_plugin_id())
else:
cm = contextlib.suppress()
@@ -640,7 +640,7 @@ class _SandboxBatch():
context = self.sandbox._get_context()
message = Message(self.sandbox._get_plugin_id(), MessageType.STATUS,
'Running command', detail=command.label)
- context.message(message)
+ context.messenger.message(message)
exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
if exitcode != 0:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index 9fc9cf17d..b5c8f9a63 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -698,7 +698,7 @@ class Source(Plugin):
# Source consistency interrogations are silent.
context = self._get_context()
- with context.silence():
+ with context.messenger.silence():
self.__consistency = self.get_consistency() # pylint: disable=assignment-from-no-return
# Give the Source an opportunity to validate the cached
@@ -1150,7 +1150,7 @@ class Source(Plugin):
# Silence the STATUS messages which might happen as a result
# of checking the source fetchers.
- with context.silence():
+ with context.messenger.silence():
source_fetchers = self.get_source_fetchers()
# Use the source fetchers if they are provided
@@ -1165,7 +1165,7 @@ class Source(Plugin):
while True:
- with context.silence():
+ with context.messenger.silence():
try:
fetcher = next(source_fetchers)
except StopIteration:
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 0b57a6c9a..a4ea74633 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -24,7 +24,7 @@ DATA_DIR = os.path.join(
# Handle messages from the pipeline
-def message_handler(message, context):
+def message_handler(message, is_silenced):
pass
@@ -95,7 +95,7 @@ def test_pull(cli, tmpdir, datafiles):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project
project = Project(project_dir, context)
@@ -136,7 +136,7 @@ def _test_pull(user_config_file, project_dir, cache_dir,
context.cachedir = cache_dir
context.casdir = os.path.join(cache_dir, 'cas')
context.tmpdir = os.path.join(cache_dir, 'tmp')
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
@@ -198,7 +198,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project and CAS cache
project = Project(project_dir, context)
@@ -273,7 +273,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
@@ -308,7 +308,7 @@ def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 862393f35..a54c1df09 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -23,7 +23,7 @@ DATA_DIR = os.path.join(
# Handle messages from the pipeline
-def message_handler(message, context):
+def message_handler(message, is_silenced):
pass
@@ -71,7 +71,7 @@ def test_push(cli, tmpdir, datafiles):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
@@ -108,7 +108,7 @@ def _test_push(user_config_file, project_dir, element_name, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
@@ -194,7 +194,7 @@ def _test_push_message(user_config_file, project_dir, queue):
# Fake minimal context
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
# Load the project manually
project = Project(project_dir, context)
diff --git a/tests/internals/loader.py b/tests/internals/loader.py
index 006b5787f..a4ebdb9ac 100644
--- a/tests/internals/loader.py
+++ b/tests/internals/loader.py
@@ -13,14 +13,14 @@ DATA_DIR = os.path.join(
)
-def dummy_handler(message, context):
+def dummy_handler(message, is_silenced):
pass
def make_loader(basedir):
context = Context()
context.load(config=os.devnull)
- context.set_message_handler(dummy_handler)
+ context.messenger.set_message_handler(dummy_handler)
project = Project(basedir, context)
return project.loader
diff --git a/tests/internals/pluginloading.py b/tests/internals/pluginloading.py
index 2d997077a..9093680f4 100644
--- a/tests/internals/pluginloading.py
+++ b/tests/internals/pluginloading.py
@@ -19,10 +19,10 @@ def create_pipeline(tmpdir, basedir, target):
context.casdir = os.path.join(str(tmpdir), 'cas')
project = Project(basedir, context)
- def dummy_handler(message, context):
+ def dummy_handler(message, is_silenced):
pass
- context.set_message_handler(dummy_handler)
+ context.messenger.set_message_handler(dummy_handler)
pipeline = Pipeline(context, project, None)
targets, = pipeline.load([(target,)])
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index cd8a3e989..3fc9d96a6 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -34,7 +34,7 @@ from tests.testutils import create_artifact_share
DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project")
-def message_handler(message, context):
+def message_handler(message, is_silenced):
pass
@@ -71,7 +71,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
project = Project(project_dir, context)
project.ensure_fully_loaded()
@@ -146,7 +146,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
project = Project(project_dir, context)
project.ensure_fully_loaded()
@@ -204,7 +204,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
# get the source object
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
project = Project(project_dir, context)
project.ensure_fully_loaded()
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 4c652d21e..6282b6e60 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -35,7 +35,7 @@ from tests.testutils import create_artifact_share
DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project")
-def message_handler(message, context):
+def message_handler(message, is_silenced):
pass
@@ -72,7 +72,7 @@ def test_source_push(cli, tmpdir, datafiles):
# get the source object
context = Context()
context.load(config=user_config_file)
- context.set_message_handler(message_handler)
+ context.messenger.set_message_handler(message_handler)
project = Project(project_dir, context)
project.ensure_fully_loaded()
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 318285292..9dc431bda 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -35,7 +35,7 @@ from tests.testutils.element_generators import create_element_size
DATA_DIR = os.path.dirname(os.path.realpath(__file__))
-def dummy_message_handler(message, context):
+def dummy_message_handler(message, is_silenced):
pass
@@ -63,7 +63,7 @@ def test_source_staged(tmpdir, cli, datafiles):
project = Project(project_dir, context)
project.ensure_fully_loaded()
context.cachedir = cachedir
- context.set_message_handler(dummy_message_handler)
+ context.messenger.set_message_handler(dummy_message_handler)
sourcecache = context.sourcecache
cas = context.get_cascache()
@@ -106,7 +106,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
project = Project(project_dir, context)
project.ensure_fully_loaded()
context.cachedir = cachedir
- context.set_message_handler(dummy_message_handler)
+ context.messenger.set_message_handler(dummy_message_handler)
cas = context.get_cascache()
res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"])
@@ -148,7 +148,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
project = Project(project_dir, context)
project.ensure_fully_loaded()
context.cachedir = cachedir
- context.set_message_handler(dummy_message_handler)
+ context.messenger.set_message_handler(dummy_message_handler)
element = project.load_elements(["import-dev.bst"])[0]