summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-20 09:48:36 +0100
committerAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-20 10:12:41 +0100
commit942239bae100a6b1157ed530740dc9172e513266 (patch)
tree3fa513ed6cb4f0318c80bd56fef5dea68bd69fe4
parent56ff33fbd7c5af1518f27a040da37520b1a3e247 (diff)
downloadbuildstream-aevri/nomp.tar.gz
Remove unnecessary _platform.multiprocessing (branch: aevri/nomp)
It turns out we don't need to use multiprocessing.Manager() queues when using the 'spawn' method - the regular multiprocessing queues are also picklable, if passed as parameters to the new process. Thanks to @BenjaminSchubert for pointing this out.
-rw-r--r--src/buildstream/_platform/multiprocessing.py108
-rw-r--r--src/buildstream/_platform/platform.py8
-rw-r--r--src/buildstream/_scheduler/jobs/job.py43
-rw-r--r--src/buildstream/_scheduler/scheduler.py5
4 files changed, 27 insertions, 137 deletions
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py
deleted file mode 100644
index c036651e7..000000000
--- a/src/buildstream/_platform/multiprocessing.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#
-# Copyright (C) 2019 Bloomberg Finance LP
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-import multiprocessing
-
-
-# QueueManager()
-#
-# This abstracts our choice of creating picklable or non-picklable Queues.
-#
-# Note that when choosing the 'spawn' or 'forkserver' methods of starting
-# processes with the `multiprocessing` standard library module, we must use
-# only picklable types as parameters to jobs.
-#
-class QueueManager:
- def make_queue_wrapper(self):
- return _PlainQueueWrapper(multiprocessing.Queue())
-
-
-# PicklableQueueManager()
-#
-# A QueueManager that creates picklable types.
-#
-# Note that the requirement of being picklable adds extra runtime burden, as we
-# must create and maintain a `SyncManager` process that will create and manage
-# the real objects.
-#
-class PicklableQueueManager(QueueManager):
- def __init__(self):
- super().__init__()
- self._manager = None
-
- def make_queue_wrapper(self):
- # Only SyncManager Queues are picklable, so we must make those. Avoid
- # creating multiple expensive SyncManagers, by keeping this one around.
- if self._manager is None:
- self._manager = multiprocessing.Manager()
- return _SyncManagerQueueWrapper(self._manager.Queue())
-
-
-# QueueWrapper()
-#
-# This abstracts our choice of using picklable or non-picklable Queues.
-#
-class QueueWrapper:
- pass
-
-
-class _PlainQueueWrapper(QueueWrapper):
- def __init__(self, queue):
- super().__init__()
- self.queue = queue
-
- def set_potential_callback_on_queue_event(self, event_loop, callback):
- # Warning: Platform specific code up ahead
- #
- # The multiprocessing.Queue object does not tell us how
- # to receive io events in the receiving process, so we
- # need to sneak in and get its file descriptor.
- #
- # The _reader member of the Queue is currently private
- # but well known, perhaps it will become public:
- #
- # http://bugs.python.org/issue3831
- #
- event_loop.add_reader(self.queue._reader.fileno(), callback)
-
- def clear_potential_callback_on_queue_event(self, event_loop):
- event_loop.remove_reader(self.queue._reader.fileno())
-
- def close(self):
- self.queue.close()
-
-
-class _SyncManagerQueueWrapper(QueueWrapper):
- def __init__(self, queue):
- super().__init__()
- self.queue = queue
-
- def set_potential_callback_on_queue_event(self, event_loop, callback):
- # We can't easily support these callbacks for Queues managed by a
- # SyncManager, so don't support them for now. In later work we should
- # be able to support them with threading.
- pass
-
- def clear_potential_callback_on_queue_event(self, event_loop):
- pass
-
- def close(self):
- # SyncManager queue proxies do not have a `close()` method, they rely
- # on a callback on garbage collection to release resources. For our
- # purposes the queue is invalid after closing, so it's ok to release it
- # here.
- self.queue = None
diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index faf3d3c52..11c9217be 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -28,8 +28,6 @@ import psutil
from .._exceptions import PlatformError, ImplError, SandboxError
from .. import utils
-from .multiprocessing import QueueManager, PicklableQueueManager
-
class Platform():
# Platform()
@@ -175,12 +173,6 @@ class Platform():
uname_machine = platform.uname().machine
return Platform.canonicalize_arch(uname_machine)
- def make_queue_manager(self):
- if self.does_multiprocessing_start_require_pickling():
- return PicklableQueueManager()
- else:
- return QueueManager()
-
# does_multiprocessing_start_require_pickling():
#
# Returns True if the multiprocessing start method will pickle arguments
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 2c4883756..9af08df92 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -168,7 +168,7 @@ class Job():
# Private members
#
self._scheduler = scheduler # The scheduler
- self._queue_wrapper = None # A wrapper of a message passing queue
+ self._queue = None # A message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
self._listening = False # Whether the parent is currently listening
@@ -194,7 +194,8 @@ class Job():
# Starts the job.
#
def start(self):
- self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper()
+
+ self._queue = multiprocessing.Queue()
self._tries += 1
self._parent_start_listening()
@@ -215,12 +216,12 @@ class Job():
child_job, self._scheduler.context.get_projects())
self._process = Process(
target=_do_pickled_child_job,
- args=[pickled, self._queue_wrapper],
+ args=[pickled, self._queue],
)
else:
self._process = Process(
target=child_job.child_action,
- args=[self._queue_wrapper],
+ args=[self._queue],
)
# Block signals which are handled in the main process such that
@@ -515,7 +516,7 @@ class Job():
self._scheduler.job_completed(self, status)
# Force the deletion of the queue and process objects to try and clean up FDs
- self._queue_wrapper = self._process = None
+ self._queue = self._process = None
# _parent_process_envelope()
#
@@ -560,8 +561,8 @@ class Job():
# in the parent process.
#
def _parent_process_queue(self):
- while not self._queue_wrapper.queue.empty():
- envelope = self._queue_wrapper.queue.get_nowait()
+ while not self._queue.empty():
+ envelope = self._queue.get_nowait()
self._parent_process_envelope(envelope)
# _parent_recv()
@@ -577,9 +578,20 @@ class Job():
# Starts listening on the message queue
#
def _parent_start_listening(self):
+ # Warning: Platform specific code up ahead
+ #
+ # The multiprocessing.Queue object does not tell us how
+ # to receive io events in the receiving process, so we
+ # need to sneak in and get its file descriptor.
+ #
+ # The _reader member of the Queue is currently private
+ # but well known, perhaps it will become public:
+ #
+ # http://bugs.python.org/issue3831
+ #
if not self._listening:
- self._queue_wrapper.set_potential_callback_on_queue_event(
- self._scheduler.loop, self._parent_recv)
+ self._scheduler.loop.add_reader(
+ self._queue._reader.fileno(), self._parent_recv)
self._listening = True
# _parent_stop_listening()
@@ -588,8 +600,7 @@ class Job():
#
def _parent_stop_listening(self):
if self._listening:
- self._queue_wrapper.clear_potential_callback_on_queue_event(
- self._scheduler.loop)
+ self._scheduler.loop.remove_reader(self._queue._reader.fileno())
self._listening = False
@@ -632,7 +643,7 @@ class ChildJob():
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._queue_wrapper = None
+ self._queue = None
# message():
#
@@ -719,7 +730,7 @@ class ChildJob():
# Args:
# queue (multiprocessing.Queue): The message queue for IPC
#
- def child_action(self, queue_wrapper):
+ def child_action(self, queue):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -737,7 +748,7 @@ class ChildJob():
#
# Set the global message handler in this child
# process to forward messages to the parent process
- self._queue_wrapper = queue_wrapper
+ self._queue = queue
self._messenger.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()
@@ -835,7 +846,7 @@ class ChildJob():
# instances). This is sent to the parent Job.
#
def _send_message(self, message_type, message_data):
- self._queue_wrapper.queue.put(_Envelope(message_type, message_data))
+ self._queue.put(_Envelope(message_type, message_data))
# _child_send_error()
#
@@ -881,7 +892,7 @@ class ChildJob():
# exit_code (_ReturnCode): The exit code to exit with
#
def _child_shutdown(self, exit_code):
- self._queue_wrapper.close()
+ self._queue.close()
assert isinstance(exit_code, _ReturnCode)
sys.exit(exit_code.value)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index bd76e00b1..b29bc8841 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -74,11 +74,6 @@ class Scheduler():
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
self.suspended = False # Whether the scheduler is currently suspended
- # A manager for creating and monitoring IPC queues, note that this
- # can't be part of the platform or context as it is not always
- # picklable.
- self.ipc_queue_manager = self.context.platform.make_queue_manager()
-
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py