summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-18 15:04:32 +0900
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-04-18 15:04:32 +0900
commit6a4949c2949ac8680de2fce11946f74ea3749a23 (patch)
treee8a68039ebfdf1e08b6da0841081bc1d9a41fdf6
parent835e5475ddf2cdc3c9196072384d2f58a7df23fb (diff)
downloadbuildstream-6a4949c2949ac8680de2fce11946f74ea3749a23.tar.gz
_scheduler/job.py: Adhere to policy on private symbols
This also removes an illegal peek into element._Element__environment and uses the public Element.get_environment() accessor instead, removing the need to also sanitize the returned dictionary. Further, this reduces the amount of data passed between processes in asyncio callbacks. This is a part of issue #285
-rw-r--r--buildstream/_scheduler/job.py381
1 files changed, 246 insertions, 135 deletions
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py
index c70e2d607..ab8f39d08 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/job.py
@@ -33,7 +33,7 @@ from ruamel import yaml
from .._exceptions import BstError, set_last_task_error
from .._message import Message, MessageType, unconditional_messages
from ..plugin import _plugin_lookup
-from .. import _yaml, _signals, utils
+from .. import _signals, utils
# Used to distinguish between status messages and return values
@@ -96,36 +96,39 @@ class Job():
def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
- # Shared with child process
- self.scheduler = scheduler # The scheduler
- self.queue = multiprocessing.Queue() # A message passing queue
- self.process = None # The Process object
- self.watcher = None # Child process watcher
- self.action_name = action_name # The action name for the Queue
- self.action_cb = action_cb # The action callable function
- self.complete_cb = complete_cb # The complete callable function
- self.element = element # The element we're processing
- self.listening = False # Whether the parent is currently listening
- self.suspended = False # Whether this job is currently suspended
- self.max_retries = max_retries # Maximum number of automatic retries
-
- # Only relevant in parent process after spawning
- self.pid = None # The child's pid in the parent
- self.result = None # Return value of child action in the parent
- self.workspace_dict = None # A serialized Workspace object, after any modifications
- self.tries = 0 # Try count, for retryable jobs
+ #
+ # Public members
+ #
+ self.element = element # The element we're processing
+ self.action_name = action_name # The action name for the Queue
+ self.workspace_dict = None # A serialized Workspace object, after any modifications
+
+ #
+ # Private members
+ #
+ self._scheduler = scheduler # The scheduler
+ self._queue = multiprocessing.Queue() # A message passing queue
+ self._process = None # The Process object
+ self._watcher = None # Child process watcher
+ self._action_cb = action_cb # The action callable function
+ self._complete_cb = complete_cb # The complete callable function
+ self._listening = False # Whether the parent is currently listening
+ self._suspended = False # Whether this job is currently suspended
+ self._max_retries = max_retries # Maximum number of automatic retries
+ self._result = None # Return value of child action in the parent
+ self._tries = 0 # Try count, for retryable jobs
# spawn()
#
+ # Spawns the job.
+ #
def spawn(self):
- self.tries += 1
-
- self.parent_start_listening()
+ self._tries += 1
+ self._parent_start_listening()
# Spawn the process
- self.process = Process(target=self.child_action,
- args=[self.element, self.queue, self.action_name])
+ self._process = Process(target=self._child_action, args=[self._queue])
# Here we want the following
#
@@ -133,33 +136,33 @@ class Job():
# B.) Child should not inherit SIGTSTP handled state
#
with _signals.blocked([signal.SIGINT], ignore=False):
- self.scheduler.loop.remove_signal_handler(signal.SIGTSTP)
- self.process.start()
- self.scheduler.loop.add_signal_handler(signal.SIGTSTP, self.scheduler.suspend_event)
-
- self.pid = self.process.pid
+ self._scheduler.loop.remove_signal_handler(signal.SIGTSTP)
+ self._process.start()
+ self._scheduler.loop.add_signal_handler(signal.SIGTSTP, self._scheduler.suspend_event)
# Wait for it to complete
- self.watcher = asyncio.get_child_watcher()
- self.watcher.add_child_handler(self.pid, self.parent_child_completed, self.element)
+ self._watcher = asyncio.get_child_watcher()
+ self._watcher.add_child_handler(self._process.pid, self._parent_child_completed)
# terminate()
#
- # Forcefully terminates an ongoing job.
+ # Politely request that an ongoing job terminate soon.
+ #
+ # This will send a SIGTERM signal to the Job process.
#
def terminate(self):
# First resume the job if it's suspended
self.resume(silent=True)
- self.message(self.element, MessageType.STATUS,
- "{} terminating".format(self.action_name))
+ self._message(self.element, MessageType.STATUS,
+ "{} terminating".format(self.action_name))
# Make sure there is no garbage on the queue
- self.parent_stop_listening()
+ self._parent_stop_listening()
# Terminate the process using multiprocessing API pathway
- self.process.terminate()
+ self._process.terminate()
# terminate_wait()
#
@@ -170,11 +173,12 @@ class Job():
#
# Returns:
# (bool): True if the process terminated cleanly, otherwise False
+ #
def terminate_wait(self, timeout):
# Join the child process after sending SIGTERM
- self.process.join(timeout)
- return self.process.exitcode is not None
+ self._process.join(timeout)
+ return self._process.exitcode is not None
# kill()
#
@@ -183,30 +187,30 @@ class Job():
def kill(self):
# Force kill
- self.message(self.element, MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name))
- utils._kill_process_tree(self.process.pid)
+ self._message(self.element, MessageType.WARN,
+ "{} did not terminate gracefully, killing".format(self.action_name))
+ utils._kill_process_tree(self._process.pid)
# suspend()
#
# Suspend this job.
#
def suspend(self):
- if not self.suspended:
- self.message(self.element, MessageType.STATUS,
- "{} suspending".format(self.action_name))
+ if not self._suspended:
+ self._message(self.element, MessageType.STATUS,
+ "{} suspending".format(self.action_name))
try:
# Use SIGTSTP so that child processes may handle and propagate
# it to processes they spawn that become session leaders
- os.kill(self.process.pid, signal.SIGTSTP)
+ os.kill(self._process.pid, signal.SIGTSTP)
# For some reason we receive exactly one suspend event for every
# SIGTSTP we send to the child fork(), even though the child forks
# are setsid(). We keep a count of these so we can ignore them
# in our event loop suspend_event()
- self.scheduler.internal_stops += 1
- self.suspended = True
+ self._scheduler.internal_stops += 1
+ self._suspended = True
except ProcessLookupError:
# ignore, process has already exited
pass
@@ -216,28 +220,55 @@ class Job():
# Resume this suspended job.
#
def resume(self, silent=False):
- if self.suspended:
+ if self._suspended:
if not silent:
- self.message(self.element, MessageType.STATUS,
- "{} resuming".format(self.action_name))
+ self._message(self.element, MessageType.STATUS,
+ "{} resuming".format(self.action_name))
- os.kill(self.process.pid, signal.SIGCONT)
- self.suspended = False
+ os.kill(self._process.pid, signal.SIGCONT)
+ self._suspended = False
- # This can be used equally in the parent and child processes
- def message(self, plugin, message_type, message, **kwargs):
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+ #
+ # Methods prefixed with the word 'child' take place in the child process
+ #
+ # Methods prefixed with the word 'parent' take place in the parent process
+ #
+ # Other methods can be called in both child or parent processes
+ #
+ #######################################################
+
+ # _message():
+ #
+ # Sends a message to the frontend
+ #
+ # Args:
+ # plugin (Plugin): The plugin to send a message for
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments
+ #
+ def _message(self, plugin, message_type, message, **kwargs):
args = dict(kwargs)
args['scheduler'] = True
- self.scheduler.context.message(
+ self._scheduler.context.message(
Message(plugin._get_unique_id(),
message_type,
message,
**args))
- #######################################################
- # Child Process #
- #######################################################
- def child_action(self, element, queue, action_name):
+ # _child_action()
+ #
+ # Perform the action in the child process, this calls the action_cb.
+ #
+ # Args:
+ # queue (multiprocessing.Queue): The message queue for IPC
+ #
+ def _child_action(self, queue):
+
+ element = self.element
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -247,8 +278,8 @@ class Job():
#
# Set the global message handler in this child
# process to forward messages to the parent process
- self.queue = queue
- self.scheduler.context.set_message_handler(self.child_message_handler)
+ self._queue = queue
+ self._scheduler.context.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
stopped_time = None
@@ -265,40 +296,39 @@ class Job():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- element._logging_enabled(action_name) as filename:
+ element._logging_enabled(self.action_name) as filename:
- self.message(element, MessageType.START, self.action_name,
- logfile=filename)
+ self._message(element, MessageType.START, self.action_name, logfile=filename)
# Print the element's environment at the beginning of any element's log file.
#
# This should probably be omitted for non-build tasks but it's harmless here
- elt_env = _yaml.node_sanitize(element._Element__environment)
+ elt_env = element.get_environment()
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self.message(element, MessageType.LOG,
- "Build environment for element {}".format(element.name),
- detail=env_dump, logfile=filename)
+ self._message(element, MessageType.LOG,
+ "Build environment for element {}".format(element.name),
+ detail=env_dump, logfile=filename)
try:
# Try the task action
- result = self.action_cb(element)
+ result = self._action_cb(element)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
- if self.tries <= self.max_retries:
- self.message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self.tries),
- elapsed=elapsed)
+ if self._tries <= self._max_retries:
+ self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed)
else:
- self.message(element, MessageType.FAIL, str(e),
- elapsed=elapsed, detail=e.detail,
- logfile=filename, sandbox=e.sandbox)
+ self._message(element, MessageType.FAIL, str(e),
+ elapsed=elapsed, detail=e.detail,
+ logfile=filename, sandbox=e.sandbox)
# Report changes in the workspace, even if there was a handled failure
- self.child_send_workspace(element)
+ self._child_send_workspace()
# Report the exception to the parent (for internal testing purposes)
- self.child_send_error(e)
- self.child_shutdown(1)
+ self._child_send_error(e)
+ self._child_shutdown(1)
except Exception as e: # pylint: disable=broad-except
@@ -308,26 +338,33 @@ class Job():
#
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self.message(element, MessageType.BUG, self.action_name,
- elapsed=elapsed, detail=detail,
- logfile=filename)
- self.child_shutdown(1)
+ self._message(element, MessageType.BUG, self.action_name,
+ elapsed=elapsed, detail=detail,
+ logfile=filename)
+ self._child_shutdown(1)
else:
# No exception occurred in the action
- self.child_send_workspace(element)
- self.child_send_result(result)
+ self._child_send_workspace()
+ self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self.message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
- logfile=filename)
+ self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+ logfile=filename)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- self.child_shutdown(0)
+ self._child_shutdown(0)
- def child_send_error(self, e):
+ # _child_send_error()
+ #
+ # Sends an error to the main process through the message queue
+ #
+ # Args:
+ # e (Exception): The error to send
+ #
+ def _child_send_error(self, e):
domain = None
reason = None
@@ -339,24 +376,53 @@ class Job():
'domain': domain,
'reason': reason
})
- self.queue.put(envelope)
+ self._queue.put(envelope)
- def child_send_result(self, result):
+ # _child_send_result()
+ #
+ # Sends the serialized result to the main process through the message queue
+ #
+ # Args:
+ # result (object): A simple serializable object, or None
+ #
+ # Note: If None is passed here, nothing needs to be sent, the
+ # result member in the parent process will simply remain None.
+ #
+ def _child_send_result(self, result):
if result is not None:
envelope = Envelope('result', result)
- self.queue.put(envelope)
+ self._queue.put(envelope)
- def child_send_workspace(self, element):
- workspace = element._get_workspace()
+ # _child_send_workspace()
+ #
+ # Sends the serialized workspace through the message queue, if any
+ #
+ def _child_send_workspace(self):
+ workspace = self.element._get_workspace()
if workspace:
envelope = Envelope('workspace', workspace.to_dict())
- self.queue.put(envelope)
+ self._queue.put(envelope)
- def child_shutdown(self, exit_code):
- self.queue.close()
+ # _child_shutdown()
+ #
+ # Shuts down the child process by cleaning up and exiting the process
+ #
+ # Args:
+ # exit_code (int): The exit code to exit with
+ #
+ def _child_shutdown(self, exit_code):
+ self._queue.close()
sys.exit(exit_code)
- def child_log(self, plugin, message, context):
+ # _child_log()
+ #
+ # Logs a Message to the process's dedicated log file
+ #
+ # Args:
+ # plugin (Plugin): The plugin to log for
+ # message (Message): The message to log
+ #
+ def _child_log(self, plugin, message):
with plugin._output_file() as output:
INDENT = " "
@@ -386,7 +452,18 @@ class Job():
output.write('{}\n'.format(message_text))
output.flush()
- def child_message_handler(self, message, context):
+ # _child_message_handler()
+ #
+ # A Context delegate for handling messages, this replaces the
+ # frontend's main message handler in the context of a child task
+ # and performs local logging to the local log file before sending
+ # the message back to the parent process for further propagation.
+ #
+ # Args:
+ # message (Message): The message to log
+ # context (Context): The context object delegating this message
+ #
+ def _child_message_handler(self, message, context):
# Tag them on the way out the door...
message.action_name = self.action_name
@@ -397,9 +474,9 @@ class Job():
plugin = _plugin_lookup(message.task_id)
# Log first
- self.child_log(plugin, message, context)
+ self._child_log(plugin, message)
- if message.message_type == MessageType.FAIL and self.tries <= self.max_retries:
+ if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
# Job will be retried, display failures as warnings in the frontend
message.message_type = MessageType.WARN
@@ -410,38 +487,54 @@ class Job():
if message.message_type == MessageType.LOG:
return
- self.queue.put(Envelope('message', message))
-
- #######################################################
- # Parent Process #
- #######################################################
+ self._queue.put(Envelope('message', message))
- # shutdown()
+ # _parent_shutdown()
#
- # Should be called after the job completes
+ # Shuts down the Job on the parent side by reading any remaining
+ # messages on the message queue and cleaning up any resources.
#
- def parent_shutdown(self):
+ def _parent_shutdown(self):
# Make sure we've read everything we need and then stop listening
- self.parent_process_queue()
- self.parent_stop_listening()
+ self._parent_process_queue()
+ self._parent_stop_listening()
- def parent_child_completed(self, pid, returncode, element):
- self.parent_shutdown()
+ # _parent_child_completed()
+ #
+ # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
+ #
+ # Args:
+ # pid (int): The PID of the child which completed
+ # returncode (int): The return code of the child process
+ #
+ def _parent_child_completed(self, pid, returncode):
+ self._parent_shutdown()
- if returncode != 0 and self.tries <= self.max_retries:
+ if returncode != 0 and self._tries <= self._max_retries:
self.spawn()
return
- self.complete_cb(self, element, returncode == 0, self.result)
+ self._complete_cb(self, self.element, returncode == 0, self._result)
- def parent_process_envelope(self, envelope):
- if not self.listening:
+ # _parent_process_envelope()
+ #
+ # Processes a message Envelope deserialized form the message queue.
+ #
+ # this will have the side effect of assigning some local state
+ # on the Job in the parent process for later inspection when the
+ # child process completes.
+ #
+ # Args:
+ # envelope (Envelope): The message envelope
+ #
+ def _parent_process_envelope(self, envelope):
+ if not self._listening:
return
if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
- self.scheduler.context.message(envelope.message)
+ self._scheduler.context.message(envelope.message)
elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
@@ -449,22 +542,36 @@ class Job():
set_last_task_error(envelope.message['domain'],
envelope.message['reason'])
elif envelope.message_type == 'result':
- assert self.result is None
- self.result = envelope.message
+ assert self._result is None
+ self._result = envelope.message
elif envelope.message_type == 'workspace':
self.workspace_dict = envelope.message
else:
raise Exception()
- def parent_process_queue(self):
- while not self.queue.empty():
- envelope = self.queue.get_nowait()
- self.parent_process_envelope(envelope)
+ # _parent_process_queue()
+ #
+ # Reads back message envelopes from the message queue
+ # in the parent process.
+ #
+ def _parent_process_queue(self):
+ while not self._queue.empty():
+ envelope = self._queue.get_nowait()
+ self._parent_process_envelope(envelope)
- def parent_recv(self, *args):
- self.parent_process_queue()
+ # _parent_recv()
+ #
+ # A callback to handle I/O events from the message
+ # queue file descriptor in the main process message loop
+ #
+ def _parent_recv(self, *args):
+ self._parent_process_queue()
- def parent_start_listening(self):
+ # _parent_start_listening()
+ #
+ # Starts listening on the message queue
+ #
+ def _parent_start_listening(self):
# Warning: Platform specific code up ahead
#
# The multiprocessing.Queue object does not tell us how
@@ -476,12 +583,16 @@ class Job():
#
# http://bugs.python.org/issue3831
#
- if not self.listening:
- self.scheduler.loop.add_reader(
- self.queue._reader.fileno(), self.parent_recv)
- self.listening = True
-
- def parent_stop_listening(self):
- if self.listening:
- self.scheduler.loop.remove_reader(self.queue._reader.fileno())
- self.listening = False
+ if not self._listening:
+ self._scheduler.loop.add_reader(
+ self._queue._reader.fileno(), self._parent_recv)
+ self._listening = True
+
+ # _parent_stop_listening()
+ #
+ # Stops listening on the message queue
+ #
+ def _parent_stop_listening(self):
+ if self._listening:
+ self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ self._listening = False