summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2020-12-22 09:45:10 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2020-12-22 09:45:10 +0000
commit096c7292247c0aee4483b42f1d8da67c124dbcac (patch)
treed3c12e99b8765c1b54a055ac081ed3554aa4f65c
parentba5664fff47ad0e0a2614c1bf893ae5c31d747e7 (diff)
parentad4cbd31b23035bfe5a0b7af87c6fff8bfbef740 (diff)
downloadbuildstream-096c7292247c0aee4483b42f1d8da67c124dbcac.tar.gz
Merge branch 'tristan/messenger-convenience-functions' into 'master'
Added core facing convenience logging functions See merge request BuildStream/buildstream!2119
-rw-r--r--src/buildstream/_assetcache.py11
-rw-r--r--src/buildstream/_cas/casdprocessmanager.py21
-rw-r--r--src/buildstream/_loader/loader.py4
-rw-r--r--src/buildstream/_messenger.py241
-rw-r--r--src/buildstream/_pipeline.py18
-rw-r--r--src/buildstream/_pluginfactory/pluginfactory.py5
-rw-r--r--src/buildstream/_project.py14
-rw-r--r--src/buildstream/_scheduler/scheduler.py4
-rw-r--r--src/buildstream/_stream.py39
-rw-r--r--src/buildstream/sandbox/_sandboxbuildboxrun.py7
-rw-r--r--src/buildstream/sandbox/_sandboxremote.py7
-rw-r--r--src/buildstream/sandbox/sandbox.py9
-rw-r--r--tests/internals/cascache.py26
13 files changed, 216 insertions, 190 deletions
diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py
index a0b502f2b..fef597af6 100644
--- a/src/buildstream/_assetcache.py
+++ b/src/buildstream/_assetcache.py
@@ -25,7 +25,6 @@ import grpc
from . import utils
from . import _yaml
from ._cas import CASRemote
-from ._message import Message, MessageType
from ._exceptions import AssetCacheError, LoadError, RemoteError
from ._remote import BaseRemote, RemoteSpec, RemoteType
from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
@@ -573,14 +572,6 @@ class AssetCache:
return (index, storage)
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self.context.messenger.message(Message(message_type, message, **args))
-
# _set_remotes():
#
# Set the list of remote caches. If project is None, the global list of
@@ -604,7 +595,7 @@ class AssetCache:
#
def _initialize_remotes(self):
def remote_failed(remote, error):
- self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(remote.url, error))
+ self.context.messenger.warn("Failed to initialize remote {}: {}".format(remote.url, error))
with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 20ff610eb..11a16f2cb 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -37,7 +37,6 @@ from .._protos.google.bytestream import bytestream_pb2_grpc
from .. import _signals, utils
from .._exceptions import CASCacheError
-from .._message import Message, MessageType
_CASD_MAX_LOGFILES = 10
_CASD_TIMEOUT = 300 # in seconds
@@ -179,13 +178,8 @@ class CASDProcessManager:
# buildbox-casd is already dead
if messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
- return_code, self._logfile
- ),
- )
+ messenger.bug(
+ "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(return_code, self._logfile)
)
return
@@ -207,17 +201,12 @@ class CASDProcessManager:
self.process.wait(timeout=15)
if messenger:
- messenger.message(
- Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
- )
+ messenger.warn("Buildbox-casd didn't exit in time and has been killed")
return
if return_code != 0 and messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile),
- )
+ messenger.bug(
+ "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(return_code, self._logfile)
)
# create_channel():
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 54efd27ae..6ace3624b 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -33,7 +33,6 @@ from .types import Symbol
from . import loadelement
from .loadelement import LoadElement, Dependency, DependencyType, extract_depends_from_node
from ..types import CoreWarnings, _KeyStrength
-from .._message import Message, MessageType
# Loader():
@@ -1016,8 +1015,7 @@ class Loader:
if self.project._warning_is_fatal(warning_token):
raise LoadError(brief, warning_token)
- message = Message(MessageType.WARN, brief)
- self.load_context.context.messenger.message(message)
+ self.load_context.context.messenger.warn(brief)
# Print warning messages if any of the specified elements have invalid names.
#
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index f18d3dc92..3bd98cdbf 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -21,17 +21,19 @@ import os
import datetime
import threading
from contextlib import contextmanager
+from typing import Optional, Callable, Iterator, TextIO
from . import _signals
from ._exceptions import BstError
from ._message import Message, MessageType
+from ._state import State, Task
-_RENDER_INTERVAL = datetime.timedelta(seconds=1)
+_RENDER_INTERVAL: datetime.timedelta = datetime.timedelta(seconds=1)
# Time in seconds for which we decide that we want to display subtask information
-_DISPLAY_LIMIT = datetime.timedelta(seconds=3)
+_DISPLAY_LIMIT: datetime.timedelta = datetime.timedelta(seconds=3)
# If we're in the test suite, we need to ensure that we don't set a limit
if "BST_TEST_SUITE" in os.environ:
_DISPLAY_LIMIT = datetime.timedelta(seconds=0)
@@ -42,36 +44,65 @@ if "BST_TEST_SUITE" in os.environ:
class _TimeData:
__slots__ = ["start_time"]
- def __init__(self, start_time):
- self.start_time = start_time
+ def __init__(self, start_time: datetime.datetime) -> None:
+ self.start_time: datetime.datetime = start_time
+# _MessengerLocal
+#
+# Thread local storage for the messenger
+#
+class _MessengerLocal(threading.local):
+ def __init__(self) -> None:
+ super().__init__()
+
+ # The callback to call when propagating messages
+ #
+ # FIXME: The message handler is currently not strongly typed,
+ # as it uses a kwarg, we cannot declare it with Callable.
+ # We can use `Protocol` to strongly type this with python >= 3.8
+ self.message_handler = None
+
+ # The open file handle for this task
+ self.log_handle: Optional[TextIO] = None
+
+ # The filename for this task
+ self.log_filename: Optional[str] = None
+
+ # Level of silent messages depth in this task
+ self.silence_scope_depth: int = 0
+
+
+# Messenger()
+#
+# The messenger object.
+#
+# This is used to propagate messages either from the main context or
+# from task contexts in such a way that messages are propagated to
+# the frontend and also optionally recorded to a task log file when
+# the message is issued from a task context.
+#
class Messenger:
- def __init__(self):
- self._state = None
- self._next_render = None # A Time object
- self._active_simple_tasks = 0
- self._render_status_cb = None
-
- self._locals = threading.local()
- self._locals.message_handler = None
- self._locals.log_handle = None
- self._locals.log_filename = None
- self._locals.silence_scope_depth = 0
+ def __init__(self) -> None:
+ self._state: Optional[State] = None # The State object
+
+ #
+ # State related to simple tasks, these drive the status bar
+ # when ongoing activities occur outside of an active scheduler
+ #
+ self._active_simple_tasks: int = 0 # Number of active simple tasks
+ self._next_render: Optional[datetime.datetime] = None # The time of the next render
+ self._render_status_cb: Optional[Callable[[], None]] = None # The render callback
+
+ # Thread local storage
+ self._locals: _MessengerLocal = _MessengerLocal()
# set_message_handler()
#
# Sets the handler for any status messages propagated through
- # the context.
+ # the messenger.
#
- # The handler should have the signature:
- #
- # def handler(
- # message: _message.Message, # The message to send.
- # is_silenced: bool, # Whether messages are currently being silenced.
- # ) -> None
- #
- def set_message_handler(self, handler):
+ def set_message_handler(self, handler) -> None:
self._locals.message_handler = handler
# set_state()
@@ -79,9 +110,9 @@ class Messenger:
# Sets the State object within the Messenger
#
# Args:
- # state (State): The state to set
+ # state: The state to set
#
- def set_state(self, state):
+ def set_state(self, state: State) -> None:
self._state = state
# set_render_status_cb()
@@ -89,22 +120,11 @@ class Messenger:
# Sets the callback to use to render status
#
# Args:
- # callback (function): The Callback to be notified
+ # callback: The Callback to be notified
#
- # Callback Args:
- # There are no arguments to the callback
- #
- def set_render_status_cb(self, callback):
+ def set_render_status_cb(self, callback: Callable[[], None]) -> None:
self._render_status_cb = callback
- # _silent_messages():
- #
- # Returns:
- # (bool): Whether messages are currently being silenced
- #
- def _silent_messages(self):
- return self._locals.silence_scope_depth > 0
-
# message():
#
# Proxies a message back to the caller, this is the central
@@ -113,7 +133,7 @@ class Messenger:
# Args:
# message: A Message object
#
- def message(self, message):
+ def message(self, message: Message) -> None:
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
@@ -124,6 +144,71 @@ class Messenger:
self._locals.message_handler(message, is_silenced=self._silent_messages())
+ # status():
+ #
+ # A core facing convenience method for issuing STATUS messages
+ #
+ # Args:
+ # brief: The brief status message
+ # detail: An optional detailed message
+ # kwargs: Additional Message constructor keyword arguments
+ #
+ def status(self, brief: str, *, detail: Optional[str] = None, **kwargs) -> None:
+ message = Message(MessageType.STATUS, brief, detail=detail, **kwargs)
+ self.message(message)
+
+ # info():
+ #
+ # A core facing convenience method for issuing INFO messages
+ #
+ # Args:
+ # brief: The brief info message
+ # detail: An optional detailed message
+ # kwargs: Additional Message constructor keyword arguments
+ #
+ def info(self, brief: str, *, detail: Optional[str] = None, **kwargs) -> None:
+ message = Message(MessageType.INFO, brief, detail=detail, **kwargs)
+ self.message(message)
+
+ # warn():
+ #
+ # A core facing convenience method for issuing WARN messages
+ #
+ # Args:
+ # brief: The brief warning message
+ # detail: An optional detailed message
+ # kwargs: Additional Message constructor keyword arguments
+ #
+ def warn(self, brief: str, *, detail: Optional[str] = None, **kwargs) -> None:
+ message = Message(MessageType.WARN, brief, detail=detail, **kwargs)
+ self.message(message)
+
+ # error():
+ #
+ # A core facing convenience method for issuing ERROR messages
+ #
+ # Args:
+ # brief: The brief error message
+ # detail: An optional detailed message
+ # kwargs: Additional Message constructor keyword arguments
+ #
+ def error(self, brief: str, *, detail: Optional[str] = None, **kwargs) -> None:
+ message = Message(MessageType.ERROR, brief, detail=detail, **kwargs)
+ self.message(message)
+
+ # bug():
+ #
+ # A core facing convenience method for issuing BUG messages
+ #
+ # Args:
+ # brief: The brief bug message
+ # detail: An optional detailed message
+ # kwargs: Additional Message constructor keyword arguments
+ #
+ def bug(self, brief: str, *, detail: Optional[str] = None, **kwargs) -> None:
+ message = Message(MessageType.BUG, brief, detail=detail, **kwargs)
+ self.message(message)
+
# silence()
#
# A context manager to silence messages, this behaves in
@@ -132,19 +217,19 @@ class Messenger:
# _message.unconditional_messages will be silenced.
#
# Args:
- # actually_silence (bool): Whether to actually do the silencing, if
- # False then this context manager does not
- # affect anything.
+ # actually_silence: Whether to actually do the silencing, if
+ # False then this context manager does not
+ # affect anything.
#
@contextmanager
- def silence(self, *, actually_silence=True):
+ def silence(self, *, actually_silence: bool = True) -> Iterator[None]:
if not actually_silence:
- yield
+ yield None
return
self._locals.silence_scope_depth += 1
try:
- yield
+ yield None
finally:
assert self._locals.silence_scope_depth > 0
self._locals.silence_scope_depth -= 1
@@ -154,20 +239,22 @@ class Messenger:
# Context manager for performing timed activities and logging those
#
# Args:
- # activity_name (str): The name of the activity
- # detail (str): An optional detailed message, can be multiline output
- # silent_nested (bool): If True, all nested messages are silenced except for unconditionaly ones
+ # activity_name: The name of the activity
+ # detail: An optional detailed message, can be multiline output
+ # silent_nested: If True, all nested messages are silenced except for unconditionaly ones
# kwargs: Remaining Message() constructor keyword arguments.
#
@contextmanager
- def timed_activity(self, activity_name, *, detail=None, silent_nested=False, **kwargs):
+ def timed_activity(
+ self, activity_name: str, *, detail: str = None, silent_nested: bool = False, **kwargs
+ ) -> Iterator[None]:
with self.timed_suspendable() as timedata:
try:
# Push activity depth for status messages
message = Message(MessageType.START, activity_name, detail=detail, **kwargs)
self.message(message)
with self.silence(actually_silence=silent_nested):
- yield
+ yield None
except BstError:
# Note the failure in status messages and reraise, the scheduler
@@ -186,21 +273,23 @@ class Messenger:
# Context manager for creating a task to report progress to.
#
# Args:
- # activity_name (str): The name of the activity
- # task_name (str): Optionally, the task name for the frontend during this task
- # detail (str): An optional detailed message, can be multiline output
- # silent_nested (bool): If True, all nested messages are silenced except for unconditionaly ones
+ # activity_name: The name of the activity
+ # task_name: Optionally, the task name for the frontend during this task
+ # detail: An optional detailed message, can be multiline output
+ # silent_nested: If True, all nested messages are silenced except for unconditionaly ones
# kwargs: Remaining Message() constructor keyword arguments.
#
# Yields:
# Task: A Task object that represents this activity, principally used to report progress
#
@contextmanager
- def simple_task(self, activity_name, *, task_name=None, detail=None, silent_nested=False, **kwargs):
+ def simple_task(
+ self, activity_name: str, *, task_name: str = None, detail: str = None, silent_nested: bool = False, **kwargs
+ ) -> Iterator[Optional[Task]]:
# Bypass use of State when none exists (e.g. tests)
if not self._state:
with self.timed_activity(activity_name, detail=detail, silent_nested=silent_nested, **kwargs):
- yield
+ yield None
return
if not task_name:
@@ -254,17 +343,17 @@ class Messenger:
# Messenger.get_log_filename() API.
#
# Args:
- # filename (str): A logging directory relative filename,
- # the pid and .log extension will be automatically
- # appended
+ # filename: A logging directory relative filename,
+ # the pid and .log extension will be automatically
+ # appended
#
- # logdir (str) : The path to the log file directory.
+ # logdir: The path to the log file directory.
#
# Yields:
- # (str): The fully qualified log filename
+ # The fully qualified log filename
#
@contextmanager
- def recorded_messages(self, filename, logdir):
+ def recorded_messages(self, filename: str, logdir: str) -> Iterator[str]:
# We dont allow recursing in this context manager, and
# we also do not allow it in the main process.
assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
@@ -308,9 +397,9 @@ class Messenger:
# manager is active
#
# Returns:
- # (file): The active logging file handle, or None
+ # The active logging file handle, or None
#
- def get_log_handle(self):
+ def get_log_handle(self) -> Optional[TextIO]:
return self._locals.log_handle
# get_log_filename()
@@ -320,9 +409,9 @@ class Messenger:
# manager is active
#
# Returns:
- # (str): The active logging filename, or None
+ # The active logging filename, or None
#
- def get_log_filename(self):
+ def get_log_filename(self) -> Optional[str]:
return self._locals.log_filename
# timed_suspendable()
@@ -331,10 +420,10 @@ class Messenger:
# adjust for clock drift caused by suspending
#
# Yields:
- # TimeData: An object that contains the time the activity started
+ # An object that contains the time the activity started
#
@contextmanager
- def timed_suspendable(self):
+ def timed_suspendable(self) -> Iterator[_TimeData]:
# Note: timedata needs to be in a namedtuple so that values can be
# yielded that will change
timedata = _TimeData(start_time=datetime.datetime.now())
@@ -351,14 +440,22 @@ class Messenger:
with _signals.suspendable(stop_time, resume_time):
yield timedata
+ # _silent_messages():
+ #
+ # Returns:
+ # (bool): Whether messages are currently being silenced
+ #
+ def _silent_messages(self) -> bool:
+ return self._locals.silence_scope_depth > 0
+
# _record_message()
#
# Records the message if recording is enabled
#
# Args:
- # message (Message): The message to record
+ # message: The message to record
#
- def _record_message(self, message):
+ def _record_message(self, message: Message) -> None:
if self._locals.log_handle is None:
return
@@ -411,7 +508,7 @@ class Messenger:
# Calls the render status callback set in the messenger, but only if a
# second has passed since it last rendered.
#
- def _render_status(self):
+ def _render_status(self) -> None:
assert self._next_render
# self._render_status_cb()
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index 01ebc2e96..d53fc9d01 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -27,7 +27,6 @@ from collections import OrderedDict
from pyroaring import BitMap # pylint: disable=no-name-in-module
from ._exceptions import PipelineError
-from ._message import Message, MessageType
from ._profile import Topics, PROFILER
from ._project import ProjectRefStorage
from .types import _PipelineSelection, _Scope
@@ -42,14 +41,9 @@ from .types import _PipelineSelection, _Scope
#
class Pipeline:
def __init__(self, context, project, artifacts):
-
self._context = context # The Context
self._project = project # The toplevel project
-
- #
- # Private members
- #
- self._artifacts = artifacts
+ self._artifacts = artifacts # The artifact cache
# load()
#
@@ -184,7 +178,7 @@ class Pipeline:
for t in targets:
new_elm = t._get_source_element()
if new_elm != t and not silent:
- self._message(MessageType.INFO, "Element '{}' redirected to '{}'".format(t.name, new_elm.name))
+ self._context.messenger.info("Element '{}' redirected to '{}'".format(t.name, new_elm.name))
if new_elm not in elements:
elements.append(new_elm)
return elements
@@ -423,14 +417,6 @@ class Pipeline:
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self._context.messenger.message(Message(message_type, message, **args))
-
# _Planner()
#
diff --git a/src/buildstream/_pluginfactory/pluginfactory.py b/src/buildstream/_pluginfactory/pluginfactory.py
index fb5389b3e..a922a629c 100644
--- a/src/buildstream/_pluginfactory/pluginfactory.py
+++ b/src/buildstream/_pluginfactory/pluginfactory.py
@@ -30,7 +30,6 @@ from ..node import Node
from ..utils import UtilError
from .._exceptions import PluginError
from .._messenger import Messenger
-from .._message import Message, MessageType
from .pluginorigin import PluginOrigin, PluginType
@@ -147,12 +146,10 @@ class PluginFactory:
# plugin is used.
#
if plugin_type.BST_PLUGIN_DEPRECATED and not self._allow_deprecated[kind]:
- message = Message(
- MessageType.WARN,
+ messenger.warn(
"{}: Using deprecated plugin '{}'".format(provenance_node.get_provenance(), kind),
detail=plugin_type.BST_PLUGIN_DEPRECATION_MESSAGE,
)
- messenger.message(message)
return plugin_type, defaults
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 2534e0209..0bd398429 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -42,7 +42,6 @@ from .types import CoreWarnings
from ._projectrefs import ProjectRefs, ProjectRefStorage
from ._loader import Loader, LoadContext
from .element import Element
-from ._message import Message, MessageType
from ._includes import Includes
from ._workspaces import WORKSPACE_PROJECT_FILE
@@ -474,9 +473,7 @@ class Project:
detail = "The following inline specified source references will be ignored:\n\n"
lines = ["{}:{}".format(source._get_provenance(), ref) for source, ref in redundant_refs]
detail += "\n".join(lines)
- self._context.messenger.message(
- Message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
- )
+ self._context.messenger.warn("Ignoring redundant source references", detail=detail)
return elements
@@ -909,12 +906,9 @@ class Project:
# Deprecation check
if not fail_on_overlap.is_none():
- self._context.messenger.message(
- Message(
- MessageType.WARN,
- "Use of fail-on-overlap within project.conf "
- + "is deprecated. Consider using fatal-warnings instead.",
- )
+ self._context.messenger.warn(
+ "Use of fail-on-overlap within project.conf "
+ + "is deprecated. Consider using fatal-warnings instead.",
)
if (CoreWarnings.OVERLAPS not in self._fatal_warnings) and fail_on_overlap.as_bool():
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 23abbe46d..3f1cb8ecb 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -35,7 +35,6 @@ from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
from .._profile import Topics, PROFILER
-from .._message import Message, MessageType
from ..plugin import Plugin
from .. import _signals
@@ -319,8 +318,7 @@ class Scheduler:
# returncode (int): the return code with which buildbox-casd exited
#
def _abort_on_casd_failure(self, pid, returncode):
- message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
- self.context.messenger.message(message)
+ self.context.messenger.bug("buildbox-casd died while the pipeline was active.")
self._casd_process.returncode = returncode
self.terminate()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e91ee882c..e05100f24 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -33,7 +33,6 @@ from typing import List, Tuple
from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._artifactproject import ArtifactProject
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
-from ._message import Message, MessageType
from ._scheduler import (
Scheduler,
SchedStatus,
@@ -241,7 +240,7 @@ class Stream:
# Raise warning if the element is cached in a failed state
if element._cached_failure():
- self._message(MessageType.WARN, "using a buildtree from a failed build.")
+ self._context.messenger.warn("using a buildtree from a failed build.")
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
@@ -542,7 +541,7 @@ class Stream:
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
- self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
+ self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(uncached_elts)
@@ -641,10 +640,10 @@ class Stream:
for obj in target_objects:
ref = obj.get_artifact_name()
if not obj._cached():
- self._message(MessageType.WARN, "{} is not cached".format(ref))
+ self._context.messenger.warn("{} is not cached".format(ref))
continue
if not obj._cached_logs():
- self._message(MessageType.WARN, "{} is cached without log files".format(ref))
+ self._context.messenger.warn("{} is cached without log files".format(ref))
continue
artifact_logs[obj.name] = obj.get_logs()
@@ -669,7 +668,7 @@ class Stream:
for obj in target_objects:
ref = obj.get_artifact_name()
if not obj._cached():
- self._message(MessageType.WARN, "{} is not cached".format(ref))
+ self._context.messenger.warn("{} is not cached".format(ref))
obj.name = {ref: "No artifact cached"}
continue
if isinstance(obj, ArtifactElement):
@@ -702,14 +701,14 @@ class Stream:
try:
self._artifacts.remove(ref)
except ArtifactError as e:
- self._message(MessageType.WARN, str(e))
+ self._context.messenger.warn(str(e))
continue
- self._message(MessageType.INFO, "Removed: {}".format(ref))
+ self._context.messenger.info("Removed: {}".format(ref))
ref_removed = True
if not ref_removed:
- self._message(MessageType.INFO, "No artifacts were removed")
+ self._context.messenger.info("No artifacts were removed")
# source_checkout()
#
@@ -754,7 +753,7 @@ class Stream:
"Error while writing sources" ": '{}'".format(e), detail=e.detail, reason=e.reason
) from e
- self._message(MessageType.INFO, "Checked out sources to '{}'".format(location))
+ self._context.messenger.info("Checked out sources to '{}'".format(location))
# workspace_open
#
@@ -849,7 +848,7 @@ class Stream:
# Now it does the bits that can not be made atomic.
targetGenerator = zip(elements, expanded_directories)
for target, directory in targetGenerator:
- self._message(MessageType.INFO, "Creating workspace for element {}".format(target.name))
+ self._context.messenger.info("Creating workspace for element {}".format(target.name))
workspace = workspaces.get_workspace(target._get_full_name())
if workspace and not no_checkout:
@@ -866,7 +865,7 @@ class Stream:
raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e
workspaces.create_workspace(target, directory, checkout=not no_checkout)
- self._message(MessageType.INFO, "Created a workspace for element: {}".format(target._get_full_name()))
+ self._context.messenger.info("Created a workspace for element: {}".format(target._get_full_name()))
# workspace_close
#
@@ -893,7 +892,7 @@ class Stream:
# Delete the workspace and save the configuration
workspaces.delete_workspace(element_name)
workspaces.save_config()
- self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
+ self._context.messenger.info("Closed workspace for {}".format(element_name))
# workspace_reset
#
@@ -922,8 +921,8 @@ class Stream:
if soft:
workspace.last_build = None
- self._message(
- MessageType.INFO, "Reset workspace state for {} at: {}".format(element.name, workspace_path)
+ self._context.messenger.info(
+ "Reset workspace state for {} at: {}".format(element.name, workspace_path)
)
continue
@@ -1332,14 +1331,6 @@ class Stream:
return selected
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self._context.messenger.message(Message(message_type, message, **args))
-
# _add_queue()
#
# Adds a queue to the stream
@@ -1691,7 +1682,7 @@ class Stream:
)
else:
message = "No elements matched the following glob expression(s): {}".format(", ".join(unmatched))
- self._message(MessageType.WARN, message)
+ self._context.messenger.warn(message)
if doubly_matched:
raise StreamError(
diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py b/src/buildstream/sandbox/_sandboxbuildboxrun.py
index 1c187d7fd..e489c9b4f 100644
--- a/src/buildstream/sandbox/_sandboxbuildboxrun.py
+++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py
@@ -25,7 +25,6 @@ import psutil
from .. import utils, _signals
from . import SandboxFlags
from .._exceptions import SandboxError
-from .._message import Message, MessageType
from .._platform import Platform
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from ._sandboxreapi import SandboxREAPI
@@ -115,7 +114,8 @@ class SandboxBuildBoxRun(SandboxREAPI):
continue
if "bind-mount" not in self._capabilities:
- self._warn("buildbox-run does not support host-files")
+ context = self._get_context()
+ context.messenger.warn("buildbox-run does not support host-files")
break
buildbox_command.append("--bind-mount={}:{}".format(mount_source, mount_point))
@@ -220,6 +220,3 @@ class SandboxBuildBoxRun(SandboxREAPI):
def _supported_platform_properties(self):
return {"OSFamily", "ISA", "unixUID", "unixGID", "network"}
-
- def _warn(self, msg):
- self._get_context().messenger.message(Message(MessageType.WARN, msg))
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index ff314adba..1174b2769 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -27,7 +27,6 @@ from functools import partial
import grpc
from ..node import Node
-from .._message import Message, MessageType
from ._sandboxreapi import SandboxREAPI
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
@@ -105,9 +104,6 @@ class SandboxRemote(SandboxREAPI):
)
self.operation_name = None
- def info(self, msg):
- self._get_context().messenger.message(Message(MessageType.INFO, msg, element_name=self._get_element_name()))
-
@staticmethod
def specs_from_config_node(config_node, basedir=None):
def require_node(config, keyname):
@@ -423,7 +419,8 @@ class SandboxRemote(SandboxREAPI):
raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details()))
return None
else:
- self.info("Action result found in action cache")
+ context = self._get_context()
+ context.messenger.info("Action result found in action cache", element_name=self._get_element_name())
return result
@staticmethod
diff --git a/src/buildstream/sandbox/sandbox.py b/src/buildstream/sandbox/sandbox.py
index eb0a705d7..26cd0c618 100644
--- a/src/buildstream/sandbox/sandbox.py
+++ b/src/buildstream/sandbox/sandbox.py
@@ -36,7 +36,6 @@ from contextlib import contextmanager
from typing import Dict, Generator, List, Optional, TYPE_CHECKING
from .._exceptions import ImplError, SandboxError
-from .._message import Message, MessageType
from ..storage.directory import Directory
from ..storage._casbaseddirectory import CasBasedDirectory
@@ -577,13 +576,9 @@ class _SandboxBatch:
def execute_command(self, command):
if command.label:
context = self.sandbox._get_context()
- message = Message(
- MessageType.STATUS,
- "Running command",
- detail=command.label,
- element_name=self.sandbox._get_element_name(),
+ context.messenger.status(
+ "Running command", detail=command.label, element_name=self.sandbox._get_element_name(),
)
- context.messenger.message(message)
exitcode = self.sandbox._run(command.command, self.flags, cwd=command.cwd, env=command.env)
if exitcode != 0:
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index e27e40974..046a14bbe 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -4,7 +4,6 @@ from unittest.mock import MagicMock
from buildstream._cas.cascache import CASCache
from buildstream._cas import casdprocessmanager
-from buildstream._message import MessageType
from buildstream._messenger import Messenger
@@ -19,12 +18,11 @@ def test_report_when_cascache_dies_before_asked_to(tmp_path, monkeypatch):
time.sleep(1)
cache.release_resources(messenger)
- assert messenger.message.call_count == 1
+ assert messenger.bug.call_count == 1
- message = messenger.message.call_args[0][0]
- assert message.message_type == MessageType.BUG
- assert "0" in message.message
- assert "died" in message.message
+ message = messenger.bug.call_args[0][0]
+ assert "0" in message
+ assert "died" in message
def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
@@ -42,12 +40,11 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
time.sleep(1)
cache.release_resources(messenger)
- assert messenger.message.call_count == 1
+ assert messenger.bug.call_count == 1
- message = messenger.message.call_args[0][0]
- assert message.message_type == MessageType.BUG
- assert "-15" in message.message
- assert "cleanly" in message.message
+ message = messenger.bug.call_args[0][0]
+ assert "-15" in message
+ assert "cleanly" in message
def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
@@ -65,11 +62,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
time.sleep(1)
cache.release_resources(messenger)
- assert messenger.message.call_count == 1
+ assert messenger.warn.call_count == 1
- message = messenger.message.call_args[0][0]
- assert message.message_type == MessageType.WARN
- assert "killed" in message.message
+ message = messenger.warn.call_args[0][0]
+ assert "killed" in message
def test_casd_redirects_stderr_to_file_and_rotate(tmp_path, monkeypatch):