summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-06-27 17:25:57 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-05 09:44:45 +0000
commitfe632514cb1892da1fe0c0f0c3a4fa3d33a8ec50 (patch)
tree5590573f16cf54b873cc89bdf4a7fb7623b1623c
parent40a2ebe05bf2d4a922ac11f453c79a3d74b196ca (diff)
downloadbuildstream-fe632514cb1892da1fe0c0f0c3a4fa3d33a8ec50.tar.gz
_context:extract _messenger.Messenger from Context
Separate out the responsibility of messaging from the rest of the Context object. This change removes some member variables and some code from member functions, but keeps the existing member functions. In later work we'll want things to use the Messenger directly, so we can remove some member functions too, and de-couple further. Update some comments to refer to the "Messenger API" instead of the "Context API" in the copied code. Fixup the arg documentation to timed_activity() while we're there.
-rw-r--r--src/buildstream/_context.py166
-rw-r--r--src/buildstream/_messenger.py292
2 files changed, 309 insertions, 149 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c97acc70e..de1f83d32 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -19,22 +19,18 @@
import os
import shutil
-import datetime
-from collections import deque
from contextlib import contextmanager
from . import utils
from . import _cachekey
-from . import _signals
from . import _site
from . import _yaml
-from ._exceptions import LoadError, LoadErrorReason, BstError
-from ._message import Message, MessageType
+from ._exceptions import LoadError, LoadErrorReason
+from ._messenger import Messenger
from ._profile import Topics, PROFILER
from ._artifactcache import ArtifactCache
from ._sourcecache import SourceCache
from ._cas import CASCache, CASQuota, CASCacheUsage
from ._workspaces import Workspaces, WorkspaceProjectCache
-from .plugin import Plugin
from .sandbox import SandboxRemote
@@ -151,18 +147,15 @@ class Context():
# Private variables
self._cache_key = None
- self._message_handler = None
- self._message_depth = deque()
self._artifactcache = None
self._sourcecache = None
self._projects = []
self._project_overrides = _yaml.new_empty_node()
self._workspaces = None
self._workspace_project_cache = WorkspaceProjectCache()
- self._log_handle = None
- self._log_filename = None
self._cascache = None
self._casquota = None
+ self._messenger = Messenger()
# load()
#
@@ -448,7 +441,7 @@ class Context():
# The message handler should have the same signature as
# the message() method
def set_message_handler(self, handler):
- self._message_handler = handler
+ self._messenger.set_message_handler(handler)
# silent_messages():
#
@@ -456,10 +449,7 @@ class Context():
# (bool): Whether messages are currently being silenced
#
def silent_messages(self):
- for silent in self._message_depth:
- if silent:
- return True
- return False
+ return self._messenger.silent_messages()
# message():
#
@@ -470,20 +460,7 @@ class Context():
# message: A Message object
#
def message(self, message):
-
- # Tag message only once
- if message.depth is None:
- message.depth = len(list(self._message_depth))
-
- # If we are recording messages, dump a copy into the open log file.
- self._record_message(message)
-
- # Send it off to the log handler (can be the frontend,
- # or it can be the child task which will propagate
- # to the frontend)
- assert self._message_handler
-
- self._message_handler(message, context=self)
+ self._messenger.message(message, context=self)
# silence()
#
@@ -494,11 +471,8 @@ class Context():
#
@contextmanager
def silence(self):
- self._push_message_depth(True)
- try:
+ with self._messenger.silence():
yield
- finally:
- self._pop_message_depth()
# timed_activity()
#
@@ -512,41 +486,9 @@ class Context():
#
@contextmanager
def timed_activity(self, activity_name, *, unique_id=None, detail=None, silent_nested=False):
-
- starttime = datetime.datetime.now()
- stopped_time = None
-
- def stop_time():
- nonlocal stopped_time
- stopped_time = datetime.datetime.now()
-
- def resume_time():
- nonlocal stopped_time
- nonlocal starttime
- sleep_time = datetime.datetime.now() - stopped_time
- starttime += sleep_time
-
- with _signals.suspendable(stop_time, resume_time):
- try:
- # Push activity depth for status messages
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
- self.message(message)
- self._push_message_depth(silent_nested)
- yield
-
- except BstError:
- # Note the failure in status messages and reraise, the scheduler
- # expects an error when there is an error.
- elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
- self._pop_message_depth()
- self.message(message)
- raise
-
- elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
- self._pop_message_depth()
- self.message(message)
+ with self._messenger.timed_activity(
+ activity_name, context=self, unique_id=unique_id, detail=detail, silent_nested=silent_nested):
+ yield
# recorded_messages()
#
@@ -569,43 +511,8 @@ class Context():
#
@contextmanager
def recorded_messages(self, filename):
-
- # We dont allow recursing in this context manager, and
- # we also do not allow it in the main process.
- assert self._log_handle is None
- assert self._log_filename is None
- assert not utils._is_main_process()
-
- # Create the fully qualified logfile in the log directory,
- # appending the pid and .log extension at the end.
- self._log_filename = os.path.join(self.logdir,
- '{}.{}.log'.format(filename, os.getpid()))
-
- # Ensure the directory exists first
- directory = os.path.dirname(self._log_filename)
- os.makedirs(directory, exist_ok=True)
-
- with open(self._log_filename, 'a') as logfile:
-
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- logfile.write('\n\nForcefully terminated\n')
- logfile.flush()
- except RuntimeError:
- os.fsync(logfile.fileno())
-
- self._log_handle = logfile
- with _signals.terminator(flush_log):
- yield self._log_filename
-
- self._log_handle = None
- self._log_filename = None
+ with self._messenger.recorded_messages(filename, logdir=self.logdir) as messages:
+ yield messages
# get_log_handle()
#
@@ -617,7 +524,7 @@ class Context():
# (file): The active logging file handle, or None
#
def get_log_handle(self):
- return self._log_handle
+ return self._messenger.get_log_handle()
# get_log_filename()
#
@@ -629,7 +536,7 @@ class Context():
# (str): The active logging filename, or None
#
def get_log_filename(self):
- return self._log_filename
+ return self._messenger.get_log_filename()
# set_artifact_directories_optional()
#
@@ -658,45 +565,7 @@ class Context():
# message (Message): The message to record
#
def _record_message(self, message):
-
- if self._log_handle is None:
- return
-
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7}"
-
- # If this message is associated with a plugin, print what
- # we know about the plugin.
- plugin_name = ""
- if message.unique_id:
- template += " {plugin}"
- plugin = Plugin._lookup(message.unique_id)
- plugin_name = plugin.name
-
- template += ": {message}"
-
- detail = ''
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- text = template.format(timecode=timecode,
- plugin=plugin_name,
- type=message.message_type.upper(),
- message=message.message,
- detail=detail)
-
- # Write to the open log file
- self._log_handle.write('{}\n'.format(text))
- self._log_handle.flush()
+ self._messenger._record_message(message)
# _push_message_depth() / _pop_message_depth()
#
@@ -704,11 +573,10 @@ class Context():
# activities inside a given task through the message
#
def _push_message_depth(self, silent_nested):
- self._message_depth.appendleft(silent_nested)
+ self._messenger._push_message_depth(silent_nested)
def _pop_message_depth(self):
- assert self._message_depth
- self._message_depth.popleft()
+ self._messenger._pop_message_depth()
# Force the resolved XDG variables into the environment,
# this is so that they can be used directly to specify
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
new file mode 100644
index 000000000..6dcb649d7
--- /dev/null
+++ b/src/buildstream/_messenger.py
@@ -0,0 +1,292 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Angelos Evripiotis <jevripiotis@bloomberg.net>
+
+import os
+import datetime
+from collections import deque
+from contextlib import contextmanager
+
+from . import _signals
+from . import utils
+from ._exceptions import BstError
+from ._message import Message, MessageType
+from .plugin import Plugin
+
+
+class Messenger():
+
+ def __init__(self):
+ self._message_handler = None
+ self._message_depth = deque()
+ self._log_handle = None
+ self._log_filename = None
+
+ # set_message_handler()
+ #
+ # Sets the handler for any status messages propagated through
+ # the context.
+ #
+ # The message handler should have the same signature as
+ # the message() method
+ def set_message_handler(self, handler):
+ self._message_handler = handler
+
+ # silent_messages():
+ #
+ # Returns:
+ # (bool): Whether messages are currently being silenced
+ #
+ def silent_messages(self):
+ for silent in self._message_depth:
+ if silent:
+ return True
+ return False
+
+ # message():
+ #
+ # Proxies a message back to the caller, this is the central
+ # point through which all messages pass.
+ #
+ # Args:
+ # message: A Message object
+ #
+ def message(self, message, context):
+
+ # Tag message only once
+ if message.depth is None:
+ message.depth = len(list(self._message_depth))
+
+ # If we are recording messages, dump a copy into the open log file.
+ self._record_message(message)
+
+ # Send it off to the log handler (can be the frontend,
+ # or it can be the child task which will propagate
+ # to the frontend)
+ assert self._message_handler
+
+ self._message_handler(message, context=context)
+
+ # silence()
+ #
+ # A context manager to silence messages, this behaves in
+ # the same way as the `silent_nested` argument of the
+ # timed_activity() context manager: especially
+ # important messages will not be silenced.
+ #
+ @contextmanager
+ def silence(self):
+ self._push_message_depth(True)
+ try:
+ yield
+ finally:
+ self._pop_message_depth()
+
+ # timed_activity()
+ #
+ # Context manager for performing timed activities and logging those
+ #
+ # Args:
+ # activity_name (str): The name of the activity
+ # context (Context): The invocation context object
+ # unique_id (int): Optionally, the unique id of the plugin related to the message
+ # detail (str): An optional detailed message, can be multiline output
+ # silent_nested (bool): If specified, nested messages will be silenced
+ #
+ @contextmanager
+ def timed_activity(self, activity_name, context, *, unique_id=None, detail=None, silent_nested=False):
+
+ starttime = datetime.datetime.now()
+ stopped_time = None
+
+ def stop_time():
+ nonlocal stopped_time
+ stopped_time = datetime.datetime.now()
+
+ def resume_time():
+ nonlocal stopped_time
+ nonlocal starttime
+ sleep_time = datetime.datetime.now() - stopped_time
+ starttime += sleep_time
+
+ with _signals.suspendable(stop_time, resume_time):
+ try:
+ # Push activity depth for status messages
+ message = Message(unique_id, MessageType.START, activity_name, detail=detail)
+ self.message(message, context)
+ self._push_message_depth(silent_nested)
+ yield
+
+ except BstError:
+ # Note the failure in status messages and reraise, the scheduler
+ # expects an error when there is an error.
+ elapsed = datetime.datetime.now() - starttime
+ message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
+ self._pop_message_depth()
+ self.message(message, context)
+ raise
+
+ elapsed = datetime.datetime.now() - starttime
+ message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
+ self._pop_message_depth()
+ self.message(message, context)
+
+ # recorded_messages()
+ #
+ # Records all messages in a log file while the context manager
+ # is active.
+ #
+ # In addition to automatically writing all messages to the
+ # specified logging file, an open file handle for process stdout
+ # and stderr will be available via the Messenger.get_log_handle() API,
+ # and the full logfile path will be available via the
+ # Messenger.get_log_filename() API.
+ #
+ # Args:
+ # filename (str): A logging directory relative filename,
+ # the pid and .log extension will be automatically
+ # appended
+ #
+ # logdir (str) : The path to the log file directory.
+ #
+ # Yields:
+ # (str): The fully qualified log filename
+ #
+ @contextmanager
+ def recorded_messages(self, filename, logdir):
+
+ # We dont allow recursing in this context manager, and
+ # we also do not allow it in the main process.
+ assert self._log_handle is None
+ assert self._log_filename is None
+ assert not utils._is_main_process()
+
+ # Create the fully qualified logfile in the log directory,
+ # appending the pid and .log extension at the end.
+ self._log_filename = os.path.join(logdir,
+ '{}.{}.log'.format(filename, os.getpid()))
+
+ # Ensure the directory exists first
+ directory = os.path.dirname(self._log_filename)
+ os.makedirs(directory, exist_ok=True)
+
+ with open(self._log_filename, 'a') as logfile:
+
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently had something happening in the I/O stack
+ # then trying to reenter the I/O stack will fire a runtime error.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ logfile.write('\n\nForcefully terminated\n')
+ logfile.flush()
+ except RuntimeError:
+ os.fsync(logfile.fileno())
+
+ self._log_handle = logfile
+ with _signals.terminator(flush_log):
+ yield self._log_filename
+
+ self._log_handle = None
+ self._log_filename = None
+
+ # get_log_handle()
+ #
+ # Fetches the active log handle, this will return the active
+ # log file handle when the Messenger.recorded_messages() context
+ # manager is active
+ #
+ # Returns:
+ # (file): The active logging file handle, or None
+ #
+ def get_log_handle(self):
+ return self._log_handle
+
+ # get_log_filename()
+ #
+ # Fetches the active log filename, this will return the active
+ # log filename when the Messenger.recorded_messages() context
+ # manager is active
+ #
+ # Returns:
+ # (str): The active logging filename, or None
+ #
+ def get_log_filename(self):
+ return self._log_filename
+
+ # _record_message()
+ #
+ # Records the message if recording is enabled
+ #
+ # Args:
+ # message (Message): The message to record
+ #
+ def _record_message(self, message):
+
+ if self._log_handle is None:
+ return
+
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7}"
+
+ # If this message is associated with a plugin, print what
+ # we know about the plugin.
+ plugin_name = ""
+ if message.unique_id:
+ template += " {plugin}"
+ plugin = Plugin._lookup(message.unique_id)
+ plugin_name = plugin.name
+
+ template += ": {message}"
+
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ text = template.format(timecode=timecode,
+ plugin=plugin_name,
+ type=message.message_type.upper(),
+ message=message.message,
+ detail=detail)
+
+ # Write to the open log file
+ self._log_handle.write('{}\n'.format(text))
+ self._log_handle.flush()
+
+ # _push_message_depth() / _pop_message_depth()
+ #
+ # For status messages, send the depth of timed
+ # activities inside a given task through the message
+ #
+ def _push_message_depth(self, silent_nested):
+ self._message_depth.appendleft(silent_nested)
+
+ def _pop_message_depth(self):
+ assert self._message_depth
+ self._message_depth.popleft()