From 9c23ce5c7e16902a2330ee8157e0074c17be821f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 20 Nov 2019 08:17:02 +0100 Subject: job.py: Replace message queue with pipe A lightweight unidirectional pipe is sufficient to pass messages from the child job process to its parent. This also avoids the need to access the private `_reader` instance variable of `multiprocessing.Queue`. --- src/buildstream/_scheduler/jobs/job.py | 84 ++++++++++++++++------------------ 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 61eff88a6..07bbe9c60 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -104,7 +104,7 @@ class _MessageType(FastEnum): # 3. Implement YourJob.create_child_job() and YourJob.parent_complete(). # 4. Implement YourChildJob.child_process(). # -# A Job instance and its ChildJob share a message queue. You may send custom +# A Job instance and its ChildJob share a message pipe. You may send custom # messages to the main process using YourChildJob.send_message(). Such messages # must be processed in YourJob.handle_message(), which you will also need to # override for this purpose. @@ -130,7 +130,7 @@ class Job: # Private members # self._scheduler = scheduler # The scheduler - self._queue = None # A message passing queue + self._pipe_r = None # The read end of a pipe for message passing self._process = None # The Process object self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended @@ -156,7 +156,7 @@ class Job: # def start(self): - self._queue = multiprocessing.Queue() + self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False) self._tries += 1 self._parent_start_listening() @@ -174,11 +174,9 @@ class Job: if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),) - self._process = _multiprocessing.AsyncioSafeProcess( - target=do_pickled_child_job, args=[pickled, self._queue], - ) + self._process = _multiprocessing.AsyncioSafeProcess(target=do_pickled_child_job, args=[pickled, pipe_w],) else: - self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[self._queue],) + self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main @@ -189,6 +187,9 @@ class Job: self._process.start() # Register the process to call `_parent_child_completed` once it is done + # Close the write end of the pipe in the parent + pipe_w.close() + # Here we delay the call to the next loop tick. This is in order to be running # in the main thread, as the callback itself must be thread safe. def on_completion(pid, returncode): @@ -209,7 +210,7 @@ class Job: self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) - # Make sure there is no garbage on the queue + # Make sure there is no garbage on the pipe self._parent_stop_listening() # Terminate the process using multiprocessing API pathway @@ -385,11 +386,11 @@ class Job: # _parent_shutdown() # # Shuts down the Job on the parent side by reading any remaining - # messages on the message queue and cleaning up any resources. + # messages on the message pipe and cleaning up any resources. # def _parent_shutdown(self): # Make sure we've read everything we need and then stop listening - self._parent_process_queue() + self._parent_process_pipe() self._parent_stop_listening() # _parent_child_completed() @@ -449,12 +450,14 @@ class Job: self.parent_complete(status, self._result) self._scheduler.job_completed(self, status) - # Force the deletion of the queue and process objects to try and clean up FDs - self._queue = self._process = None + # Force the deletion of the pipe and process objects to try and clean up FDs + self._pipe_r.close() + self._process.close() + self._pipe_r = self._process = None # _parent_process_envelope() # - # Processes a message Envelope deserialized form the message queue. + # Processes a message Envelope deserialized form the message pipe. # # this will have the side effect of assigning some local state # on the Job in the parent process for later inspection when the @@ -487,51 +490,44 @@ class Job: else: assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) - # _parent_process_queue() + # _parent_process_pipe() # - # Reads back message envelopes from the message queue + # Reads back message envelopes from the message pipe # in the parent process. # - def _parent_process_queue(self): - while not self._queue.empty(): - envelope = self._queue.get_nowait() + def _parent_process_pipe(self): + while self._pipe_r.poll(): + try: + envelope = self._pipe_r.recv() + except EOFError: + self._parent_stop_listening() + break self._parent_process_envelope(envelope) # _parent_recv() # # A callback to handle I/O events from the message - # queue file descriptor in the main process message loop + # pipe file descriptor in the main process message loop # def _parent_recv(self, *args): - self._parent_process_queue() + self._parent_process_pipe() # _parent_start_listening() # - # Starts listening on the message queue + # Starts listening on the message pipe # def _parent_start_listening(self): - # Warning: Platform specific code up ahead - # - # The multiprocessing.Queue object does not tell us how - # to receive io events in the receiving process, so we - # need to sneak in and get its file descriptor. - # - # The _reader member of the Queue is currently private - # but well known, perhaps it will become public: - # - # http://bugs.python.org/issue3831 - # if not self._listening: - self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv) + self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() # - # Stops listening on the message queue + # Stops listening on the message pipe # def _parent_stop_listening(self): if self._listening: - self._scheduler.loop.remove_reader(self._queue._reader.fileno()) + self._scheduler.loop.remove_reader(self._pipe_r.fileno()) self._listening = False @@ -573,7 +569,7 @@ class ChildJob: self._message_element_name = message_element_name self._message_element_key = message_element_key - self._queue = None + self._pipe_w = None # The write end of a pipe for message passing # message(): # @@ -658,9 +654,9 @@ class ChildJob: # Perform the action in the child process, this calls the action_cb. # # Args: - # queue (multiprocessing.Queue): The message queue for IPC + # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC # - def child_action(self, queue): + def child_action(self, pipe_w): # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -674,11 +670,11 @@ class ChildJob: signal.signal(sig, signal.SIG_DFL) signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) - # Assign the queue we passed across the process boundaries + # Assign the pipe we passed across the process boundaries # # Set the global message handler in this child # process to forward messages to the parent process - self._queue = queue + self._pipe_w = pipe_w self._messenger.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() @@ -782,11 +778,11 @@ class ChildJob: # instances). This is sent to the parent Job. # def _send_message(self, message_type, message_data): - self._queue.put(_Envelope(message_type, message_data)) + self._pipe_w.send(_Envelope(message_type, message_data)) # _child_send_error() # - # Sends an error to the main process through the message queue + # Sends an error to the main process through the message pipe # # Args: # e (Exception): The error to send @@ -803,7 +799,7 @@ class ChildJob: # _child_send_result() # - # Sends the serialized result to the main process through the message queue + # Sends the serialized result to the main process through the message pipe # # Args: # result (any): None, or a simple object (must be pickle-able, i.e. @@ -825,7 +821,7 @@ class ChildJob: # exit_code (_ReturnCode): The exit code to exit with # def _child_shutdown(self, exit_code): - self._queue.close() + self._pipe_w.close() assert isinstance(exit_code, _ReturnCode) sys.exit(exit_code.value) -- cgit v1.2.1