summaryrefslogtreecommitdiff
path: root/src/buildstream/_platform
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-01 10:32:14 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-08-16 15:42:35 +0000
commit9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d (patch)
treee4f7718d507baa90b5fabae60908576006319bc9 /src/buildstream/_platform
parent36457ecd6783d0bf529af6f948ee4de5f600e21f (diff)
downloadbuildstream-9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d.tar.gz
Support pickling jobs if the platform requires it
Add support for using `multiprocessing.Manager` and the associated queues. Downgrade the queue event callback guarantees accordingly. In later work we may be able to support callbacks in all scenarios. Pickle and unpickle the child job if the platform requires it.
Diffstat (limited to 'src/buildstream/_platform')
-rw-r--r--src/buildstream/_platform/multiprocessing.py43
-rw-r--r--src/buildstream/_platform/platform.py21
2 files changed, 62 insertions, 2 deletions
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py
index 1f7be8338..c036651e7 100644
--- a/src/buildstream/_platform/multiprocessing.py
+++ b/src/buildstream/_platform/multiprocessing.py
@@ -31,6 +31,27 @@ class QueueManager:
return _PlainQueueWrapper(multiprocessing.Queue())
+# PicklableQueueManager()
+#
+# A QueueManager that creates pickable types.
+#
+# Note that the requirement of being picklable adds extra runtime burden, as we
+# must create and maintain a `SyncManager` process that will create and manage
+# the real objects.
+#
+class PicklableQueueManager(QueueManager):
+ def __init__(self):
+ super().__init__()
+ self._manager = None
+
+ def make_queue_wrapper(self):
+ # Only SyncManager Queues are picklable, so we must make those. Avoid
+ # creating multiple expensive SyncManagers, by keeping this one around.
+ if self._manager is None:
+ self._manager = multiprocessing.Manager()
+ return _SyncManagerQueueWrapper(self._manager.Queue())
+
+
# QueueWrapper()
#
# This abstracts our choice of using picklable or non-picklable Queues.
@@ -63,3 +84,25 @@ class _PlainQueueWrapper(QueueWrapper):
def close(self):
self.queue.close()
+
+
+class _SyncManagerQueueWrapper(QueueWrapper):
+ def __init__(self, queue):
+ super().__init__()
+ self.queue = queue
+
+ def set_potential_callback_on_queue_event(self, event_loop, callback):
+ # We can't easily support these callbacks for Queues managed by a
+ # SyncManager, so don't support them for now. In later work we should
+ # be able to support them with threading.
+ pass
+
+ def clear_potential_callback_on_queue_event(self, event_loop):
+ pass
+
+ def close(self):
+ # SyncManager queue proxies do not have a `close()` method, they rely
+ # on a callback on garbage collection to release resources. For our
+ # purposes the queue is invalid after closing, so it's ok to release it
+ # here.
+ self.queue = None
diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index 18e26a9a1..faf3d3c52 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -18,6 +18,7 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import multiprocessing
import os
import platform
import sys
@@ -27,7 +28,7 @@ import psutil
from .._exceptions import PlatformError, ImplError, SandboxError
from .. import utils
-from .multiprocessing import QueueManager
+from .multiprocessing import QueueManager, PicklableQueueManager
class Platform():
@@ -175,7 +176,23 @@ class Platform():
return Platform.canonicalize_arch(uname_machine)
def make_queue_manager(self):
- return QueueManager()
+ if self.does_multiprocessing_start_require_pickling():
+ return PicklableQueueManager()
+ else:
+ return QueueManager()
+
+ # does_multiprocessing_start_require_pickling():
+ #
+ # Returns True if the multiprocessing start method will pickle arguments
+ # to new processes.
+ #
+ # Returns:
+ # (bool): Whether pickling is required or not
+ #
+ def does_multiprocessing_start_require_pickling(self):
+ # Note that if the start method has not been set before now, it will be
+ # set to the platform default by `get_start_method`.
+ return multiprocessing.get_start_method() != 'fork'
##################################################################
# Sandbox functions #