diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 15:04:32 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-18 15:04:32 +0900 |
commit | 6a4949c2949ac8680de2fce11946f74ea3749a23 (patch) | |
tree | e8a68039ebfdf1e08b6da0841081bc1d9a41fdf6 | |
parent | 835e5475ddf2cdc3c9196072384d2f58a7df23fb (diff) | |
download | buildstream-6a4949c2949ac8680de2fce11946f74ea3749a23.tar.gz |
_scheduler/job.py: Adhere to policy on private symbols
This also removes an illegal peek into element._Element__environment
and uses the public Element.get_environment() accessor instead, removing
the need to also sanitize the returned dictionary.
Further, this reduces the amount of data passed between processes
in asyncio callbacks.
This is a part of issue #285
-rw-r--r-- | buildstream/_scheduler/job.py | 381 |
1 file changed, 246 insertions, 135 deletions
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py index c70e2d607..ab8f39d08 100644 --- a/buildstream/_scheduler/job.py +++ b/buildstream/_scheduler/job.py @@ -33,7 +33,7 @@ from ruamel import yaml from .._exceptions import BstError, set_last_task_error from .._message import Message, MessageType, unconditional_messages from ..plugin import _plugin_lookup -from .. import _yaml, _signals, utils +from .. import _signals, utils # Used to distinguish between status messages and return values @@ -96,36 +96,39 @@ class Job(): def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0): - # Shared with child process - self.scheduler = scheduler # The scheduler - self.queue = multiprocessing.Queue() # A message passing queue - self.process = None # The Process object - self.watcher = None # Child process watcher - self.action_name = action_name # The action name for the Queue - self.action_cb = action_cb # The action callable function - self.complete_cb = complete_cb # The complete callable function - self.element = element # The element we're processing - self.listening = False # Whether the parent is currently listening - self.suspended = False # Whether this job is currently suspended - self.max_retries = max_retries # Maximum number of automatic retries - - # Only relevant in parent process after spawning - self.pid = None # The child's pid in the parent - self.result = None # Return value of child action in the parent - self.workspace_dict = None # A serialized Workspace object, after any modifications - self.tries = 0 # Try count, for retryable jobs + # + # Public members + # + self.element = element # The element we're processing + self.action_name = action_name # The action name for the Queue + self.workspace_dict = None # A serialized Workspace object, after any modifications + + # + # Private members + # + self._scheduler = scheduler # The scheduler + self._queue = multiprocessing.Queue() # A message 
passing queue + self._process = None # The Process object + self._watcher = None # Child process watcher + self._action_cb = action_cb # The action callable function + self._complete_cb = complete_cb # The complete callable function + self._listening = False # Whether the parent is currently listening + self._suspended = False # Whether this job is currently suspended + self._max_retries = max_retries # Maximum number of automatic retries + self._result = None # Return value of child action in the parent + self._tries = 0 # Try count, for retryable jobs # spawn() # + # Spawns the job. + # def spawn(self): - self.tries += 1 - - self.parent_start_listening() + self._tries += 1 + self._parent_start_listening() # Spawn the process - self.process = Process(target=self.child_action, - args=[self.element, self.queue, self.action_name]) + self._process = Process(target=self._child_action, args=[self._queue]) # Here we want the following # @@ -133,33 +136,33 @@ class Job(): # B.) Child should not inherit SIGTSTP handled state # with _signals.blocked([signal.SIGINT], ignore=False): - self.scheduler.loop.remove_signal_handler(signal.SIGTSTP) - self.process.start() - self.scheduler.loop.add_signal_handler(signal.SIGTSTP, self.scheduler.suspend_event) - - self.pid = self.process.pid + self._scheduler.loop.remove_signal_handler(signal.SIGTSTP) + self._process.start() + self._scheduler.loop.add_signal_handler(signal.SIGTSTP, self._scheduler.suspend_event) # Wait for it to complete - self.watcher = asyncio.get_child_watcher() - self.watcher.add_child_handler(self.pid, self.parent_child_completed, self.element) + self._watcher = asyncio.get_child_watcher() + self._watcher.add_child_handler(self._process.pid, self._parent_child_completed) # terminate() # - # Forcefully terminates an ongoing job. + # Politely request that an ongoing job terminate soon. + # + # This will send a SIGTERM signal to the Job process. 
# def terminate(self): # First resume the job if it's suspended self.resume(silent=True) - self.message(self.element, MessageType.STATUS, - "{} terminating".format(self.action_name)) + self._message(self.element, MessageType.STATUS, + "{} terminating".format(self.action_name)) # Make sure there is no garbage on the queue - self.parent_stop_listening() + self._parent_stop_listening() # Terminate the process using multiprocessing API pathway - self.process.terminate() + self._process.terminate() # terminate_wait() # @@ -170,11 +173,12 @@ class Job(): # # Returns: # (bool): True if the process terminated cleanly, otherwise False + # def terminate_wait(self, timeout): # Join the child process after sending SIGTERM - self.process.join(timeout) - return self.process.exitcode is not None + self._process.join(timeout) + return self._process.exitcode is not None # kill() # @@ -183,30 +187,30 @@ class Job(): def kill(self): # Force kill - self.message(self.element, MessageType.WARN, - "{} did not terminate gracefully, killing".format(self.action_name)) - utils._kill_process_tree(self.process.pid) + self._message(self.element, MessageType.WARN, + "{} did not terminate gracefully, killing".format(self.action_name)) + utils._kill_process_tree(self._process.pid) # suspend() # # Suspend this job. # def suspend(self): - if not self.suspended: - self.message(self.element, MessageType.STATUS, - "{} suspending".format(self.action_name)) + if not self._suspended: + self._message(self.element, MessageType.STATUS, + "{} suspending".format(self.action_name)) try: # Use SIGTSTP so that child processes may handle and propagate # it to processes they spawn that become session leaders - os.kill(self.process.pid, signal.SIGTSTP) + os.kill(self._process.pid, signal.SIGTSTP) # For some reason we receive exactly one suspend event for every # SIGTSTP we send to the child fork(), even though the child forks # are setsid(). 
We keep a count of these so we can ignore them # in our event loop suspend_event() - self.scheduler.internal_stops += 1 - self.suspended = True + self._scheduler.internal_stops += 1 + self._suspended = True except ProcessLookupError: # ignore, process has already exited pass @@ -216,28 +220,55 @@ class Job(): # Resume this suspended job. # def resume(self, silent=False): - if self.suspended: + if self._suspended: if not silent: - self.message(self.element, MessageType.STATUS, - "{} resuming".format(self.action_name)) + self._message(self.element, MessageType.STATUS, + "{} resuming".format(self.action_name)) - os.kill(self.process.pid, signal.SIGCONT) - self.suspended = False + os.kill(self._process.pid, signal.SIGCONT) + self._suspended = False - # This can be used equally in the parent and child processes - def message(self, plugin, message_type, message, **kwargs): + ####################################################### + # Local Private Methods # + ####################################################### + # + # Methods prefixed with the word 'child' take place in the child process + # + # Methods prefixed with the word 'parent' take place in the parent process + # + # Other methods can be called in both child or parent processes + # + ####################################################### + + # _message(): + # + # Sends a message to the frontend + # + # Args: + # plugin (Plugin): The plugin to send a message for + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments + # + def _message(self, plugin, message_type, message, **kwargs): args = dict(kwargs) args['scheduler'] = True - self.scheduler.context.message( + self._scheduler.context.message( Message(plugin._get_unique_id(), message_type, message, **args)) - ####################################################### - # Child Process # - ####################################################### - def child_action(self, 
element, queue, action_name): + # _child_action() + # + # Perform the action in the child process, this calls the action_cb. + # + # Args: + # queue (multiprocessing.Queue): The message queue for IPC + # + def _child_action(self, queue): + + element = self.element # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -247,8 +278,8 @@ class Job(): # # Set the global message handler in this child # process to forward messages to the parent process - self.queue = queue - self.scheduler.context.set_message_handler(self.child_message_handler) + self._queue = queue + self._scheduler.context.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() stopped_time = None @@ -265,40 +296,39 @@ class Job(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - element._logging_enabled(action_name) as filename: + element._logging_enabled(self.action_name) as filename: - self.message(element, MessageType.START, self.action_name, - logfile=filename) + self._message(element, MessageType.START, self.action_name, logfile=filename) # Print the element's environment at the beginning of any element's log file. 
# # This should probably be omitted for non-build tasks but it's harmless here - elt_env = _yaml.node_sanitize(element._Element__environment) + elt_env = element.get_environment() env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) - self.message(element, MessageType.LOG, - "Build environment for element {}".format(element.name), - detail=env_dump, logfile=filename) + self._message(element, MessageType.LOG, + "Build environment for element {}".format(element.name), + detail=env_dump, logfile=filename) try: # Try the task action - result = self.action_cb(element) + result = self._action_cb(element) except BstError as e: elapsed = datetime.datetime.now() - starttime - if self.tries <= self.max_retries: - self.message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self.tries), - elapsed=elapsed) + if self._tries <= self._max_retries: + self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries), + elapsed=elapsed) else: - self.message(element, MessageType.FAIL, str(e), - elapsed=elapsed, detail=e.detail, - logfile=filename, sandbox=e.sandbox) + self._message(element, MessageType.FAIL, str(e), + elapsed=elapsed, detail=e.detail, + logfile=filename, sandbox=e.sandbox) # Report changes in the workspace, even if there was a handled failure - self.child_send_workspace(element) + self._child_send_workspace() # Report the exception to the parent (for internal testing purposes) - self.child_send_error(e) - self.child_shutdown(1) + self._child_send_error(e) + self._child_shutdown(1) except Exception as e: # pylint: disable=broad-except @@ -308,26 +338,33 @@ class Job(): # elapsed = datetime.datetime.now() - starttime detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) - self.message(element, MessageType.BUG, self.action_name, - elapsed=elapsed, detail=detail, - logfile=filename) - self.child_shutdown(1) + self._message(element, MessageType.BUG, self.action_name, + 
elapsed=elapsed, detail=detail, + logfile=filename) + self._child_shutdown(1) else: # No exception occurred in the action - self.child_send_workspace(element) - self.child_send_result(result) + self._child_send_workspace() + self._child_send_result(result) elapsed = datetime.datetime.now() - starttime - self.message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed, - logfile=filename) + self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed, + logfile=filename) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() - self.child_shutdown(0) + self._child_shutdown(0) - def child_send_error(self, e): + # _child_send_error() + # + # Sends an error to the main process through the message queue + # + # Args: + # e (Exception): The error to send + # + def _child_send_error(self, e): domain = None reason = None @@ -339,24 +376,53 @@ class Job(): 'domain': domain, 'reason': reason }) - self.queue.put(envelope) + self._queue.put(envelope) - def child_send_result(self, result): + # _child_send_result() + # + # Sends the serialized result to the main process through the message queue + # + # Args: + # result (object): A simple serializable object, or None + # + # Note: If None is passed here, nothing needs to be sent, the + # result member in the parent process will simply remain None. 
+ # + def _child_send_result(self, result): if result is not None: envelope = Envelope('result', result) - self.queue.put(envelope) + self._queue.put(envelope) - def child_send_workspace(self, element): - workspace = element._get_workspace() + # _child_send_workspace() + # + # Sends the serialized workspace through the message queue, if any + # + def _child_send_workspace(self): + workspace = self.element._get_workspace() if workspace: envelope = Envelope('workspace', workspace.to_dict()) - self.queue.put(envelope) + self._queue.put(envelope) - def child_shutdown(self, exit_code): - self.queue.close() + # _child_shutdown() + # + # Shuts down the child process by cleaning up and exiting the process + # + # Args: + # exit_code (int): The exit code to exit with + # + def _child_shutdown(self, exit_code): + self._queue.close() sys.exit(exit_code) - def child_log(self, plugin, message, context): + # _child_log() + # + # Logs a Message to the process's dedicated log file + # + # Args: + # plugin (Plugin): The plugin to log for + # message (Message): The message to log + # + def _child_log(self, plugin, message): with plugin._output_file() as output: INDENT = " " @@ -386,7 +452,18 @@ class Job(): output.write('{}\n'.format(message_text)) output.flush() - def child_message_handler(self, message, context): + # _child_message_handler() + # + # A Context delegate for handling messages, this replaces the + # frontend's main message handler in the context of a child task + # and performs local logging to the local log file before sending + # the message back to the parent process for further propagation. + # + # Args: + # message (Message): The message to log + # context (Context): The context object delegating this message + # + def _child_message_handler(self, message, context): # Tag them on the way out the door... 
message.action_name = self.action_name @@ -397,9 +474,9 @@ class Job(): plugin = _plugin_lookup(message.task_id) # Log first - self.child_log(plugin, message, context) + self._child_log(plugin, message) - if message.message_type == MessageType.FAIL and self.tries <= self.max_retries: + if message.message_type == MessageType.FAIL and self._tries <= self._max_retries: # Job will be retried, display failures as warnings in the frontend message.message_type = MessageType.WARN @@ -410,38 +487,54 @@ class Job(): if message.message_type == MessageType.LOG: return - self.queue.put(Envelope('message', message)) - - ####################################################### - # Parent Process # - ####################################################### + self._queue.put(Envelope('message', message)) - # shutdown() + # _parent_shutdown() # - # Should be called after the job completes + # Shuts down the Job on the parent side by reading any remaining + # messages on the message queue and cleaning up any resources. 
# - def parent_shutdown(self): + def _parent_shutdown(self): # Make sure we've read everything we need and then stop listening - self.parent_process_queue() - self.parent_stop_listening() + self._parent_process_queue() + self._parent_stop_listening() - def parent_child_completed(self, pid, returncode, element): - self.parent_shutdown() + # _parent_child_completed() + # + # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() + # + # Args: + # pid (int): The PID of the child which completed + # returncode (int): The return code of the child process + # + def _parent_child_completed(self, pid, returncode): + self._parent_shutdown() - if returncode != 0 and self.tries <= self.max_retries: + if returncode != 0 and self._tries <= self._max_retries: self.spawn() return - self.complete_cb(self, element, returncode == 0, self.result) + self._complete_cb(self, self.element, returncode == 0, self._result) - def parent_process_envelope(self, envelope): - if not self.listening: + # _parent_process_envelope() + # + # Processes a message Envelope deserialized form the message queue. + # + # this will have the side effect of assigning some local state + # on the Job in the parent process for later inspection when the + # child process completes. + # + # Args: + # envelope (Envelope): The message envelope + # + def _parent_process_envelope(self, envelope): + if not self._listening: return if envelope.message_type == 'message': # Propagate received messages from children # back through the context. 
- self.scheduler.context.message(envelope.message) + self._scheduler.context.message(envelope.message) elif envelope.message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state @@ -449,22 +542,36 @@ class Job(): set_last_task_error(envelope.message['domain'], envelope.message['reason']) elif envelope.message_type == 'result': - assert self.result is None - self.result = envelope.message + assert self._result is None + self._result = envelope.message elif envelope.message_type == 'workspace': self.workspace_dict = envelope.message else: raise Exception() - def parent_process_queue(self): - while not self.queue.empty(): - envelope = self.queue.get_nowait() - self.parent_process_envelope(envelope) + # _parent_process_queue() + # + # Reads back message envelopes from the message queue + # in the parent process. + # + def _parent_process_queue(self): + while not self._queue.empty(): + envelope = self._queue.get_nowait() + self._parent_process_envelope(envelope) - def parent_recv(self, *args): - self.parent_process_queue() + # _parent_recv() + # + # A callback to handle I/O events from the message + # queue file descriptor in the main process message loop + # + def _parent_recv(self, *args): + self._parent_process_queue() - def parent_start_listening(self): + # _parent_start_listening() + # + # Starts listening on the message queue + # + def _parent_start_listening(self): # Warning: Platform specific code up ahead # # The multiprocessing.Queue object does not tell us how @@ -476,12 +583,16 @@ class Job(): # # http://bugs.python.org/issue3831 # - if not self.listening: - self.scheduler.loop.add_reader( - self.queue._reader.fileno(), self.parent_recv) - self.listening = True - - def parent_stop_listening(self): - if self.listening: - self.scheduler.loop.remove_reader(self.queue._reader.fileno()) - self.listening = False + if not self._listening: + 
self._scheduler.loop.add_reader( + self._queue._reader.fileno(), self._parent_recv) + self._listening = True + + # _parent_stop_listening() + # + # Stops listening on the message queue + # + def _parent_stop_listening(self): + if self._listening: + self._scheduler.loop.remove_reader(self._queue._reader.fileno()) + self._listening = False |