author     Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-07-18 18:15:07 +0900
committer  Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-07-18 18:59:03 +0900
commit     d835c37f2514767b64f6fa5eee0821ec1943a7fd (patch)
tree       22d11a25ac1bd6374fa62545395b836c5a9bfd80
parent     a950a985e0fa05cf4f7b1b1bbc1de6db338e821c (diff)
Refactor: Use new logging mechanism from Context  (tristan/local-cache-expiry)
o element.py / plugin.py: Removed supporting logging code, and derive
  the log handle from Context.

o _scheduler/scheduler.py, _scheduler/queues/queue.py: Adapt to new
  Job initializer API for the logfile.

o _scheduler/jobs/job.py: Run job activities within the new context
  manager which turns on logging for a job. Also removed a lot of
  custom logging abstract methods which are unneeded.

o _scheduler/jobs: Job implementations need not implement so much
  custom logging.
-rw-r--r--  buildstream/_scheduler/jobs/cachesizejob.py  |  44
-rw-r--r--  buildstream/_scheduler/jobs/cleanupjob.py    |  25
-rw-r--r--  buildstream/_scheduler/jobs/elementjob.py    |  75
-rw-r--r--  buildstream/_scheduler/jobs/job.py           | 113
-rw-r--r--  buildstream/_scheduler/queues/queue.py       |   9
-rw-r--r--  buildstream/_scheduler/scheduler.py          |   6
-rw-r--r--  buildstream/element.py                       |  84
-rw-r--r--  buildstream/plugin.py                        |  17

8 files changed, 55 insertions(+), 318 deletions(-)
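
The commit message above summarizes the direction: per-job log plumbing
moves into Context. Only recorded_messages(), get_log_filename() and
get_log_handle() are confirmed by the hunks below; the rest of this
sketch is an illustrative reduction of how such a context manager
could tie the three together:

```python
import os
from contextlib import contextmanager


class Context:
    # Illustrative reduction of BuildStream's Context; only the three
    # method names below are confirmed by this commit's hunks.

    def __init__(self, logdir):
        self.logdir = logdir
        self._log_filename = None
        self._log_handle = None

    @contextmanager
    def recorded_messages(self, filename):
        # Jobs now pass a *relative* log name (see queue.py below);
        # the Context resolves it against logdir, appends the pid,
        # and holds the log open for the duration of the task.
        self._log_filename = os.path.join(
            self.logdir, '{}.{}.log'.format(filename, os.getpid()))
        os.makedirs(os.path.dirname(self._log_filename), exist_ok=True)
        with open(self._log_filename, 'a') as log:
            self._log_handle = log
            try:
                yield self._log_filename
            finally:
                self._log_handle = None
                self._log_filename = None

    def get_log_handle(self):
        return self._log_handle

    def get_log_filename(self):
        return self._log_filename
```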
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index ffb945e43..f73a09c74 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,12 +16,8 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-import os
-from contextlib import contextmanager
-
from .job import Job
from ..._platform import Platform
-from ..._message import Message, MessageType
class CacheSizeJob(Job):
@@ -38,45 +34,5 @@ class CacheSizeJob(Job):
if self._complete_cb:
self._complete_cb(result)
- @contextmanager
- def child_logging_enabled(self, logfile):
- self._logfile = logfile.format(pid=os.getpid())
- yield self._logfile
- self._logfile = None
-
- def message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(Message(None, message_type, message, **args))
-
- def child_log(self, message):
- with open(self._logfile, 'a+') as log:
- INDENT = " "
- EMPTYTIME = "--:--:--"
-
- template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
- detail = ''
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- message_text = template.format(timecode=timecode,
- type=message.message_type.upper(),
- name='cache_size',
- message=message.message,
- detail=detail)
-
- log.write('{}\n'.format(message_text))
- log.flush()
-
- return message
-
def child_process_data(self):
return {}
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index af7ca2b0b..bb78e8751 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,12 +16,8 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-import os
-from contextlib import contextmanager
-
from .job import Job
from ..._platform import Platform
-from ..._message import Message
class CleanupJob(Job):
@@ -38,26 +34,5 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb()
- @contextmanager
- def child_logging_enabled(self, logfile):
- self._logfile = logfile.format(pid=os.getpid())
- yield self._logfile
- self._logfile = None
-
- def message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(Message(None, message_type, message, **args))
-
- def child_log(self, message):
- message.action_name = self.action_name
-
- with open(self._logfile, 'a+') as log:
- message_text = self.decorate_message(message, '[cleanup]')
- log.write('{}\n'.format(message_text))
- log.flush()
-
- return message
-
def child_process_data(self):
return {}
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 3dfb9aa6e..fcad20ce4 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -16,14 +16,9 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-import os
-from contextlib import contextmanager
-
from ruamel import yaml
from ..._message import Message, MessageType
-from ...plugin import _plugin_lookup
-from ... import _signals
from .job import Job
@@ -77,44 +72,30 @@ class ElementJob(Job):
self._action_cb = action_cb # The action callable function
self._complete_cb = complete_cb # The complete callable function
+ # Set the task-wide ID for logging purposes
+ self.set_task_id(element._get_unique_id())
+
@property
def element(self):
return self._element
def child_process(self):
+
+ # Print the element's environment at the beginning of any element's log file.
+ #
+ # This should probably be omitted for non-build tasks but it's harmless here
+ elt_env = self._element.get_environment()
+ env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+ self.message(MessageType.LOG,
+ "Build environment for element {}".format(self._element.name),
+ detail=env_dump)
+
+ # Run the action
return self._action_cb(self._element)
def parent_complete(self, success, result):
self._complete_cb(self, self._element, success, self._result)
- @contextmanager
- def child_logging_enabled(self, logfile):
- self._logfile = logfile.format(pid=os.getpid())
-
- with open(self._logfile, 'a') as log:
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- # FIXME: Better logging
-
- log.write('\n\nAction {} for element {} forcefully terminated\n'
- .format(self.action_name, self._element.name))
- log.flush()
- except RuntimeError:
- os.fsync(log.fileno())
-
- self._element._set_log_handle(log)
- with _signals.terminator(flush_log):
- self._print_start_message(self._element, self._logfile)
- yield self._logfile
- self._element._set_log_handle(None)
- self._logfile = None
-
def message(self, message_type, message, **kwargs):
args = dict(kwargs)
args['scheduler'] = True
@@ -124,34 +105,6 @@ class ElementJob(Job):
message,
**args))
- def _print_start_message(self, element, logfile):
- self.message(MessageType.START, self.action_name, logfile=logfile)
-
- # Print the element's environment at the beginning of any element's log file.
- #
- # This should probably be omitted for non-build tasks but it's harmless here
- elt_env = element.get_environment()
- env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self.message(MessageType.LOG,
- "Build environment for element {}".format(element.name),
- detail=env_dump, logfile=logfile)
-
- def child_log(self, message):
- # Tag them on the way out the door...
- message.action_name = self.action_name
- message.task_id = self._element._get_unique_id()
-
- # Use the plugin for the task for the output, not a plugin
- # which might be acting on behalf of the task
- plugin = _plugin_lookup(message.task_id)
-
- with plugin._output_file() as output:
- message_text = self.decorate_message(message, '[{}]'.format(plugin.name))
- output.write('{}\n'.format(message_text))
- output.flush()
-
- return message
-
def child_process_data(self):
data = {}
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index dec3a0c11..6d4b685af 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -27,13 +27,12 @@ import datetime
import traceback
import asyncio
import multiprocessing
-from contextlib import contextmanager
import psutil
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error
-from ..._message import MessageType, unconditional_messages
+from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
@@ -113,6 +112,7 @@ class Job():
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
self._logfile = logfile
+ self._task_id = None
# spawn()
#
@@ -247,6 +247,24 @@ class Job():
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
+ # set_task_id()
+ #
+ # This is called by Job subclasses to set a plugin ID
+ # associated with the task at large (if any element is related
+ # to the task).
+ #
+ # The task ID helps keep messages in the frontend coherent
+ # in the case that multiple plugins log in the context of
+ # a single task (e.g. running integration commands should appear
+ # in the frontend for the element being built, not the element
+ # running the integration commands).
+ #
+ # Args:
+ # task_id (int): The plugin identifier for this task
+ #
+ def set_task_id(self, task_id):
+ self._task_id = task_id
+
#######################################################
# Abstract Methods #
#######################################################
@@ -277,28 +295,10 @@ class Job():
raise ImplError("Job '{kind}' does not implement child_process()"
.format(kind=type(self).__name__))
- # child_logging_enabled()
- #
- # Start the log for this job. This function will be given a
- # template string for the path to a log file - this will contain
- # "{pid}", which should be replaced with the current process'
- # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
- #
- # Args:
- # logfile (str): A template string that points to the logfile
- # that should be used - replace {pid} first.
- #
- # Yields:
- # (str) The path to the logfile with {pid} replaced.
- #
- @contextmanager
- def child_logging_enabled(self, logfile):
- raise ImplError("Job '{kind}' does not implement child_logging_enabled()"
- .format(kind=type(self).__name__))
-
# message():
#
- # Sends a message to the frontend
+ # Logs a message; it will be written to the task's logfile and
+ # conditionally also sent to the frontend.
#
# Args:
# message_type (MessageType): The type of message to send
@@ -306,8 +306,9 @@ class Job():
# kwargs: Remaining Message() constructor arguments
#
def message(self, message_type, message, **kwargs):
- raise ImplError("Job '{kind}' does not implement message()"
- .format(kind=type(self).__name__))
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(Message(None, message_type, message, **args))
# child_process_data()
#
@@ -323,24 +324,6 @@ class Job():
def child_process_data(self):
return {}
- # child_log()
- #
- # Log a message returned by the frontend's main message handler
- # and return it to the main process.
- #
- # This method is also expected to add process-specific information
- # to the message (notably, action_name and task_id).
- #
- # Arguments:
- # message (str): The message to log
- #
- # Returns:
- # message (Message): A message object
- #
- def child_log(self, message):
- raise ImplError("Job '{kind}' does not implement child_log()"
- .format(kind=type(self).__name__))
-
#######################################################
# Local Private Methods #
#######################################################
@@ -353,42 +336,6 @@ class Job():
#
#######################################################
- # decorate_message()
- #
- # Format a message so that it can be used nicely for logging
- # purposes. This will prepend a time code and add other
- # information to help determine what happened.
- #
- # Args:
- # message (Message) - The message to create a text from.
- # name (str) - A name for the executing context.
- #
- # Returns:
- # (str) The text to log.
- #
- def decorate_message(self, message, name):
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
-
- detail = ''
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- return template.format(timecode=timecode,
- type=message.message_type.upper(),
- name=name,
- message=message.message,
- detail=detail)
-
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -398,8 +345,6 @@ class Job():
#
def _child_action(self, queue):
- logfile = self._logfile
-
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
os.setsid()
@@ -434,7 +379,9 @@ class Job():
# Time, log and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- self.child_logging_enabled(logfile) as filename:
+ self._scheduler.context.recorded_messages(self._logfile) as filename:
+
+ self.message(MessageType.START, self.action_name, logfile=filename)
try:
# Try the task action
@@ -545,8 +492,8 @@ class Job():
#
def _child_message_handler(self, message, context):
- # Log first
- message = self.child_log(message)
+ message.action_name = self.action_name
+ message.task_id = self._task_id
if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
# Job will be retried, display failures as warnings in the frontend
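
Taken together, the job.py hunks above reduce the child-side flow to a
single pattern. A hedged sketch of that flow (simplified from
_child_action(); signal handling, retries and result plumbing are
omitted, and run_child itself is a hypothetical helper, not part of
this commit):

```python
from buildstream._message import MessageType


def run_child(job, scheduler):
    # Open logging through the Context for the duration of the task,
    # announce the task with a START message, then run the
    # job-specific action.
    with scheduler.context.recorded_messages(job._logfile) as filename:
        job.message(MessageType.START, job.action_name, logfile=filename)
        return job.child_process()
```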
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index ac20d3711..6c1583495 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -353,13 +353,8 @@ class Queue():
def _element_log_path(self, element):
project = element._get_project()
- context = element._get_context()
-
key = element._get_display_key()[1]
action = self.action_name.lower()
- logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
-
- directory = os.path.join(context.logdir, project.name, element.normal_name)
+ logfile = "{key}-{action}".format(key=key, action=action)
- os.makedirs(directory, exist_ok=True)
- return os.path.join(directory, logfile)
+ return os.path.join(project.name, element.normal_name, logfile)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index aeb32931a..5783e5a67 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -317,8 +317,7 @@ class Scheduler():
if cache_size and cache_size < self.context.cache_quota:
return
- logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log')
- job = CleanupJob(self, 'cleanup', logpath,
+ job = CleanupJob(self, 'cleanup', 'cleanup',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
@@ -326,8 +325,7 @@ class Scheduler():
self.schedule_jobs([job])
def _check_cache_size_real(self):
- logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
- job = CacheSizeJob(self, 'cache_size', logpath,
+ job = CacheSizeJob(self, 'cache_size', 'cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
diff --git a/buildstream/element.py b/buildstream/element.py
index 140c824ec..ca06967a2 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -219,7 +219,6 @@ class Element(Plugin):
self.__tracking_scheduled = False # Sources are scheduled to be tracked
self.__tracking_done = False # Sources have been tracked
self.__pull_done = False # Whether pull was attempted
- self.__log_path = None # Path to dedicated log file or None
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
@@ -1501,8 +1500,9 @@ class Element(Plugin):
utils.link_files(collectdir, filesdir)
# Copy build log
- if self.__log_path:
- shutil.copyfile(self.__log_path, os.path.join(logsdir, 'build.log'))
+ log_filename = context.get_log_filename()
+ if log_filename:
+ shutil.copyfile(log_filename, os.path.join(logsdir, 'build.log'))
# Store public data
_yaml.dump(_yaml.node_sanitize(self.__dynamic_public), os.path.join(metadir, 'public.yaml'))
@@ -1837,47 +1837,6 @@ class Element(Plugin):
def _subst_string(self, value):
return self.__variables.subst(value)
- # Run some element methods with logging directed to
- # a dedicated log file, here we yield the filename
- # we decided on for logging
- #
- @contextmanager
- def _logging_enabled(self, action_name):
- self.__log_path = self.__logfile(action_name)
- with open(self.__log_path, 'a') as logfile:
-
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- logfile.write('\n\nAction {} for element {} forcefully terminated\n'
- .format(action_name, self.name))
- logfile.flush()
- except RuntimeError:
- os.fsync(logfile.fileno())
-
- self._set_log_handle(logfile)
- with _signals.terminator(flush_log):
- yield self.__log_path
- self._set_log_handle(None)
- self.__log_path = None
-
- # Override plugin _set_log_handle(), set it for our sources and dependencies too
- #
- # A log handle is set once in the context of a child task which will have only
- # one log, so it's not harmful to modify the state of dependencies
- def _set_log_handle(self, logfile, recurse=True):
- super()._set_log_handle(logfile)
- for source in self.sources():
- source._set_log_handle(logfile)
- if recurse:
- for dep in self.dependencies(Scope.ALL):
- dep._set_log_handle(logfile, False)
-
# Returns the element whose sources this element is ultimately derived from.
#
# This is intended for being used to redirect commands that operate on an
@@ -2015,43 +1974,6 @@ class Element(Plugin):
if workspace:
workspace.prepared = True
- # __logfile()
- #
- # Compose the log file for this action & pid.
- #
- # Args:
- # action_name (str): The action name
- # pid (int): Optional pid, current pid is assumed if not provided.
- #
- # Returns:
- # (string): The log file full path
- #
- # Log file format, when there is a cache key, is:
- #
- # '{logdir}/{project}/{element}/{cachekey}-{action}.{pid}.log'
- #
- # Otherwise, it is:
- #
- # '{logdir}/{project}/{element}/{:0<64}-{action}.{pid}.log'
- #
- # This matches the order in which things are stored in the artifact cache
- #
- def __logfile(self, action_name, pid=None):
- project = self._get_project()
- context = self._get_context()
- key = self.__get_brief_display_key()
- if pid is None:
- pid = os.getpid()
-
- action = action_name.lower()
- logfile = "{key}-{action}.{pid}.log".format(
- key=key, action=action, pid=pid)
-
- directory = os.path.join(context.logdir, project.name, self.normal_name)
-
- os.makedirs(directory, exist_ok=True)
- return os.path.join(directory, logfile)
-
# __assert_cached()
#
# Raises an error if the artifact is not cached.
diff --git a/buildstream/plugin.py b/buildstream/plugin.py
index 29fe2cb11..3e1d89052 100644
--- a/buildstream/plugin.py
+++ b/buildstream/plugin.py
@@ -162,7 +162,6 @@ class Plugin():
self.__provenance = provenance # The Provenance information
self.__type_tag = type_tag # The type of plugin (element or source)
self.__unique_id = _plugin_register(self) # Unique ID
- self.__log = None # The log handle when running a task
# Infer the kind identifier
modulename = type(self).__module__
@@ -474,6 +473,7 @@ class Plugin():
self.call(... command which takes time ...)
"""
with self.__context.timed_activity(activity_name,
+ unique_id=self.__unique_id,
detail=detail,
silent_nested=silent_nested):
yield
@@ -589,27 +589,18 @@ class Plugin():
def _get_provenance(self):
return self.__provenance
- # Accessor for logging handle
- #
- def _get_log_handle(self, log):
- return self.__log
-
- # Mutator for logging handle
- #
- def _set_log_handle(self, log):
- self.__log = log
-
# Context manager for getting the open file handle to this
# plugin's log. Used in the child context to add stuff to
# a log.
#
@contextmanager
def _output_file(self):
- if not self.__log:
+ log = self.__context.get_log_handle()
+ if log is None:
with open(os.devnull, "w") as output:
yield output
else:
- yield self.__log
+ yield log
# _preflight():
# Calls preflight() for the plugin, and allows generic preflight
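
The plugin.py hunk above completes the loop: a plugin no longer
carries its own log handle, it asks the Context. A minimal sketch of
the resulting behaviour (FakeContext and output_file are hypothetical
stand-ins mirroring _output_file() as changed above):

```python
import os
from contextlib import contextmanager


class FakeContext:
    # Hypothetical stand-in; get_log_handle() is the only method used.
    def __init__(self, handle=None):
        self._handle = handle

    def get_log_handle(self):
        return self._handle


@contextmanager
def output_file(context):
    # Mirrors Plugin._output_file() after this commit: discard output
    # when no task log is active, otherwise reuse the Context's handle.
    log = context.get_log_handle()
    if log is None:
        with open(os.devnull, 'w') as output:
            yield output
    else:
        yield log


# Outside a running task, plugin output is simply discarded:
with output_file(FakeContext()) as out:
    out.write('not recorded anywhere\n')
```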