summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-01 10:15:06 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-08-16 15:42:35 +0000
commit36457ecd6783d0bf529af6f948ee4de5f600e21f (patch)
tree2b511ec0eb8ea7f41d29298f936fec97c4e9838e /src/buildstream/_scheduler
parentedfb540227f22de4355daf07eb6e2480345883aa (diff)
downloadbuildstream-36457ecd6783d0bf529af6f948ee4de5f600e21f.tar.gz
Abstract mp Queue usage, prep to spawn processes
Pave the way to supporting starting processes by the 'spawn' method, by abstracting our usage of `multiprocessing.Queue`. This means we can easily switch to using a multiprocessing.Manager() and associated queues instead when necessary.
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/jobs/job.py39
-rw-r--r--src/buildstream/_scheduler/scheduler.py5
2 files changed, 19 insertions, 25 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7cbde2d39..4104ddc73 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -136,7 +136,7 @@ class Job():
# Private members
#
self._scheduler = scheduler # The scheduler
- self._queue = None # A message passing queue
+ self._queue_wrapper = None # A wrapper of a message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
self._listening = False # Whether the parent is currently listening
@@ -162,8 +162,7 @@ class Job():
# Starts the job.
#
def start(self):
-
- self._queue = multiprocessing.Queue()
+ self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper()
self._tries += 1
self._parent_start_listening()
@@ -478,7 +477,7 @@ class Job():
self._scheduler.job_completed(self, status)
# Force the deletion of the queue and process objects to try and clean up FDs
- self._queue = self._process = None
+ self._queue_wrapper = self._process = None
# _parent_process_envelope()
#
@@ -523,8 +522,8 @@ class Job():
# in the parent process.
#
def _parent_process_queue(self):
- while not self._queue.empty():
- envelope = self._queue.get_nowait()
+ while not self._queue_wrapper.queue.empty():
+ envelope = self._queue_wrapper.queue.get_nowait()
self._parent_process_envelope(envelope)
# _parent_recv()
@@ -540,20 +539,9 @@ class Job():
# Starts listening on the message queue
#
def _parent_start_listening(self):
- # Warning: Platform specific code up ahead
- #
- # The multiprocessing.Queue object does not tell us how
- # to receive io events in the receiving process, so we
- # need to sneak in and get its file descriptor.
- #
- # The _reader member of the Queue is currently private
- # but well known, perhaps it will become public:
- #
- # http://bugs.python.org/issue3831
- #
if not self._listening:
- self._scheduler.loop.add_reader(
- self._queue._reader.fileno(), self._parent_recv)
+ self._queue_wrapper.set_potential_callback_on_queue_event(
+ self._scheduler.loop, self._parent_recv)
self._listening = True
# _parent_stop_listening()
@@ -562,7 +550,8 @@ class Job():
#
def _parent_stop_listening(self):
if self._listening:
- self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ self._queue_wrapper.clear_potential_callback_on_queue_event(
+ self._scheduler.loop)
self._listening = False
@@ -605,7 +594,7 @@ class ChildJob():
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._queue = None
+ self._queue_wrapper = None
# message():
#
@@ -692,7 +681,7 @@ class ChildJob():
# Args:
# queue (multiprocessing.Queue): The message queue for IPC
#
- def child_action(self, queue):
+ def child_action(self, queue_wrapper):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -710,7 +699,7 @@ class ChildJob():
#
# Set the global message handler in this child
# process to forward messages to the parent process
- self._queue = queue
+ self._queue_wrapper = queue_wrapper
self._messenger.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
@@ -808,7 +797,7 @@ class ChildJob():
# instances). This is sent to the parent Job.
#
def _send_message(self, message_type, message_data):
- self._queue.put(_Envelope(message_type, message_data))
+ self._queue_wrapper.queue.put(_Envelope(message_type, message_data))
# _child_send_error()
#
@@ -854,7 +843,7 @@ class ChildJob():
# exit_code (_ReturnCode): The exit code to exit with
#
def _child_shutdown(self, exit_code):
- self._queue.close()
+ self._queue_wrapper.close()
assert isinstance(exit_code, _ReturnCode)
sys.exit(exit_code.value)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d09..398e52e74 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -80,6 +80,11 @@ class Scheduler():
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
self.suspended = False # Whether the scheduler is currently suspended
+ # A manager for creating and monitoring IPC queues, note that this
+ # can't be part of the platform or context as it is not always
+ # picklable.
+ self.ipc_queue_manager = self.context.platform.make_queue_manager()
+
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py