diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-08-01 10:32:14 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-16 15:42:35 +0000 |
commit | 9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d (patch) | |
tree | e4f7718d507baa90b5fabae60908576006319bc9 /src/buildstream/_platform | |
parent | 36457ecd6783d0bf529af6f948ee4de5f600e21f (diff) | |
download | buildstream-9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d.tar.gz |
Support pickling jobs if the platform requires it
Add support for using `multiprocessing.Manager` and the associated
queues. Downgrade the queue event callback guarantees accordingly. In
later work we may be able to support callbacks in all scenarios.
Pickle and unpickle the child job if the platform requires it.
Diffstat (limited to 'src/buildstream/_platform')
-rw-r--r-- | src/buildstream/_platform/multiprocessing.py | 43 | ||||
-rw-r--r-- | src/buildstream/_platform/platform.py | 21 |
2 files changed, 62 insertions, 2 deletions
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py index 1f7be8338..c036651e7 100644 --- a/src/buildstream/_platform/multiprocessing.py +++ b/src/buildstream/_platform/multiprocessing.py @@ -31,6 +31,27 @@ class QueueManager: return _PlainQueueWrapper(multiprocessing.Queue()) +# PicklableQueueManager() +# +# A QueueManager that creates pickable types. +# +# Note that the requirement of being picklable adds extra runtime burden, as we +# must create and maintain a `SyncManager` process that will create and manage +# the real objects. +# +class PicklableQueueManager(QueueManager): + def __init__(self): + super().__init__() + self._manager = None + + def make_queue_wrapper(self): + # Only SyncManager Queues are picklable, so we must make those. Avoid + # creating multiple expensive SyncManagers, by keeping this one around. + if self._manager is None: + self._manager = multiprocessing.Manager() + return _SyncManagerQueueWrapper(self._manager.Queue()) + + # QueueWrapper() # # This abstracts our choice of using picklable or non-picklable Queues. @@ -63,3 +84,25 @@ class _PlainQueueWrapper(QueueWrapper): def close(self): self.queue.close() + + +class _SyncManagerQueueWrapper(QueueWrapper): + def __init__(self, queue): + super().__init__() + self.queue = queue + + def set_potential_callback_on_queue_event(self, event_loop, callback): + # We can't easily support these callbacks for Queues managed by a + # SyncManager, so don't support them for now. In later work we should + # be able to support them with threading. + pass + + def clear_potential_callback_on_queue_event(self, event_loop): + pass + + def close(self): + # SyncManager queue proxies do not have a `close()` method, they rely + # on a callback on garbage collection to release resources. For our + # purposes the queue is invalid after closing, so it's ok to release it + # here. + self.queue = None diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py index 18e26a9a1..faf3d3c52 100644 --- a/src/buildstream/_platform/platform.py +++ b/src/buildstream/_platform/platform.py @@ -18,6 +18,7 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> +import multiprocessing import os import platform import sys @@ -27,7 +28,7 @@ import psutil from .._exceptions import PlatformError, ImplError, SandboxError from .. import utils -from .multiprocessing import QueueManager +from .multiprocessing import QueueManager, PicklableQueueManager class Platform(): @@ -175,7 +176,23 @@ class Platform(): return Platform.canonicalize_arch(uname_machine) def make_queue_manager(self): - return QueueManager() + if self.does_multiprocessing_start_require_pickling(): + return PicklableQueueManager() + else: + return QueueManager() + + # does_multiprocessing_start_require_pickling(): + # + # Returns True if the multiprocessing start method will pickle arguments + # to new processes. + # + # Returns: + # (bool): Whether pickling is required or not + # + def does_multiprocessing_start_require_pickling(self): + # Note that if the start method has not been set before now, it will be + # set to the platform default by `get_start_method`. + return multiprocessing.get_start_method() != 'fork' ################################################################## # Sandbox functions # |