author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-08-01 10:15:06 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-16 15:42:35 +0000 |
commit | 36457ecd6783d0bf529af6f948ee4de5f600e21f (patch) | |
tree | 2b511ec0eb8ea7f41d29298f936fec97c4e9838e /src/buildstream/_scheduler | |
parent | edfb540227f22de4355daf07eb6e2480345883aa (diff) | |
download | buildstream-36457ecd6783d0bf529af6f948ee4de5f600e21f.tar.gz | |
Abstract mp Queue usage, prep to spawn processes
Pave the way to supporting the 'spawn' process start method by
abstracting our usage of `multiprocessing.Queue`. This means we can
easily switch to a `multiprocessing.Manager()` and its associated
queues when necessary.
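As a rough illustration of the abstraction this commit describes, here is a minimal sketch of a queue wrapper and its factory for the existing fork-based case. The class names `_PlainQueueWrapper` and `_PlainQueueManager` are hypothetical and not taken from the tree; the method names mirror the ones used in the diff below, and the use of the queue's private `_reader` descriptor mirrors the code being replaced.

```python
import multiprocessing


class _PlainQueueWrapper():
    # Wraps a plain multiprocessing.Queue created in the parent and
    # inherited by fork-started child processes.  The wrapped queue stays
    # reachable as `.queue`, so callers keep using put()/get_nowait()/empty().

    def __init__(self, queue):
        self.queue = queue

    def set_potential_callback_on_queue_event(self, loop, callback):
        # A plain Queue has a private reader pipe we can watch for
        # readability on the asyncio event loop, as the old code did.
        loop.add_reader(self.queue._reader.fileno(), callback)

    def clear_potential_callback_on_queue_event(self, loop):
        loop.remove_reader(self.queue._reader.fileno())

    def close(self):
        self.queue.close()


class _PlainQueueManager():
    # Hands out queue wrappers; a spawn-capable variant would return
    # wrappers around multiprocessing.Manager() queues instead.

    def make_queue_wrapper(self):
        return _PlainQueueWrapper(multiprocessing.Queue())
```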
Diffstat (limited to 'src/buildstream/_scheduler')
| -rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 39 |
| -rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 5 |
2 files changed, 19 insertions, 25 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7cbde2d39..4104ddc73 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -136,7 +136,7 @@ class Job():
         # Private members
         #
         self._scheduler = scheduler            # The scheduler
-        self._queue = None                     # A message passing queue
+        self._queue_wrapper = None             # A wrapper of a message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
         self._listening = False                # Whether the parent is currently listening
@@ -162,8 +162,7 @@ class Job():
     # Starts the job.
     #
     def start(self):
-
-        self._queue = multiprocessing.Queue()
+        self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper()

         self._tries += 1
         self._parent_start_listening()
@@ -478,7 +477,7 @@ class Job():
         self._scheduler.job_completed(self, status)

         # Force the deletion of the queue and process objects to try and clean up FDs
-        self._queue = self._process = None
+        self._queue_wrapper = self._process = None

     # _parent_process_envelope()
     #
@@ -523,8 +522,8 @@ class Job():
     # in the parent process.
     #
     def _parent_process_queue(self):
-        while not self._queue.empty():
-            envelope = self._queue.get_nowait()
+        while not self._queue_wrapper.queue.empty():
+            envelope = self._queue_wrapper.queue.get_nowait()
             self._parent_process_envelope(envelope)

     # _parent_recv()
     #
@@ -540,20 +539,9 @@ class Job():
     # Starts listening on the message queue
     #
     def _parent_start_listening(self):
-        # Warning: Platform specific code up ahead
-        #
-        # The multiprocessing.Queue object does not tell us how
-        # to receive io events in the receiving process, so we
-        # need to sneak in and get its file descriptor.
-        #
-        # The _reader member of the Queue is currently private
-        # but well known, perhaps it will become public:
-        #
-        # http://bugs.python.org/issue3831
-        #
         if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
+            self._queue_wrapper.set_potential_callback_on_queue_event(
+                self._scheduler.loop, self._parent_recv)
             self._listening = True

     # _parent_stop_listening()
     #
@@ -562,7 +550,8 @@ class Job():
     #
     def _parent_stop_listening(self):
         if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+            self._queue_wrapper.clear_potential_callback_on_queue_event(
+                self._scheduler.loop)
             self._listening = False

@@ -605,7 +594,7 @@ class ChildJob():
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key

-        self._queue = None
+        self._queue_wrapper = None

     # message():
     #
@@ -692,7 +681,7 @@ class ChildJob():
     # Args:
     #    queue (multiprocessing.Queue): The message queue for IPC
     #
-    def child_action(self, queue):
+    def child_action(self, queue_wrapper):

         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -710,7 +699,7 @@ class ChildJob():
         #
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._queue = queue
+        self._queue_wrapper = queue_wrapper
         self._messenger.set_message_handler(self._child_message_handler)

         starttime = datetime.datetime.now()
@@ -808,7 +797,7 @@ class ChildJob():
     # instances). This is sent to the parent Job.
     #
     def _send_message(self, message_type, message_data):
-        self._queue.put(_Envelope(message_type, message_data))
+        self._queue_wrapper.queue.put(_Envelope(message_type, message_data))

     # _child_send_error()
     #
@@ -854,7 +843,7 @@ class ChildJob():
     #    exit_code (_ReturnCode): The exit code to exit with
     #
     def _child_shutdown(self, exit_code):
-        self._queue.close()
+        self._queue_wrapper.close()
         assert isinstance(exit_code, _ReturnCode)
         sys.exit(exit_code.value)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d09..398e52e74 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -80,6 +80,11 @@ class Scheduler():
         self.terminated = False    # Whether the scheduler was asked to terminate or has terminated
         self.suspended = False     # Whether the scheduler is currently suspended

+        # A manager for creating and monitoring IPC queues, note that this
+        # can't be part of the platform or context as it is not always
+        # picklable.
+        self.ipc_queue_manager = self.context.platform.make_queue_manager()
+
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None           # Shared for Job access to observe the message queue
         self.internal_stops = 0    # Amount of SIGSTP signals we've introduced, this is shared with job.py
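The "easily switch" claim in the commit message amounts to keeping this small wrapper interface stable while swapping the queue implementation underneath. Below is a purely hypothetical sketch of a `multiprocessing.Manager()`-backed variant, assuming a simple polling fallback because proxy queues expose no file descriptor to watch; this is not code from the BuildStream tree, and the eventual spawn support may be wired up quite differently.

```python
import multiprocessing


class _ManagedQueueWrapper():
    # Wraps a queue obtained from a multiprocessing.Manager().  Manager
    # queues are proxy objects, so they can be handed to processes that
    # were started with the 'spawn' method.

    _POLL_INTERVAL = 0.1  # seconds; illustrative value

    def __init__(self, queue):
        self.queue = queue
        self._handle = None
        self._callback = None

    def set_potential_callback_on_queue_event(self, loop, callback):
        # A proxy queue has no pollable file descriptor, so fall back to
        # periodically checking for messages on the event loop.
        self._callback = callback
        self._handle = loop.call_later(self._POLL_INTERVAL, self._poll, loop)

    def clear_potential_callback_on_queue_event(self, loop):
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None

    def close(self):
        # Proxy queues are cleaned up by their manager, nothing to do here.
        pass

    def _poll(self, loop):
        # Fire the callback if anything is queued, then reschedule the poll.
        if not self.queue.empty():
            self._callback()
        self._handle = loop.call_later(self._POLL_INTERVAL, self._poll, loop)


class _ManagedQueueManager():
    # Owns the multiprocessing.Manager() and hands out proxy-queue wrappers.

    def __init__(self):
        self._manager = multiprocessing.Manager()

    def make_queue_wrapper(self):
        return _ManagedQueueWrapper(self._manager.Queue())
```

Keeping `put()`, `empty()` and `get_nowait()` on the raw `.queue` attribute, as the diff does, means the `Job` and `ChildJob` message paths do not change at all when the backing queue implementation does.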