author     Angelos Evripiotis <jevripiotis@bloomberg.net>   2019-08-01 10:32:14 +0100
committer  bst-marge-bot <marge-bot@buildstream.build>      2019-08-16 15:42:35 +0000
commit     9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d (patch)
tree       e4f7718d507baa90b5fabae60908576006319bc9
parent     36457ecd6783d0bf529af6f948ee4de5f600e21f (diff)
download   buildstream-9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d.tar.gz
Support pickling jobs if the platform requires it
Add support for using `multiprocessing.Manager` and the associated
queues. Downgrade the queue event callback guarantees accordingly. In
later work we may be able to support callbacks in all scenarios.
Pickle and unpickle the child job if the platform requires it.
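
For context (not part of the patch): the scheme relies on two properties of Python's multiprocessing module. A plain multiprocessing.Queue cannot be pickled, whereas a queue obtained from a SyncManager is a picklable proxy, and get_start_method() reports whether the platform forks (no pickling of process arguments) or spawns (arguments are pickled). A minimal, hypothetical sketch, not BuildStream code:

    import multiprocessing
    import pickle

    if __name__ == "__main__":
        # 'fork' on Linux by default; 'spawn' on Windows and on macOS (3.8+).
        print(multiprocessing.get_start_method())

        # A SyncManager queue is a proxy object and survives pickling.
        managed_queue = multiprocessing.Manager().Queue()
        pickle.dumps(managed_queue)

        # A plain multiprocessing.Queue refuses to be pickled directly.
        try:
            pickle.dumps(multiprocessing.Queue())
        except RuntimeError as error:
            print(error)  # "Queue objects should only be shared between processes ..."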
-rw-r--r--   src/buildstream/_platform/multiprocessing.py   43
-rw-r--r--   src/buildstream/_platform/platform.py           21
-rw-r--r--   src/buildstream/_scheduler/jobs/job.py          50
3 files changed, 106 insertions, 8 deletions
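
Taken together, the three files implement one decision: if the multiprocessing start method will pickle what it hands to a child process, use picklable queues and a pickled child job. A condensed, simplified sketch of that control flow (illustrative only, not the actual classes):

    import multiprocessing

    def make_queue():
        # Simplified stand-in for Platform.make_queue_manager() plus the
        # QueueManager/PicklableQueueManager split introduced by the patch.
        if multiprocessing.get_start_method() != 'fork':
            # Spawned children receive pickled arguments, so hand out a
            # picklable SyncManager queue proxy instead of a plain Queue.
            return multiprocessing.Manager().Queue()
        return multiprocessing.Queue()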
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py
index 1f7be8338..c036651e7 100644
--- a/src/buildstream/_platform/multiprocessing.py
+++ b/src/buildstream/_platform/multiprocessing.py
@@ -31,6 +31,27 @@ class QueueManager:
         return _PlainQueueWrapper(multiprocessing.Queue())
 
 
+# PicklableQueueManager()
+#
+# A QueueManager that creates pickable types.
+#
+# Note that the requirement of being picklable adds extra runtime burden, as we
+# must create and maintain a `SyncManager` process that will create and manage
+# the real objects.
+#
+class PicklableQueueManager(QueueManager):
+    def __init__(self):
+        super().__init__()
+        self._manager = None
+
+    def make_queue_wrapper(self):
+        # Only SyncManager Queues are picklable, so we must make those. Avoid
+        # creating multiple expensive SyncManagers, by keeping this one around.
+        if self._manager is None:
+            self._manager = multiprocessing.Manager()
+        return _SyncManagerQueueWrapper(self._manager.Queue())
+
+
 # QueueWrapper()
 #
 # This abstracts our choice of using picklable or non-picklable Queues.
@@ -63,3 +84,25 @@ class _PlainQueueWrapper(QueueWrapper):
 
     def close(self):
         self.queue.close()
+
+
+class _SyncManagerQueueWrapper(QueueWrapper):
+    def __init__(self, queue):
+        super().__init__()
+        self.queue = queue
+
+    def set_potential_callback_on_queue_event(self, event_loop, callback):
+        # We can't easily support these callbacks for Queues managed by a
+        # SyncManager, so don't support them for now. In later work we should
+        # be able to support them with threading.
+        pass
+
+    def clear_potential_callback_on_queue_event(self, event_loop):
+        pass
+
+    def close(self):
+        # SyncManager queue proxies do not have a `close()` method, they rely
+        # on a callback on garbage collection to release resources. For our
+        # purposes the queue is invalid after closing, so it's ok to release it
+        # here.
+        self.queue = None
diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index 18e26a9a1..faf3d3c52 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -18,6 +18,7 @@
 # Authors:
 #        Tristan Maat <tristan.maat@codethink.co.uk>
 
+import multiprocessing
 import os
 import platform
 import sys
@@ -27,7 +28,7 @@ import psutil
 
 from .._exceptions import PlatformError, ImplError, SandboxError
 from .. import utils
-from .multiprocessing import QueueManager
+from .multiprocessing import QueueManager, PicklableQueueManager
 
 
 class Platform():
@@ -175,7 +176,23 @@ class Platform():
         return Platform.canonicalize_arch(uname_machine)
 
     def make_queue_manager(self):
-        return QueueManager()
+        if self.does_multiprocessing_start_require_pickling():
+            return PicklableQueueManager()
+        else:
+            return QueueManager()
+
+    # does_multiprocessing_start_require_pickling():
+    #
+    # Returns True if the multiprocessing start method will pickle arguments
+    # to new processes.
+    #
+    # Returns:
+    #    (bool): Whether pickling is required or not
+    #
+    def does_multiprocessing_start_require_pickling(self):
+        # Note that if the start method has not been set before now, it will be
+        # set to the platform default by `get_start_method`.
+        return multiprocessing.get_start_method() != 'fork'
 
     ##################################################################
     #                        Sandbox functions                       #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 4104ddc73..2c4883756 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,6 +21,7 @@
 
 # System imports
 import os
+import pickle
 import sys
 import signal
 import datetime
@@ -86,6 +87,37 @@ class _MessageType(FastEnum):
     SUBCLASS_CUSTOM_MESSAGE = 5
 
 
+# _do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process, as
+# such it will fixup any globals to be in the expected state.
+#
+# Args:
+#    pickled     (BytesIO): The pickled job to execute.
+#    *child_args (any)    : Any parameters to be passed to `child_action`.
+#
+def _do_pickled_child_job(pickled, *child_args):
+
+    utils._is_main_process = _not_main_process
+
+    child_job = pickle.load(pickled)
+    return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked - the inheritance of the main process id will
+# not work in this case.
+#
+# Note that we'll always not be the main process by definition.
+#
+def _not_main_process():
+    return False
+
+
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -178,12 +210,18 @@ class Job():
             self._message_element_key
         )
 
-        # Make sure that picklability doesn't break, by exercising it during
-        # our test suite.
-        if self._scheduler.context.is_running_in_test_suite:
-            pickle_child_job(child_job, self._scheduler.context.get_projects())
-
-        self._process = Process(target=child_job.child_action, args=[self._queue])
+        if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+            pickled = pickle_child_job(
+                child_job, self._scheduler.context.get_projects())
+            self._process = Process(
+                target=_do_pickled_child_job,
+                args=[pickled, self._queue_wrapper],
+            )
+        else:
+            self._process = Process(
+                target=child_job.child_action,
+                args=[self._queue_wrapper],
+            )
 
         # Block signals which are handled in the main process such that
         # the child process does not inherit the parent's state, but the main
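
To round out the picture, here is a hypothetical, self-contained sketch of the pickle-then-spawn pattern that `pickle_child_job` / `_do_pickled_child_job` follow: serialize the job object into a BytesIO in the parent, hand the buffer to the spawned process, and unpickle it there before calling its action. The `Task` and `run_pickled` names are illustrative stand-ins, not BuildStream code:

    import io
    import multiprocessing
    import pickle


    class Task:
        # Stand-in for BuildStream's ChildJob.
        def __init__(self, name):
            self.name = name

        def child_action(self, queue):
            queue.put("ran {}".format(self.name))


    def run_pickled(pickled, queue):
        # Mirrors _do_pickled_child_job(): unpickle in the child, then run.
        task = pickle.load(pickled)
        return task.child_action(queue)


    if __name__ == "__main__":
        multiprocessing.set_start_method("spawn")   # force the pickling code path
        queue = multiprocessing.Manager().Queue()   # picklable queue proxy

        buffer = io.BytesIO()
        pickle.dump(Task("example"), buffer)
        buffer.seek(0)

        process = multiprocessing.Process(target=run_pickled, args=[buffer, queue])
        process.start()
        process.join()
        print(queue.get())                          # -> "ran example"

Because the spawned child receives only pickled arguments, everything passed to it here (the BytesIO buffer and the manager queue proxy) is picklable by construction, which is exactly the constraint the patch arranges for in Job.start().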