#!/usr/bin/env python3
#
#  Copyright (C) 2016 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Tristan Van Berkom
#        Jürg Billeter

# System imports
import os
import sys
import signal
import datetime
import traceback
import asyncio
import multiprocessing

from ruamel import yaml

# BuildStream toplevel imports
from .._exceptions import BstError, _set_last_task_error
from .._message import Message, MessageType, unconditional_messages
from ..plugin import _plugin_lookup
from .. import _yaml, _signals, utils


# Envelope()
#
# A wrapper used to distinguish between status messages and return
# values when passing objects through the job's message queue.
#
# Args:
#    message_type (str): The envelope type, e.g. 'message', 'error' or 'result'
#    message (object): The payload carried by this envelope
#
class Envelope():
    def __init__(self, message_type, message):
        self.message_type = message_type
        self.message = message


# Process class that doesn't call waitpid on its own.
# This prevents conflicts with the asyncio child watcher.
class Process(multiprocessing.Process):
    # Skip the base class start() bookkeeping so that this process
    # is never reaped by multiprocessing itself; reaping is left
    # entirely to the asyncio child watcher registered in spawn().
    def start(self):
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel


# Job()
#
# Args:
#    scheduler (Scheduler): The scheduler
#    element (Element): The element to operate on
#    action_name (str): The queue action name
#
class Job():

    def __init__(self, scheduler, element, action_name):

        # Shared with child process
        self.scheduler = scheduler            # The scheduler
        self.queue = multiprocessing.Queue()  # A message passing queue
        self.process = None                   # The Process object
        self.watcher = None                   # Child process watcher
        self.action_name = action_name        # The action name for the Queue
        self.action = None                    # The action callable function
        self.complete = None                  # The complete callable function
        self.element = element                # The element we're processing
        self.listening = False                # Whether the parent is currently listening
        self.suspended = False                # Whether this job is currently suspended

        # Only relevant in parent process after spawning
        self.pid = None                       # The child's pid in the parent
        self.result = None                    # Return value of child action in the parent
        self.tries = 0                        # Number of spawn attempts so far

    # spawn()
    #
    # Spawns the child process to perform this job's action.
    #
    # Args:
    #    action (callable): The action function
    #    complete (callable): The function to call when complete
    #    max_retries (int): The maximum number of retries
    #
    def spawn(self, action, complete, max_retries=0):
        self.action = action
        self.complete = complete
        self.tries += 1
        self.max_retries = max_retries

        self.parent_start_listening()

        # Spawn the process
        self.process = Process(target=self.child_action,
                               args=[self.element, self.queue, self.action_name])

        # Here we want the following
        #
        #  A.) Child should inherit blocked SIGINT state, it's never handled there
        #  B.) Child should not inherit SIGTSTP handled state
        #
        with _signals.blocked([signal.SIGINT], ignore=False):
            self.scheduler.loop.remove_signal_handler(signal.SIGTSTP)
            self.process.start()
            self.scheduler.loop.add_signal_handler(signal.SIGTSTP, self.scheduler.suspend_event)

        self.pid = self.process.pid

        # Wait for it to complete
        self.watcher = asyncio.get_child_watcher()
        self.watcher.add_child_handler(self.pid, self.child_complete, self.element)

    # shutdown()
    #
    # Should be called after the job completes
    #
    def shutdown(self):
        # Make sure we've read everything we need and then stop listening
        self.parent_process_queue()
        self.parent_stop_listening()

    # terminate()
    #
    # Forcefully terminates an ongoing job.
    #
    def terminate(self):

        # First resume the job if it's suspended
        self.resume(silent=True)

        self.message(self.element, MessageType.STATUS,
                     "{} terminating".format(self.action_name))

        # Make sure there is no garbage on the queue
        self.parent_stop_listening()

        # Terminate the process using multiprocessing API pathway
        self.process.terminate()

    # terminate_wait()
    #
    # Wait for terminated jobs to complete
    #
    # Args:
    #    timeout (float): Seconds to wait
    #
    # Returns:
    #    (bool): True if the process terminated cleanly, otherwise False
    #
    def terminate_wait(self, timeout):

        # Join the child process after sending SIGTERM
        self.process.join(timeout)
        return (self.process.exitcode is not None)

    # kill()
    #
    # Forcefully kill the process, and any children it might have.
    #
    def kill(self):

        # Force kill
        self.message(self.element, MessageType.WARN,
                     "{} did not terminate gracefully, killing".format(self.action_name))
        utils._kill_process_tree(self.process.pid)

    # suspend()
    #
    # Suspend this job.
    #
    def suspend(self):
        if not self.suspended:
            self.message(self.element, MessageType.STATUS,
                         "{} suspending".format(self.action_name))

            try:
                # Use SIGTSTP so that child processes may handle and propagate
                # it to processes they spawn that become session leaders
                os.kill(self.process.pid, signal.SIGTSTP)

                # For some reason we receive exactly one suspend event for every
                # SIGTSTP we send to the child fork(), even though the child forks
                # are setsid(). We keep a count of these so we can ignore them
                # in our event loop suspend_event()
                self.scheduler.internal_stops += 1
                self.suspended = True
            except ProcessLookupError:
                # ignore, process has already exited
                pass

    # resume()
    #
    # Resume this suspended job.
    #
    def resume(self, silent=False):
        if self.suspended:
            if not silent:
                self.message(self.element, MessageType.STATUS,
                             "{} resuming".format(self.action_name))

            os.kill(self.process.pid, signal.SIGCONT)
            self.suspended = False

    # message()
    #
    # Send a message through the scheduler's context.
    # This can be used equally in the parent and child processes.
    #
    def message(self, plugin, message_type, message, **kwargs):
        args = dict(kwargs)
        args['scheduler'] = True
        self.scheduler.context._message(
            Message(plugin._get_unique_id(),
                    message_type,
                    message,
                    **args))

    #######################################################
    #                  Child Process                      #
    #######################################################

    # child_action()
    #
    # The entry point for the child process: runs the action
    # function for the given element, reporting status messages
    # and the final result back to the parent over the queue.
    #
    def child_action(self, element, queue, action_name):

        # This avoids some SIGTSTP signals from grandchildren
        # getting propagated up to the master process
        os.setsid()

        # Assign the queue we passed across the process boundaries
        #
        # Set the global message handler in this child
        # process to forward messages to the parent process
        self.queue = queue
        self.scheduler.context._set_message_handler(self.child_message_handler)

        starttime = datetime.datetime.now()
        stopped_time = None

        def stop_time():
            nonlocal stopped_time
            stopped_time = datetime.datetime.now()

        def resume_time():
            nonlocal stopped_time
            nonlocal starttime
            # Shift the start time forward so that suspended
            # time is not counted in the elapsed time reports
            starttime += (datetime.datetime.now() - stopped_time)

        # Time, log and run the action function
        #
        with _signals.suspendable(stop_time, resume_time), \
            element._logging_enabled(action_name) as filename:

            self.message(element, MessageType.START, self.action_name,
                         logfile=filename)

            # Print the element's environment at the beginning of any element's log file.
            #
            # This should probably be omitted for non-build tasks but it's harmless here
            elt_env = _yaml.node_sanitize(element._Element__environment)
            env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
            self.message(element, MessageType.LOG,
                         "Build environment for element {}".format(element.name),
                         detail=env_dump, logfile=filename)

            try:
                result = self.action(element)
                if result is not None:
                    envelope = Envelope('result', result)
                    self.queue.put(envelope)

            except BstError as e:
                elapsed = datetime.datetime.now() - starttime

                if self.tries <= self.max_retries:
                    self.message(element, MessageType.FAIL,
                                 "Try #{} failed, retrying".format(self.tries),
                                 elapsed=elapsed)
                else:
                    self.message(element, MessageType.FAIL, str(e),
                                 elapsed=elapsed, detail=e.detail,
                                 logfile=filename, sandbox=e.sandbox)

                # Report the exception to the parent (for internal testing purposes)
                self.child_send_error(e)
                self.child_shutdown(1)

            except Exception:
                # If an unhandled (not normalized to BstError) occurs, that's a bug,
                # send the traceback and formatted exception back to the frontend
                # and print it to the log file.
                #
                elapsed = datetime.datetime.now() - starttime
                detail = "An unhandled exception occurred:\n\n{}".format(traceback.format_exc())
                self.message(element, MessageType.BUG, self.action_name,
                             elapsed=elapsed, detail=detail,
                             logfile=filename)
                self.child_shutdown(1)

            elapsed = datetime.datetime.now() - starttime
            self.message(element, MessageType.SUCCESS, self.action_name,
                         elapsed=elapsed, logfile=filename)

        # Shutdown needs to stay outside of the above context manager,
        # make sure we dont try to handle SIGTERM while the process
        # is already busy in sys.exit()
        self.child_shutdown(0)

    # child_send_error()
    #
    # Report an exception's error domain / reason to the parent,
    # used for internal testing purposes.
    #
    def child_send_error(self, e):
        domain = None
        reason = None

        if isinstance(e, BstError):
            domain = e.domain
            reason = e.reason

        envelope = Envelope('error', {
            'domain': domain,
            'reason': reason
        })
        self.queue.put(envelope)

    # child_complete()
    #
    # Called in the parent by the child watcher when the child
    # exits; respawns the job if a retry is still available.
    #
    def child_complete(self, pid, returncode, element):
        if returncode != 0 and self.tries <= self.max_retries:
            self.shutdown()
            self.spawn(self.action, self.complete, self.max_retries)
            return

        self.complete(self, returncode, element)

    # child_shutdown()
    #
    # Close the queue and exit the child process with the given code.
    #
    def child_shutdown(self, exit_code):
        self.queue.close()
        sys.exit(exit_code)

    # child_log()
    #
    # Format and write a message to the plugin's log file.
    #
    def child_log(self, plugin, message, context):

        with plugin._output_file() as output:
            INDENT = "    "
            EMPTYTIME = "--:--:--"

            name = '[' + plugin.name + ']'

            fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
            detail = ''
            if message.detail is not None:
                fmt += "\n\n{detail}"
                detail = message.detail.rstrip('\n')
                detail = INDENT + INDENT.join(detail.splitlines(True))

            timecode = EMPTYTIME
            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
                minutes, seconds = divmod(remainder, 60)
                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)

            message_text = fmt.format(timecode=timecode,
                                      type=message.message_type.upper(),
                                      name=name,
                                      message=message.message,
                                      detail=detail)

            output.write('{}\n'.format(message_text))
            output.flush()

    # child_message_handler()
    #
    # The global message handler in the child process: logs the
    # message locally and forwards it to the parent when appropriate.
    #
    def child_message_handler(self, message, context):

        # Tag them on the way out the door...
        message.action_name = self.action_name
        message.task_id = self.element._get_unique_id()

        # Use the plugin for the task for the output, not a plugin
        # which might be acting on behalf of the task
        plugin = _plugin_lookup(message.task_id)

        # Log first
        self.child_log(plugin, message, context)

        if message.message_type == MessageType.FAIL and self.tries <= self.max_retries:
            # Job will be retried, display failures as warnings in the frontend
            message.message_type = MessageType.WARN

        # Send to frontend if appropriate
        if (context._silent_messages() and
                message.message_type not in unconditional_messages):
            return

        if message.message_type == MessageType.LOG:
            return

        self.queue.put(Envelope('message', message))

    #######################################################
    #                 Parent Process                      #
    #######################################################

    # parent_process_envelope()
    #
    # Dispatch one envelope received from the child process.
    #
    def parent_process_envelope(self, envelope):
        if not self.listening:
            return

        if envelope.message_type == 'message':
            # Propagate received messages from children
            # back through the context.
            self.scheduler.context._message(envelope.message)
        elif envelope.message_type == 'error':
            # For regression tests only, save the last error domain / reason
            # reported from a child task in the main process, this global state
            # is currently managed in _exceptions.py
            _set_last_task_error(envelope.message['domain'],
                                 envelope.message['reason'])
        elif envelope.message_type == 'result':
            assert self.result is None
            self.result = envelope.message
        else:
            raise Exception("Unhandled envelope message type: {}"
                            .format(envelope.message_type))

    # parent_process_queue()
    #
    # Drain and dispatch everything currently on the queue.
    #
    def parent_process_queue(self):
        while not self.queue.empty():
            envelope = self.queue.get_nowait()
            self.parent_process_envelope(envelope)

    # parent_recv()
    #
    # Callback invoked by the event loop when the queue's
    # file descriptor becomes readable.
    #
    def parent_recv(self, *args):
        self.parent_process_queue()

    def parent_start_listening(self):
        # Warning: Platform specific code up ahead
        #
        #   The multiprocessing.Queue object does not tell us how
        #   to receive io events in the receiving process, so we
        #   need to sneak in and get its file descriptor.
        #
        #   The _reader member of the Queue is currently private
        #   but well known, perhaps it will become public:
        #
        #      http://bugs.python.org/issue3831
        #
        if not self.listening:
            self.scheduler.loop.add_reader(
                self.queue._reader.fileno(), self.parent_recv)
            self.listening = True

    def parent_stop_listening(self):
        if self.listening:
            self.scheduler.loop.remove_reader(self.queue._reader.fileno())
            self.listening = False