author     Jürg Billeter <j@bitron.ch>    2019-11-20 08:17:02 +0100
committer  Jürg Billeter <j@bitron.ch>    2019-12-12 20:05:00 +0100
commit     9c23ce5c7e16902a2330ee8157e0074c17be821f (patch)
tree       22885dc134c8cd88585f02cad29a405df22c807a
parent     0b3612ad188ae0f605acd8f5339fd5f42aeb3948 (diff)
job.py: Replace message queue with pipe (juerg/job-pipe)
A lightweight unidirectional pipe is sufficient to pass messages from the child job process to its parent. This also avoids the need to access the private `_reader` instance variable of `multiprocessing.Queue`.
-rw-r--r--    src/buildstream/_scheduler/jobs/job.py    84
1 file changed, 40 insertions(+), 44 deletions(-)
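Before the patch itself, here is a minimal standalone sketch of the pattern it adopts (the names below are illustrative, not BuildStream's own code): the parent keeps the read end of a unidirectional Pipe, hands the write end to the child process, and closes its own copy of the write end so that recv() raises EOFError once the child is done instead of blocking forever.

import multiprocessing

def run_child(pipe_w):
    # The child only ever writes; Connection.send() pickles the object for us.
    pipe_w.send({"type": "message", "text": "hello from the child"})
    pipe_w.close()

if __name__ == "__main__":
    # duplex=False returns a (read-only, write-only) pair of Connection objects
    pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
    process = multiprocessing.Process(target=run_child, args=[pipe_w])
    process.start()
    # Close the parent's copy of the write end; once the child closes its
    # copy too, recv() in the parent raises EOFError rather than hanging.
    pipe_w.close()
    while True:
        try:
            envelope = pipe_r.recv()
        except EOFError:
            break
        print("parent received:", envelope)
    process.join()
    pipe_r.close()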
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 61eff88a6..07bbe9c60 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -104,7 +104,7 @@ class _MessageType(FastEnum):
# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
# 4. Implement YourChildJob.child_process().
#
-# A Job instance and its ChildJob share a message queue. You may send custom
+# A Job instance and its ChildJob share a message pipe. You may send custom
# messages to the main process using YourChildJob.send_message(). Such messages
# must be processed in YourJob.handle_message(), which you will also need to
# override for this purpose.
@@ -130,7 +130,7 @@ class Job:
# Private members
#
self._scheduler = scheduler # The scheduler
- self._queue = None # A message passing queue
+ self._pipe_r = None # The read end of a pipe for message passing
self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
@@ -156,7 +156,7 @@ class Job:
#
def start(self):
- self._queue = multiprocessing.Queue()
+ self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
self._parent_start_listening()
@@ -174,11 +174,9 @@ class Job:
if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),)
- self._process = _multiprocessing.AsyncioSafeProcess(
- target=do_pickled_child_job, args=[pickled, self._queue],
- )
+ self._process = _multiprocessing.AsyncioSafeProcess(target=do_pickled_child_job, args=[pickled, pipe_w],)
else:
- self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[self._queue],)
+ self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -189,6 +187,9 @@ class Job:
self._process.start()
# Register the process to call `_parent_child_completed` once it is done
+ # Close the write end of the pipe in the parent
+ pipe_w.close()
+
# Here we delay the call to the next loop tick. This is in order to be running
# in the main thread, as the callback itself must be thread safe.
def on_completion(pid, returncode):
@@ -209,7 +210,7 @@ class Job:
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
- # Make sure there is no garbage on the queue
+ # Make sure there is no garbage on the pipe
self._parent_stop_listening()
# Terminate the process using multiprocessing API pathway
@@ -385,11 +386,11 @@ class Job:
# _parent_shutdown()
#
# Shuts down the Job on the parent side by reading any remaining
- # messages on the message queue and cleaning up any resources.
+ # messages on the message pipe and cleaning up any resources.
#
def _parent_shutdown(self):
# Make sure we've read everything we need and then stop listening
- self._parent_process_queue()
+ self._parent_process_pipe()
self._parent_stop_listening()
# _parent_child_completed()
@@ -449,12 +450,14 @@ class Job:
self.parent_complete(status, self._result)
self._scheduler.job_completed(self, status)
- # Force the deletion of the queue and process objects to try and clean up FDs
- self._queue = self._process = None
+ # Force the deletion of the pipe and process objects to try and clean up FDs
+ self._pipe_r.close()
+ self._process.close()
+ self._pipe_r = self._process = None
# _parent_process_envelope()
#
- # Processes a message Envelope deserialized form the message queue.
+ # Processes a message Envelope deserialized from the message pipe
#
# this will have the side effect of assigning some local state
# on the Job in the parent process for later inspection when the
@@ -487,51 +490,44 @@ class Job:
else:
assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
- # _parent_process_queue()
+ # _parent_process_pipe()
#
- # Reads back message envelopes from the message queue
+ # Reads back message envelopes from the message pipe
# in the parent process.
#
- def _parent_process_queue(self):
- while not self._queue.empty():
- envelope = self._queue.get_nowait()
+ def _parent_process_pipe(self):
+ while self._pipe_r.poll():
+ try:
+ envelope = self._pipe_r.recv()
+ except EOFError:
+ self._parent_stop_listening()
+ break
self._parent_process_envelope(envelope)
# _parent_recv()
#
# A callback to handle I/O events from the message
- # queue file descriptor in the main process message loop
+ # pipe file descriptor in the main process message loop
#
def _parent_recv(self, *args):
- self._parent_process_queue()
+ self._parent_process_pipe()
# _parent_start_listening()
#
- # Starts listening on the message queue
+ # Starts listening on the message pipe
#
def _parent_start_listening(self):
- # Warning: Platform specific code up ahead
- #
- # The multiprocessing.Queue object does not tell us how
- # to receive io events in the receiving process, so we
- # need to sneak in and get its file descriptor.
- #
- # The _reader member of the Queue is currently private
- # but well known, perhaps it will become public:
- #
- # http://bugs.python.org/issue3831
- #
if not self._listening:
- self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv)
+ self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
self._listening = True
# _parent_stop_listening()
#
- # Stops listening on the message queue
+ # Stops listening on the message pipe
#
def _parent_stop_listening(self):
if self._listening:
- self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ self._scheduler.loop.remove_reader(self._pipe_r.fileno())
self._listening = False
@@ -573,7 +569,7 @@ class ChildJob:
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._queue = None
+ self._pipe_w = None # The write end of a pipe for message passing
# message():
#
@@ -658,9 +654,9 @@ class ChildJob:
# Perform the action in the child process, this calls the action_cb.
#
# Args:
- # queue (multiprocessing.Queue): The message queue for IPC
+ # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
#
- def child_action(self, queue):
+ def child_action(self, pipe_w):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -674,11 +670,11 @@ class ChildJob:
signal.signal(sig, signal.SIG_DFL)
signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
- # Assign the queue we passed across the process boundaries
+ # Assign the pipe we passed across the process boundaries
#
# Set the global message handler in this child
# process to forward messages to the parent process
- self._queue = queue
+ self._pipe_w = pipe_w
self._messenger.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
@@ -782,11 +778,11 @@ class ChildJob:
# instances). This is sent to the parent Job.
#
def _send_message(self, message_type, message_data):
- self._queue.put(_Envelope(message_type, message_data))
+ self._pipe_w.send(_Envelope(message_type, message_data))
# _child_send_error()
#
- # Sends an error to the main process through the message queue
+ # Sends an error to the main process through the message pipe
#
# Args:
# e (Exception): The error to send
@@ -803,7 +799,7 @@ class ChildJob:
# _child_send_result()
#
- # Sends the serialized result to the main process through the message queue
+ # Sends the serialized result to the main process through the message pipe
#
# Args:
# result (any): None, or a simple object (must be pickle-able, i.e.
@@ -825,7 +821,7 @@ class ChildJob:
# exit_code (_ReturnCode): The exit code to exit with
#
def _child_shutdown(self, exit_code):
- self._queue.close()
+ self._pipe_w.close()
assert isinstance(exit_code, _ReturnCode)
sys.exit(exit_code.value)
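The point of the change is that the read end of a Pipe exposes fileno() directly, so the parent no longer needs to reach into the private Queue._reader to register with the event loop. The following standalone sketch shows that integration in isolation; it is simplified and uses hypothetical names, a plain multiprocessing.Process rather than BuildStream's AsyncioSafeProcess, and it assumes a selector-based (Unix) asyncio loop where loop.add_reader() accepts an arbitrary file descriptor.

import asyncio
import multiprocessing

def child(pipe_w):
    # Stand-in for ChildJob.child_action(): send a few envelopes, then exit.
    for i in range(3):
        pipe_w.send(("status", i))
    pipe_w.close()

async def main():
    loop = asyncio.get_running_loop()
    pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
    process = multiprocessing.Process(target=child, args=[pipe_w])
    process.start()
    pipe_w.close()  # the parent keeps only the read end open

    done = asyncio.Event()

    def on_readable():
        # Mirrors _parent_process_pipe(): drain everything currently on the
        # pipe; poll() returns False once it is empty, and recv() raises
        # EOFError after the child has closed its write end.
        while pipe_r.poll():
            try:
                envelope = pipe_r.recv()
            except EOFError:
                loop.remove_reader(pipe_r.fileno())
                done.set()
                return
            print("parent received:", envelope)

    # Mirrors _parent_start_listening(): wake up whenever the pipe's
    # file descriptor becomes readable.
    loop.add_reader(pipe_r.fileno(), on_readable)
    await done.wait()
    process.join()
    pipe_r.close()

if __name__ == "__main__":
    asyncio.run(main())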