summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2020-07-08 12:02:28 +0000
committerBenjamin Schubert <contact@benschubert.me>2020-08-30 15:55:41 +0000
commit1893e3aa2d7fd9e79e9f25f05eaa33b98920db2f (patch)
tree3ea9fae86de58ac0ff4ee02a10d2f40764566e9a
parent2ce8239955fefbbf468e86a3b0622e6189da8faa (diff)
downloadbuildstream-bschubert/remove-parent-child-pipe.tar.gz
job.py: Pass the results directly between child and parentbschubert/remove-parent-child-pipe
And cleanup all the queue-messaging related parts that are not required anymore
-rw-r--r--src/buildstream/_scheduler/jobs/job.py162
1 files changed, 9 insertions, 153 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index a80832da0..fed3b58d8 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -23,7 +23,6 @@
# System imports
import asyncio
import datetime
-import multiprocessing
import threading
import traceback
@@ -61,17 +60,6 @@ class JobStatus(FastEnum):
SKIPPED = 3
-# Used to distinguish between status messages and return values
-class _Envelope:
- def __init__(self, message_type, message):
- self.message_type = message_type
- self.message = message
-
-
-class _MessageType(FastEnum):
- RESULT = 1
-
-
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -116,9 +104,6 @@ class Job:
#
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
- self._pipe_r = None # The read end of a pipe for message passing
- self._listening = False # Whether the parent is currently listening
- self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
@@ -146,11 +131,7 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
- # FIXME: remove this, this is not necessary when using asyncio
- self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
self._tries += 1
- self._parent_start_listening()
# FIXME: remove the parent/child separation, it's not needed anymore.
self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
@@ -167,8 +148,8 @@ class Job:
loop = asyncio.get_event_loop()
async def execute():
- result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
- await self._parent_child_completed(result)
+ return_code, self._result = await loop.run_in_executor(None, self._child.child_action)
+ await self._parent_child_completed(return_code)
self._task = loop.create_task(execute())
@@ -181,9 +162,6 @@ class Job:
def terminate(self):
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
- # Make sure there is no garbage on the pipe
- self._parent_stop_listening()
-
if self._task:
self._child.terminate()
@@ -294,16 +272,6 @@ class Job:
# Local Private Methods #
#######################################################
- # _parent_shutdown()
- #
- # Shuts down the Job on the parent side by reading any remaining
- # messages on the message pipe and cleaning up any resources.
- #
- def _parent_shutdown(self):
- # Make sure we've read everything we need and then stop listening
- self._parent_process_pipe()
- self._parent_stop_listening()
-
# _parent_child_completed()
#
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -312,8 +280,6 @@ class Job:
# returncode (int): The return code of the child process
#
async def _parent_child_completed(self, returncode):
- self._parent_shutdown()
-
try:
returncode = _ReturnCode(returncode)
except ValueError:
@@ -353,70 +319,7 @@ class Job:
self.parent_complete(status, self._result)
self._scheduler.job_completed(self, status)
- # Force the deletion of the pipe and process objects to try and clean up FDs
- self._pipe_r.close()
- self._pipe_r = self._task = None
-
- # _parent_process_envelope()
- #
- # Processes a message Envelope deserialized form the message pipe.
- #
- # this will have the side effect of assigning some local state
- # on the Job in the parent process for later inspection when the
- # child process completes.
- #
- # Args:
- # envelope (Envelope): The message envelope
- #
- def _parent_process_envelope(self, envelope):
- if not self._listening:
- return
-
- if envelope.message_type is _MessageType.RESULT:
- assert self._result is None
- self._result = envelope.message
- else:
- assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
- # _parent_process_pipe()
- #
- # Reads back message envelopes from the message pipe
- # in the parent process.
- #
- def _parent_process_pipe(self):
- while self._pipe_r.poll():
- try:
- envelope = self._pipe_r.recv()
- except EOFError:
- self._parent_stop_listening()
- break
- self._parent_process_envelope(envelope)
-
- # _parent_recv()
- #
- # A callback to handle I/O events from the message
- # pipe file descriptor in the main process message loop
- #
- def _parent_recv(self, *args):
- self._parent_process_pipe()
-
- # _parent_start_listening()
- #
- # Starts listening on the message pipe
- #
- def _parent_start_listening(self):
- if not self._listening:
- self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
- self._listening = True
-
- # _parent_stop_listening()
- #
- # Stops listening on the message pipe
- #
- def _parent_stop_listening(self):
- if self._listening:
- self._scheduler.loop.remove_reader(self._pipe_r.fileno())
- self._listening = False
+ self._task = None
# ChildJob()
@@ -457,7 +360,6 @@ class ChildJob:
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._pipe_w = None # The write end of a pipe for message passing
self._thread_id = None # Thread in which the child executes its action
self._should_terminate = False
self._terminate_lock = threading.Lock()
@@ -511,16 +413,7 @@ class ChildJob:
#
# Perform the action in the child process, this calls the action_cb.
#
- # Args:
- # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
- #
- def child_action(self, pipe_w):
- # Assign the pipe we passed across the process boundaries
- #
- # Set the global message handler in this child
- # process to forward messages to the parent process
- self._pipe_w = pipe_w
-
+ def child_action(self):
# Time, log and and run the action function
#
with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
@@ -542,7 +435,7 @@ class ChildJob:
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
- return _ReturnCode.SKIPPED
+ return _ReturnCode.SKIPPED, None
except BstError as e:
elapsed = datetime.datetime.now() - timeinfo.start_time
retry_flag = e.temporary
@@ -567,7 +460,7 @@ class ChildJob:
# Set return code based on whether or not the error was temporary.
#
- return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+ return (_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL), None
except Exception: # pylint: disable=broad-except
# If an unhandled (not normalized to BstError) occurs, that's a bug,
@@ -579,59 +472,22 @@ class ChildJob:
self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail)
# Unhandled exceptions should permenantly fail
- return _ReturnCode.PERM_FAIL
+ return _ReturnCode.PERM_FAIL, None
else:
# No exception occurred in the action
- self._child_send_result(result)
-
elapsed = datetime.datetime.now() - timeinfo.start_time
self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- return _ReturnCode.OK
+ return _ReturnCode.OK, result
finally:
self._thread_id = None
except TerminateException:
self._thread_id = None
- return _ReturnCode.TERMINATED
- finally:
- self._pipe_w.close()
-
- #######################################################
- # Local Private Methods #
- #######################################################
-
- # _send_message()
- #
- # Send data in a message to the parent Job, running in the main process.
- #
- # Args:
- # message_type (str): The type of message to send.
- # message_data (any): A simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances). This is sent to the parent Job.
- #
- def _send_message(self, message_type, message_data):
- self._pipe_w.send(_Envelope(message_type, message_data))
-
- # _child_send_result()
- #
- # Sends the serialized result to the main process through the message pipe
- #
- # Args:
- # result (any): None, or a simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances).
- #
- # Note: If None is passed here, nothing needs to be sent, the
- # result member in the parent process will simply remain None.
- #
- def _child_send_result(self, result):
- if result is not None:
- self._send_message(_MessageType.RESULT, result)
+ return _ReturnCode.TERMINATED, None
def terminate(self):
if self._should_terminate: