diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-08-20 09:48:36 +0100 |
---|---|---|
committer | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-08-20 10:12:41 +0100 |
commit | 942239bae100a6b1157ed530740dc9172e513266 (patch) | |
tree | 3fa513ed6cb4f0318c80bd56fef5dea68bd69fe4 /src/buildstream/_scheduler | |
parent | 56ff33fbd7c5af1518f27a040da37520b1a3e247 (diff) | |
download | buildstream-942239bae100a6b1157ed530740dc9172e513266.tar.gz |
Remove uneccesary _platform.multiprocessingaevri/nomp
It turns out we don't need to use multiprocessing.Manager() queues when
using the 'spawn' method - the regular multiprocessing queues are also
picklable, if passed as parameters to the new process.
Thanks to @BenjaminSchubert for pointing this out.
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 43 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 5 |
2 files changed, 27 insertions, 21 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 2c4883756..9af08df92 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -168,7 +168,7 @@ class Job(): # Private members # self._scheduler = scheduler # The scheduler - self._queue_wrapper = None # A wrapper of a message passing queue + self._queue = None # A message passing queue self._process = None # The Process object self._watcher = None # Child process watcher self._listening = False # Whether the parent is currently listening @@ -194,7 +194,8 @@ class Job(): # Starts the job. # def start(self): - self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper() + + self._queue = multiprocessing.Queue() self._tries += 1 self._parent_start_listening() @@ -215,12 +216,12 @@ class Job(): child_job, self._scheduler.context.get_projects()) self._process = Process( target=_do_pickled_child_job, - args=[pickled, self._queue_wrapper], + args=[pickled, self._queue], ) else: self._process = Process( target=child_job.child_action, - args=[self._queue_wrapper], + args=[self._queue], ) # Block signals which are handled in the main process such that @@ -515,7 +516,7 @@ class Job(): self._scheduler.job_completed(self, status) # Force the deletion of the queue and process objects to try and clean up FDs - self._queue_wrapper = self._process = None + self._queue = self._process = None # _parent_process_envelope() # @@ -560,8 +561,8 @@ class Job(): # in the parent process. # def _parent_process_queue(self): - while not self._queue_wrapper.queue.empty(): - envelope = self._queue_wrapper.queue.get_nowait() + while not self._queue.empty(): + envelope = self._queue.get_nowait() self._parent_process_envelope(envelope) # _parent_recv() @@ -577,9 +578,20 @@ class Job(): # Starts listening on the message queue # def _parent_start_listening(self): + # Warning: Platform specific code up ahead + # + # The multiprocessing.Queue object does not tell us how + # to receive io events in the receiving process, so we + # need to sneak in and get its file descriptor. + # + # The _reader member of the Queue is currently private + # but well known, perhaps it will become public: + # + # http://bugs.python.org/issue3831 + # if not self._listening: - self._queue_wrapper.set_potential_callback_on_queue_event( - self._scheduler.loop, self._parent_recv) + self._scheduler.loop.add_reader( + self._queue._reader.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() @@ -588,8 +600,7 @@ class Job(): # def _parent_stop_listening(self): if self._listening: - self._queue_wrapper.clear_potential_callback_on_queue_event( - self._scheduler.loop) + self._scheduler.loop.remove_reader(self._queue._reader.fileno()) self._listening = False @@ -632,7 +643,7 @@ class ChildJob(): self._message_element_name = message_element_name self._message_element_key = message_element_key - self._queue_wrapper = None + self._queue = None # message(): # @@ -719,7 +730,7 @@ class ChildJob(): # Args: # queue (multiprocessing.Queue): The message queue for IPC # - def child_action(self, queue_wrapper): + def child_action(self, queue): # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -737,7 +748,7 @@ class ChildJob(): # # Set the global message handler in this child # process to forward messages to the parent process - self._queue_wrapper = queue_wrapper + self._queue = queue self._messenger.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() @@ -835,7 +846,7 @@ class ChildJob(): # instances). This is sent to the parent Job. # def _send_message(self, message_type, message_data): - self._queue_wrapper.queue.put(_Envelope(message_type, message_data)) + self._queue.put(_Envelope(message_type, message_data)) # _child_send_error() # @@ -881,7 +892,7 @@ class ChildJob(): # exit_code (_ReturnCode): The exit code to exit with # def _child_shutdown(self, exit_code): - self._queue_wrapper.close() + self._queue.close() assert isinstance(exit_code, _ReturnCode) sys.exit(exit_code.value) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index bd76e00b1..b29bc8841 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -74,11 +74,6 @@ class Scheduler(): self.terminated = False # Whether the scheduler was asked to terminate or has terminated self.suspended = False # Whether the scheduler is currently suspended - # A manager for creating and monitoring IPC queues, note that this - # can't be part of the platform or context as it is not always - # picklable. - self.ipc_queue_manager = self.context.platform.make_queue_manager() - # These are shared with the Job, but should probably be removed or made private in some way. self.loop = None # Shared for Job access to observe the message queue self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py |