diff options
author | Benjamin Schubert <contact@benschubert.me> | 2020-07-08 12:02:28 +0000 |
---|---|---|
committer | Benjamin Schubert <contact@benschubert.me> | 2020-07-08 12:02:28 +0000 |
commit | f26ab9fdbbfc297a7ddea7a7b2312573273f4b73 (patch) | |
tree | 64c6a4a2b6315521712524395b0693e6b120d88f | |
parent | 00659deff4a391e93d395ac353a0ecfb36af66a4 (diff) | |
download | buildstream-bschubert/no-multiprocessing-bak.tar.gz |
job.py: Completely remove the need for a queue between parent and child jobs (branch: bschubert/no-multiprocessing-bak)
We don't need that distinction anymore
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 160 |
1 files changed, 12 insertions, 148 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index b253c1b91..1ce66f68c 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -23,7 +23,6 @@ # System imports import asyncio import datetime -import multiprocessing import traceback # BuildStream toplevel imports @@ -59,17 +58,6 @@ class JobStatus(FastEnum): SKIPPED = 3 -# Used to distinguish between status messages and return values -class _Envelope: - def __init__(self, message_type, message): - self.message_type = message_type - self.message = message - - -class _MessageType(FastEnum): - RESULT = 3 - - # Job() # # The Job object represents a task that will run in parallel to the main @@ -119,9 +107,6 @@ class Job: # self._scheduler = scheduler # The scheduler self._messenger = self._scheduler.context.messenger - self._pipe_r = None # The read end of a pipe for message passing - self._listening = False # Whether the parent is currently listening - self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs @@ -148,11 +133,7 @@ class Job: assert not self._terminated, "Attempted to start process which was already terminated" - # FIXME: remove this, this is not necessary when using asyncio - self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False) - self._tries += 1 - self._parent_start_listening() # FIXME: remove the parent/child separation, it's not needed anymore. 
child_job = self.create_child_job( # pylint: disable=assignment-from-no-return @@ -170,12 +151,13 @@ class Job: async def execute(): try: - result = await loop.run_in_executor(None, child_job.child_action, pipe_w) + ret_code, self._result = await loop.run_in_executor(None, child_job.child_action) except asyncio.CancelledError: - result = _ReturnCode.TERMINATED + ret_code = _ReturnCode.TERMINATED except Exception: # pylint: disable=broad-except - result = _ReturnCode.FAIL - await self._parent_child_completed(result) + traceback.print_exc() + ret_code = _ReturnCode.FAIL + await self._parent_child_completed(ret_code) self._task = loop.create_task(execute()) @@ -188,9 +170,6 @@ class Job: def terminate(self): self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) - # Make sure there is no garbage on the pipe - self._parent_stop_listening() - # Terminate the process using multiprocessing API pathway if self._task: self._task.cancel() @@ -302,16 +281,6 @@ class Job: # Local Private Methods # ####################################################### - # _parent_shutdown() - # - # Shuts down the Job on the parent side by reading any remaining - # messages on the message pipe and cleaning up any resources. 
- # - def _parent_shutdown(self): - # Make sure we've read everything we need and then stop listening - self._parent_process_pipe() - self._parent_stop_listening() - # _parent_child_completed() # # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() @@ -320,8 +289,6 @@ class Job: # returncode (int): The return code of the child process # async def _parent_child_completed(self, returncode): - self._parent_shutdown() - try: returncode = _ReturnCode(returncode) except ValueError: @@ -368,69 +335,7 @@ class Job: self.parent_complete(status, self._result) self._scheduler.job_completed(self, status) - # Force the deletion of the pipe and process objects to try and clean up FDs - self._pipe_r.close() - self._pipe_r = self._task = None - - # _parent_process_envelope() - # - # Processes a message Envelope deserialized form the message pipe. - # - # this will have the side effect of assigning some local state - # on the Job in the parent process for later inspection when the - # child process completes. - # - # Args: - # envelope (Envelope): The message envelope - # - def _parent_process_envelope(self, envelope): - if not self._listening: - return - elif envelope.message_type is _MessageType.RESULT: - assert self._result is None - self._result = envelope.message - else: - assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) - - # _parent_process_pipe() - # - # Reads back message envelopes from the message pipe - # in the parent process. 
- # - def _parent_process_pipe(self): - while self._pipe_r.poll(): - try: - envelope = self._pipe_r.recv() - except EOFError: - self._parent_stop_listening() - break - self._parent_process_envelope(envelope) - - # _parent_recv() - # - # A callback to handle I/O events from the message - # pipe file descriptor in the main process message loop - # - def _parent_recv(self, *args): - self._parent_process_pipe() - - # _parent_start_listening() - # - # Starts listening on the message pipe - # - def _parent_start_listening(self): - if not self._listening: - self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv) - self._listening = True - - # _parent_stop_listening() - # - # Stops listening on the message pipe - # - def _parent_stop_listening(self): - if self._listening: - self._scheduler.loop.remove_reader(self._pipe_r.fileno()) - self._listening = False + self._task = None # ChildJob() @@ -471,8 +376,6 @@ class ChildJob: self._message_element_name = message_element_name self._message_element_key = message_element_key - self._pipe_w = None # The write end of a pipe for message passing - # message(): # # Logs a message, this will be logged in the task's logfile and @@ -525,12 +428,7 @@ class ChildJob: # Args: # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC # - def child_action(self, pipe_w): - # Assign the pipe we passed across the process boundaries - # - # Set the global message handler in this child - # process to forward messages to the parent process - self._pipe_w = pipe_w + def child_action(self): self._messenger.set_message_handler(self._child_message_handler) # Time, log and and run the action function @@ -548,7 +446,7 @@ class ChildJob: self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename) # Alert parent of skip by return code - return _ReturnCode.SKIPPED + return _ReturnCode.SKIPPED, None except BstError as e: elapsed = datetime.datetime.now() - timeinfo.start_time retry_flag = e.temporary @@ 
-570,11 +468,11 @@ class ChildJob: # Set return code based on whether or not the error was temporary. # - return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL + return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None except Exception: # pylint: disable=broad-except - # If an unhandled (not normalized to BstError) occurs, that's a bug, + # If an unhandled (not normalized to BstError) occurs, that's a `bug`, # send the traceback and formatted exception back to the frontend # and print it to the log file. # @@ -583,55 +481,21 @@ class ChildJob: self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename) # Unhandled exceptions should permenantly fail - return _ReturnCode.PERM_FAIL + return _ReturnCode.PERM_FAIL, None else: - # No exception occurred in the action - self._child_send_result(result) - elapsed = datetime.datetime.now() - timeinfo.start_time self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() - return _ReturnCode.OK - finally: - self._pipe_w.close() + return _ReturnCode.OK, result ####################################################### # Local Private Methods # ####################################################### - # _send_message() - # - # Send data in a message to the parent Job, running in the main process. - # - # Args: - # message_type (str): The type of message to send. - # message_data (any): A simple object (must be pickle-able, i.e. - # strings, lists, dicts, numbers, but not Element - # instances). This is sent to the parent Job. 
- # - def _send_message(self, message_type, message_data): - self._pipe_w.send(_Envelope(message_type, message_data)) - - # _child_send_result() - # - # Sends the serialized result to the main process through the message pipe - # - # Args: - # result (any): None, or a simple object (must be pickle-able, i.e. - # strings, lists, dicts, numbers, but not Element - # instances). - # - # Note: If None is passed here, nothing needs to be sent, the - # result member in the parent process will simply remain None. - # - def _child_send_result(self, result): - if result is not None: - self._send_message(_MessageType.RESULT, result) - # _child_message_handler() # # A Context delegate for handling messages, this replaces the |