summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2018-07-03 16:43:47 +0100
committerTristan Maat <tristan.maat@codethink.co.uk>2018-10-31 14:55:58 +0000
commita350b438e5d29eb16408228af10daf39e0989c59 (patch)
treeb04dd16444d5be3de4baa2d703011755b0dc813d
parentc7ac7e7d70e9a0a266c935505696c23f8c23f244 (diff)
downloadbuildstream-Qinusty/message-helpers.tar.gz
Overhaul internal messaging APIQinusty/message-helpers
-rw-r--r--buildstream/_artifactcache/artifactcache.py12
-rw-r--r--buildstream/_context.py106
-rw-r--r--buildstream/_frontend/app.py22
-rw-r--r--buildstream/_pipeline.py14
-rw-r--r--buildstream/_project.py14
-rw-r--r--buildstream/_scheduler/jobs/elementjob.py23
-rw-r--r--buildstream/_scheduler/jobs/job.py90
-rw-r--r--buildstream/_scheduler/queues/buildqueue.py8
-rw-r--r--buildstream/_scheduler/queues/queue.py39
-rw-r--r--buildstream/_stream.py33
-rw-r--r--buildstream/plugin.py18
11 files changed, 209 insertions, 170 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 8ea6c9dc2..956ed84c3 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -24,7 +24,6 @@ from collections.abc import Mapping
from ..types import _KeyStrength
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
-from .._message import Message, MessageType
from .. import utils
from .. import _yaml
@@ -589,15 +588,6 @@ class ArtifactCache():
# Local Private Methods #
################################################
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self.context.message(
- Message(None, message_type, message, **args))
-
# _set_remotes():
#
# Set the list of remote caches. If project is None, the global list of
@@ -621,7 +611,7 @@ class ArtifactCache():
#
def _initialize_remotes(self):
def remote_failed(url, error):
- self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
+ self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
diff --git a/buildstream/_context.py b/buildstream/_context.py
index e3c290b7b..5bfd897ea 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -27,6 +27,7 @@ from . import _cachekey
from . import _signals
from . import _site
from . import _yaml
+from .plugin import Plugin
from ._exceptions import LoadError, LoadErrorReason, BstError
from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
@@ -326,7 +327,7 @@ class Context():
# the context.
#
# The message handler should have the same signature as
- # the message() method
+ # the _send_message() method
def set_message_handler(self, handler):
self._message_handler = handler
@@ -341,15 +342,19 @@ class Context():
return True
return False
- # message():
+ # _send_message():
#
- # Proxies a message back to the caller, this is the central
+ # Proxies a message back through the message handler, this is the central
# point through which all messages pass.
#
# Args:
# message: A Message object
#
- def message(self, message):
+ def _send_message(self, message):
+ # Debug messages should only be displayed when they are
+ # configured to be
+ if not self.log_debug and message.message_type == MessageType.DEBUG:
+ return
# Tag message only once
if message.depth is None:
@@ -365,6 +370,86 @@ class Context():
self._message_handler(message, context=self)
+ # message():
+ #
+ # The global message API. Any message-sending functions should go
+ # through here. This will call `_send_message` to deliver the
+ # final message.
+ #
+ # Args:
+ # text (str): The text of the message.
+ #
+ # Kwargs:
+ # msg_type (MessageType): The type of the message (required).
+ # plugin (Plugin|str|None): The id of the plugin
+ # (i.e. Element, Source subclass
+ # instance) sending the message. If
+ # a plugin is given, this will be
+ # determined automatically, if
+ # omitted the message will be sent
+ # without a plugin context.
+ #
+ # For other kwargs, see `Message`.
+ #
+ def message(self, text, *, plugin=None, msg_type=None, **kwargs):
+ assert msg_type is not None
+
+ if isinstance(plugin, Plugin):
+ plugin_id = plugin._get_unique_id()
+ else:
+ plugin_id = plugin
+
+ self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
+
+ # skipped():
+ #
+ # Produce and send a skipped message through the context.
+ #
+ def skipped(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.SKIPPED, **kwargs)
+
+ # debug():
+ #
+ # Produce and send a debug message through the context.
+ #
+ def debug(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.DEBUG, **kwargs)
+
+ # status():
+ #
+ # Produce and send a status message through the context.
+ #
+ def status(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.STATUS, **kwargs)
+
+ # info():
+ #
+ # Produce and send a info message through the context.
+ #
+ def info(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.INFO, **kwargs)
+
+ # warn():
+ #
+ # Produce and send a warning message through the context.
+ #
+ def warn(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.WARN, **kwargs)
+
+ # error():
+ #
+ # Produce and send a error message through the context.
+ #
+ def error(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.ERROR, **kwargs)
+
+ # log():
+ #
+ # Produce and send a log message through the context.
+ #
+ def log(self, text, **kwargs):
+ self.message(text, msg_type=MessageType.LOG, **kwargs)
+
# silence()
#
# A context manager to silence messages, this behaves in
@@ -409,8 +494,8 @@ class Context():
with _signals.suspendable(stop_time, resume_time):
try:
# Push activity depth for status messages
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
- self.message(message)
+ self.message(activity_name, detail=detail, plugin=unique_id,
+ msg_type=MessageType.START)
self._push_message_depth(silent_nested)
yield
@@ -418,15 +503,16 @@ class Context():
# Note the failure in status messages and reraise, the scheduler
# expects an error when there is an error.
elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
self._pop_message_depth()
- self.message(message)
+ self.message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
+ msg_type=MessageType.FAIL)
raise
elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
self._pop_message_depth()
- self.message(message)
+ self.message(activity_name, detail=detail,
+ elapsed=elapsed, plugin=unique_id,
+ msg_type=MessageType.SUCCESS)
# recorded_messages()
#
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 87db8076a..85e7edb70 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -34,7 +34,7 @@ from .._context import Context
from .._platform import Platform
from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
-from .._message import Message, MessageType, unconditional_messages
+from .._message import MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
@@ -250,7 +250,7 @@ class App():
# Mark the beginning of the session
if session_name:
- self._message(MessageType.START, session_name)
+ self.context.message(session_name, msg_type=MessageType.START)
# Run the body of the session here, once everything is loaded
try:
@@ -262,9 +262,9 @@ class App():
elapsed = self.stream.elapsed_time
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
- self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
+ self.context.warn(session_name + ' Terminated', elapsed=elapsed)
else:
- self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+ self.context.message(session_name, elapsed=elapsed, msg_type=MessageType.FAIL)
# Notify session failure
self._notify("{} failed".format(session_name), "{}".format(e))
@@ -282,7 +282,9 @@ class App():
else:
# No exceptions occurred, print session time and summary
if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ self.context.message(session_name,
+ elapsed=self.stream.elapsed_time,
+ msg_type=MessageType.SUCCESS)
if self._started:
self._print_summary()
@@ -428,21 +430,13 @@ class App():
if self.interactive:
self.notify(title, text)
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self.context.message(
- Message(None, message_type, message, **args))
-
# Exception handler
#
def _global_exception_handler(self, etype, value, tb):
# Print the regular BUG message
formatted = "".join(traceback.format_exception(etype, value, tb))
- self._message(MessageType.BUG, str(value),
- detail=formatted)
+ self.context.message(value, detail=formatted, msg_type=MessageType.BUG)
# If the scheduler has started, try to terminate all jobs gracefully,
# otherwise exit immediately.
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 1f75b2e9e..c57c54f50 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -24,7 +24,6 @@ import itertools
from operator import itemgetter
from ._exceptions import PipelineError
-from ._message import Message, MessageType
from ._profile import Topics, profile_start, profile_end
from . import Scope, Consistency
from ._project import ProjectRefStorage
@@ -201,8 +200,8 @@ class Pipeline():
for t in targets:
new_elm = t._get_source_element()
if new_elm != t and not silent:
- self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
- .format(t.name, new_elm.name))
+ self._context.info("Element '{}' redirected to '{}'"
+ .format(t.name, new_elm.name))
if new_elm not in elements:
elements.append(new_elm)
elif mode == PipelineSelection.PLAN:
@@ -433,15 +432,6 @@ class Pipeline():
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self._context.message(
- Message(None, message_type, message, **args))
-
# _Planner()
#
diff --git a/buildstream/_project.py b/buildstream/_project.py
index 83aa1f47e..7126cf780 100644
--- a/buildstream/_project.py
+++ b/buildstream/_project.py
@@ -37,7 +37,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage
from ._versions import BST_FORMAT_VERSION
from ._loader import Loader
from .element import Element
-from ._message import Message, MessageType
from ._includes import Includes
from ._platform import Platform
@@ -337,8 +336,7 @@ class Project():
for source, ref in redundant_refs
]
detail += "\n".join(lines)
- self._context.message(
- Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
+ self._context.warn("Ignoring redundant source references", detail=detail)
return elements
@@ -514,13 +512,9 @@ class Project():
# Deprecation check
if fail_on_overlap is not None:
- self._context.message(
- Message(
- None,
- MessageType.WARN,
- "Use of fail-on-overlap within project.conf " +
- "is deprecated. Consider using fatal-warnings instead."
- )
+ self._context.warn(
+ "Use of fail-on-overlap within project.conf " +
+ "is deprecated. Consider using fatal-warnings instead."
)
# Load project.refs if it exists, this may be ignored.
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c062f..864e4588e 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -18,8 +18,6 @@
#
from ruamel import yaml
-from ..._message import Message, MessageType
-
from .job import Job
@@ -86,9 +84,8 @@ class ElementJob(Job):
# This should probably be omitted for non-build tasks but it's harmless here
elt_env = self._element.get_environment()
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self.message(MessageType.LOG,
- "Build environment for element {}".format(self._element.name),
- detail=env_dump)
+ self._log("Build environment for element {}".format(self._element.name),
+ detail=env_dump, plugin=self.element, scheduler=True)
# Run the action
return self._action_cb(self._element)
@@ -96,15 +93,6 @@ class ElementJob(Job):
def parent_complete(self, success, result):
self._complete_cb(self, self._element, success, self._result)
- def message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(
- Message(self._element._get_unique_id(),
- message_type,
- message,
- **args))
-
def child_process_data(self):
data = {}
@@ -113,3 +101,10 @@ class ElementJob(Job):
data['workspace'] = workspace.to_dict()
return data
+
+ # _fail()
+ #
+ # Override _fail to set scheduler kwarg to true.
+ #
+ def _fail(self, text, **kwargs):
+ super()._fail(text, scheduler=True, **kwargs)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 60ae0d001..ce5fa4522 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -32,7 +32,7 @@ import psutil
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import MessageType, unconditional_messages
from ... import _signals, utils
# Return code values shutdown of job handling child processes
@@ -110,6 +110,7 @@ class Job():
# Private members
#
self._scheduler = scheduler # The scheduler
+ self._context = scheduler.context # The context, used primarily for UI messaging.
self._queue = None # A message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
@@ -184,7 +185,7 @@ class Job():
# First resume the job if it's suspended
self.resume(silent=True)
- self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
+ self._status("{} terminating".format(self.action_name))
# Make sure there is no garbage on the queue
self._parent_stop_listening()
@@ -217,8 +218,8 @@ class Job():
def kill(self):
# Force kill
- self.message(MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name))
+ self._warn("{} did not terminate gracefully, killing"
+ .format(self.action_name))
try:
utils._kill_process_tree(self._process.pid)
@@ -233,8 +234,7 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self.message(MessageType.STATUS,
- "{} suspending".format(self.action_name))
+ self._status("{} suspending".format(self.action_name))
try:
# Use SIGTSTP so that child processes may handle and propagate
@@ -258,8 +258,7 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS,
- "{} resuming".format(self.action_name))
+ self._status("{} resuming".format(self.action_name))
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
@@ -324,21 +323,6 @@ class Job():
raise ImplError("Job '{kind}' does not implement child_process()"
.format(kind=type(self).__name__))
- # message():
- #
- # Logs a message, this will be logged in the task's logfile and
- # conditionally also be sent to the frontend.
- #
- # Args:
- # message_type (MessageType): The type of message to send
- # message (str): The message
- # kwargs: Remaining Message() constructor arguments
- #
- def message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(Message(None, message_type, message, **args))
-
# child_process_data()
#
# Abstract method to retrieve additional data that should be
@@ -365,6 +349,32 @@ class Job():
#
#######################################################
+ def _debug(self, text, **kwargs):
+ self._context.debug(text, task_id=self._task_id, **kwargs)
+
+ def _status(self, text, **kwargs):
+ self._context.status(text, task_id=self._task_id, **kwargs)
+
+ def _info(self, text, **kwargs):
+ self._context.info(text, task_id=self._task_id, **kwargs)
+
+ def _warn(self, text, **kwargs):
+ self._context.warn(text, task_id=self._task_id, **kwargs)
+
+ def _error(self, text, **kwargs):
+ self._context.error(text, task_id=self._task_id, **kwargs)
+
+ def _log(self, text, **kwargs):
+ self._context.log(text, task_id=self._task_id, **kwargs)
+
+ # _fail()
+ #
+ # Only exists for sub classes to override and add kwargs to.
+ #
+ def _fail(self, text, **kwargs):
+ self._context.message(text, task_id=self._task_id,
+ msg_type=MessageType.FAIL, **kwargs)
+
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -391,7 +401,7 @@ class Job():
# Set the global message handler in this child
# process to forward messages to the parent process
self._queue = queue
- self._scheduler.context.set_message_handler(self._child_message_handler)
+ self._context.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
stopped_time = None
@@ -408,17 +418,17 @@ class Job():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- self._scheduler.context.recorded_messages(self._logfile) as filename:
+ self._context.recorded_messages(self._logfile) as filename:
- self.message(MessageType.START, self.action_name, logfile=filename)
+ self._context.message(self.action_name, logfile=filename,
+ msg_type=MessageType.START, task_id=self._task_id)
try:
# Try the task action
result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - starttime
- self.message(MessageType.SKIPPED, str(e),
- elapsed=elapsed, logfile=filename)
+ self._context.skipped(e, elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
self._child_shutdown(RC_SKIPPED)
@@ -427,13 +437,11 @@ class Job():
self._retry_flag = e.temporary
if self._retry_flag and (self._tries <= self._max_retries):
- self.message(MessageType.FAIL,
- "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed, logfile=filename)
+ self._fail("Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed, logfile=filename)
else:
- self.message(MessageType.FAIL, str(e),
- elapsed=elapsed, detail=e.detail,
- logfile=filename, sandbox=e.sandbox)
+ self._fail(e, elapsed=elapsed, detail=e.detail,
+ logfile=filename, sandbox=e.sandbox)
self._queue.put(Envelope('child_data', self.child_process_data()))
@@ -453,9 +461,9 @@ class Job():
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self.message(MessageType.BUG, self.action_name,
- elapsed=elapsed, detail=detail,
- logfile=filename)
+ self._context.message(self.action_name, elapsed=elapsed,
+ detail=detail, logfile=filename,
+ task_id=self._task_id, msg_type=MessageType.BUG)
# Unhandled exceptions should permenantly fail
self._child_shutdown(RC_PERM_FAIL)
@@ -465,8 +473,10 @@ class Job():
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
- logfile=filename)
+ self._context.message(self.action_name,
+ elapsed=elapsed, logfile=filename,
+ msg_type=MessageType.SUCCESS,
+ task_id=self._task_id)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
@@ -603,7 +613,7 @@ class Job():
if envelope._message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope._message)
+ self._context._send_message(envelope._message)
elif envelope._message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 984a5457a..c02e3e57c 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -51,10 +51,10 @@ class BuildQueue(Queue):
self._tried.add(element)
_, description, detail = element._get_build_result()
logfile = element._get_build_log()
- self._message(element, MessageType.FAIL, description,
- detail=detail, action_name=self.action_name,
- elapsed=timedelta(seconds=0),
- logfile=logfile)
+ self._context.message(description, msg_type=MessageType.FAIL, plugin=element,
+ detail=detail, action_name=self.action_name,
+ elapsed=timedelta(seconds=0),
+ logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
resources=self.resources,
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 909cebb44..df51f85c2 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..resources import ResourceType
# BuildStream toplevel imports
from ..._exceptions import BstError, set_last_task_error
-from ..._message import Message, MessageType
+from ..._message import MessageType
# Queue status for a given element
@@ -72,6 +72,7 @@ class Queue():
# Private members
#
self._scheduler = scheduler
+ self._context = scheduler.context
self._wait_queue = deque()
self._done_queue = deque()
self._max_retries = 0
@@ -270,17 +271,19 @@ class Queue():
# Handle any workspace modifications now
#
if workspace_dict:
- context = element._get_context()
- workspaces = context.get_workspaces()
+ workspaces = self._context.get_workspaces()
if workspaces.update_workspace(element._get_full_name(), workspace_dict):
try:
workspaces.save_config()
except BstError as e:
- self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
- except Exception as e: # pylint: disable=broad-except
- self._message(element, MessageType.BUG,
- "Unhandled exception while saving workspaces",
- detail=traceback.format_exc())
+ self._context.error("Error saving workspaces",
+ detail=str(e),
+ plugin=element)
+ except Exception as e: # pylint: disable=broad-except
+ self._context.message("Unhandled exception while saving workspaces",
+ msg_type=MessageType.BUG,
+ detail=traceback.format_exc(),
+ plugin=element)
# _job_done()
#
@@ -304,10 +307,10 @@ class Queue():
try:
self.done(job, element, result, success)
except BstError as e:
-
# Report error and mark as failed
#
- self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
+ self._context.error("Post processing error",
+ plugin=element, detail=traceback.format_exc())
self.failed_elements.append(element)
# Treat this as a task error as it's related to a task
@@ -317,13 +320,12 @@ class Queue():
#
set_last_task_error(e.domain, e.reason)
- except Exception as e: # pylint: disable=broad-except
-
+ except Exception: # pylint: disable=broad-except
# Report unhandled exceptions and mark as failed
#
- self._message(element, MessageType.BUG,
- "Unhandled exception in post processing",
- detail=traceback.format_exc())
+ self._context.message("Unhandled exception in post processing",
+ plugin=element, msg_type=MessageType.BUG,
+ detail=traceback.format_exc())
self.failed_elements.append(element)
else:
#
@@ -343,13 +345,6 @@ class Queue():
else:
self.failed_elements.append(element)
- # Convenience wrapper for Queue implementations to send
- # a message for the element they are processing
- def _message(self, element, message_type, brief, **kwargs):
- context = element._get_context()
- message = Message(element._get_unique_id(), message_type, brief, **kwargs)
- context.message(message)
-
def _element_log_path(self, element):
project = element._get_project()
key = element._get_display_key()[1]
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 6e2e8b25b..67d07664d 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -25,11 +25,11 @@ import stat
import shlex
import shutil
import tarfile
+import traceback
from contextlib import contextmanager
from tempfile import TemporaryDirectory
from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
-from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from . import utils, _yaml, _site
@@ -517,7 +517,7 @@ class Stream():
target._open_workspace()
workspaces.save_config()
- self._message(MessageType.INFO, "Saved workspace configuration")
+ self._context.info("Saved workspace configuration")
# workspace_close
#
@@ -544,7 +544,7 @@ class Stream():
# Delete the workspace and save the configuration
workspaces.delete_workspace(element_name)
workspaces.save_config()
- self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
+ self._context.info("Closed workspace for {}".format(element_name))
# workspace_reset
#
@@ -585,8 +585,8 @@ class Stream():
workspace_path = workspace.get_absolute_path()
if soft:
workspace.prepared = False
- self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
- .format(element.name, workspace_path))
+ self._context.info("Reset workspace state for {} at: {}"
+ .format(element.name, workspace.path))
continue
with element.timed_activity("Removing workspace directory {}"
@@ -603,9 +603,8 @@ class Stream():
with element.timed_activity("Staging sources to {}".format(workspace_path)):
element._open_workspace()
- self._message(MessageType.INFO,
- "Reset workspace for {} at: {}".format(element.name,
- workspace_path))
+ self._context.info("Reset workspace for {} at: {}"
+ .format(element.name, workspace._path))
workspaces.save_config()
@@ -681,7 +680,7 @@ class Stream():
# source-bundle only supports one target
target = self.targets[0]
- self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
+ self._context.info("Bundling sources for target {}".format(target.name))
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -961,15 +960,6 @@ class Stream():
return selected, track_selected
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self._context.message(
- Message(None, message_type, message, **args))
-
# _add_queue()
#
# Adds a queue to the stream
@@ -1020,10 +1010,11 @@ class Stream():
for element in self.total_elements:
element._update_state()
except BstError as e:
- self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+ self._context.error("Error resolving final state", detail=e)
set_last_task_error(e.domain, e.reason)
- except Exception as e: # pylint: disable=broad-except
- self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+ except Exception as e: # pylint: disable=broad-except
+ self._context.message("Unhandled exception while resolving final state",
+ detail=traceback.format_exc())
if status == SchedStatus.ERROR:
raise StreamError()
diff --git a/buildstream/plugin.py b/buildstream/plugin.py
index 1b021d4b4..aea135e4e 100644
--- a/buildstream/plugin.py
+++ b/buildstream/plugin.py
@@ -117,7 +117,6 @@ from weakref import WeakValueDictionary
from . import _yaml
from . import utils
from ._exceptions import PluginError, ImplError
-from ._message import Message, MessageType
class Plugin():
@@ -464,8 +463,7 @@ class Plugin():
brief (str): The brief message
detail (str): An optional detailed message, can be multiline output
"""
- if self.__context.log_debug:
- self.__message(MessageType.DEBUG, brief, detail=detail)
+ self.__context.debug(brief, detail=detail, plugin=self)
def status(self, brief, *, detail=None):
"""Print a status message
@@ -474,9 +472,9 @@ class Plugin():
brief (str): The brief message
detail (str): An optional detailed message, can be multiline output
- Note: Status messages tell about what a plugin is currently doing
+ Note: Status messages tell the user what a plugin is currently doing
"""
- self.__message(MessageType.STATUS, brief, detail=detail)
+ self.__context.status(brief, detail=detail, plugin=self)
def info(self, brief, *, detail=None):
"""Print an informative message
@@ -488,7 +486,7 @@ class Plugin():
Note: Informative messages tell the user something they might want
to know, like if refreshing an element caused it to change.
"""
- self.__message(MessageType.INFO, brief, detail=detail)
+ self.__context.info(brief, detail=detail, plugin=self)
def warn(self, brief, *, detail=None, warning_token=None):
"""Print a warning message, checks warning_token against project configuration
@@ -512,7 +510,7 @@ class Plugin():
detail = detail if detail else ""
raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
- self.__message(MessageType.WARN, brief=brief, detail=detail)
+ self.__context.warn(brief, detail=detail, plugin=self)
def log(self, brief, *, detail=None):
"""Log a message into the plugin's log file
@@ -524,7 +522,7 @@ class Plugin():
brief (str): The brief message
detail (str): An optional detailed message, can be multiline output
"""
- self.__message(MessageType.LOG, brief, detail=detail)
+ self.__context.log(brief, detail=detail, plugin=self)
@contextmanager
def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
@@ -746,10 +744,6 @@ class Plugin():
return (exit_code, output)
- def __message(self, message_type, brief, **kwargs):
- message = Message(self.__unique_id, message_type, brief, **kwargs)
- self.__context.message(message)
-
def __note_command(self, output, *popenargs, **kwargs):
workdir = kwargs.get('cwd', os.getcwd())
command = " ".join(popenargs[0])