| author | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-16 17:37:44 +0000 |
|---|---|---|
| committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-16 17:37:44 +0000 |
| commit | dd03a017f2fea55e5e4311df332a16891b0ec0ff | |
| tree | 60c9d226a8b9576d25da0ef70f41c6cef4f7639a | |
| parent | 1f7020266ee794643e486e4a15fb83916f69fcfc | |
| parent | 13593e08f587432d2ee90b581df35423b7690ad5 | |
Merge branch 'aevri/spawn' into 'master'
Support the 'spawn' method of starting processes
See merge request BuildStream/buildstream!1511
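
As background for the change below (a standalone sketch, not BuildStream code): with the 'fork' start method a child process simply inherits a plain `multiprocessing.Queue`, but under 'spawn' every argument is pickled into a fresh interpreter, and plain queues refuse ordinary pickling. This distinction is what motivates the new `PicklableQueueManager` in this patch:

```python
import multiprocessing
import pickle

if __name__ == "__main__":
    # A plain Queue may only be shared through inheritance, which 'fork'
    # provides for free but 'spawn' does not.
    plain = multiprocessing.Queue()
    try:
        pickle.dumps(plain)
    except RuntimeError as err:
        print("plain Queue:", err)

    # A SyncManager-backed queue is a proxy object and pickles fine; the
    # manager process holds the real queue.
    manager = multiprocessing.Manager()
    proxy = manager.Queue()
    print("proxy Queue pickled to", len(pickle.dumps(proxy)), "bytes")
```

The `SyncManager` proxy costs an extra manager process, which is why the patch only opts into it when the start method actually requires pickling.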
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | .gitlab-ci.yml | 6 |
| -rw-r--r-- | src/buildstream/_frontend/cli.py | 13 |
| -rw-r--r-- | src/buildstream/_platform/multiprocessing.py | 108 |
| -rw-r--r-- | src/buildstream/_platform/platform.py | 22 |
| -rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 89 |
| -rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 5 |
| -rw-r--r-- | src/buildstream/utils.py | 11 |
7 files changed, 217 insertions, 37 deletions
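
Before the patch body, a minimal sketch of the pickled-job round trip that `job.py` implements below, assuming the 'spawn' start method. `ToyChildJob` and `do_pickled_child_job` are hypothetical stand-ins for `ChildJob` and `_do_pickled_child_job`, not BuildStream code:

```python
import io
import multiprocessing
import pickle


class ToyChildJob:
    # Hypothetical stand-in for ChildJob; it only needs to be picklable.
    def __init__(self, name):
        self.name = name

    def child_action(self, queue):
        queue.put("hello from {}".format(self.name))


def do_pickled_child_job(pickled, queue):
    # Mirrors the patch's _do_pickled_child_job: unpickle the job, run it.
    child_job = pickle.load(pickled)
    return child_job.child_action(queue)


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")

    manager = multiprocessing.Manager()
    queue = manager.Queue()  # a picklable proxy, as in the patch

    pickled = io.BytesIO()
    pickle.dump(ToyChildJob("child"), pickled)
    pickled.seek(0)  # a BytesIO pickles with its position, so rewind first

    process = multiprocessing.Process(
        target=do_pickled_child_job, args=(pickled, queue))
    process.start()
    process.join()
    print(queue.get())  # -> hello from child
```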
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index c18872083..2bdaab0fb 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -189,6 +189,12 @@ tests-remote-execution:
     SOURCE_CACHE_SERVICE: http://docker:50052
     PYTEST_ARGS: "--color=yes --remote-execution"
 
+tests-spawn-multiprocessing-start-method:
+  image: registry.gitlab.com/buildstream/buildstream-docker-images/testsuite-fedora:29-master-47052095
+  <<: *tests
+  variables:
+    BST_FORCE_START_METHOD: "spawn"
+
 # Lint separately from testing
 lint:
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 5a585b276..1365cede4 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -1,3 +1,4 @@
+import multiprocessing
 import os
 import sys
 from functools import partial
@@ -217,6 +218,18 @@ def override_main(self, args=None, prog_name=None, complete_var=None,
                    .format(stream.name), err=True)
         sys.exit(-1)
 
+    # We can only set the global multiprocessing start method once; for that
+    # reason we're advised to do it inside the entrypoint, where it is easy to
+    # ensure the code path is only followed once.
+    if 'BST_FORCE_START_METHOD' in os.environ:
+        multiprocessing.set_start_method(os.environ['BST_FORCE_START_METHOD'])
+        print(
+            "BST_FORCE_START_METHOD: multiprocessing start method forced to:",
+            os.environ['BST_FORCE_START_METHOD'],
+            file=sys.stderr,
+            flush=True,
+        )
+
     original_main(self, args=args, prog_name=prog_name, complete_var=None,
                   standalone_mode=standalone_mode, **extra)
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py
new file mode 100644
index 000000000..c036651e7
--- /dev/null
+++ b/src/buildstream/_platform/multiprocessing.py
@@ -0,0 +1,108 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import multiprocessing
+
+
+# QueueManager()
+#
+# This abstracts our choice of creating picklable or non-picklable Queues.
+#
+# Note that when choosing the 'spawn' or 'forkserver' methods of starting
+# processes with the `multiprocessing` standard library module, we must use
+# only picklable type as parameters to jobs.
+#
+class QueueManager:
+    def make_queue_wrapper(self):
+        return _PlainQueueWrapper(multiprocessing.Queue())
+
+
+# PicklableQueueManager()
+#
+# A QueueManager that creates pickable types.
+#
+# Note that the requirement of being picklable adds extra runtime burden, as we
+# must create and maintain a `SyncManager` process that will create and manage
+# the real objects.
+#
+class PicklableQueueManager(QueueManager):
+    def __init__(self):
+        super().__init__()
+        self._manager = None
+
+    def make_queue_wrapper(self):
+        # Only SyncManager Queues are picklable, so we must make those. Avoid
+        # creating multiple expensive SyncManagers, by keeping this one around.
+        if self._manager is None:
+            self._manager = multiprocessing.Manager()
+        return _SyncManagerQueueWrapper(self._manager.Queue())
+
+
+# QueueWrapper()
+#
+# This abstracts our choice of using picklable or non-picklable Queues.
+#
+class QueueWrapper:
+    pass
+
+
+class _PlainQueueWrapper(QueueWrapper):
+    def __init__(self, queue):
+        super().__init__()
+        self.queue = queue
+
+    def set_potential_callback_on_queue_event(self, event_loop, callback):
+        # Warning: Platform specific code up ahead
+        #
+        # The multiprocessing.Queue object does not tell us how
+        # to receive io events in the receiving process, so we
+        # need to sneak in and get its file descriptor.
+        #
+        # The _reader member of the Queue is currently private
+        # but well known, perhaps it will become public:
+        #
+        #    http://bugs.python.org/issue3831
+        #
+        event_loop.add_reader(self.queue._reader.fileno(), callback)
+
+    def clear_potential_callback_on_queue_event(self, event_loop):
+        event_loop.remove_reader(self.queue._reader.fileno())
+
+    def close(self):
+        self.queue.close()
+
+
+class _SyncManagerQueueWrapper(QueueWrapper):
+    def __init__(self, queue):
+        super().__init__()
+        self.queue = queue
+
+    def set_potential_callback_on_queue_event(self, event_loop, callback):
+        # We can't easily support these callbacks for Queues managed by a
+        # SyncManager, so don't support them for now. In later work we should
+        # be able to support them with threading.
+        pass
+
+    def clear_potential_callback_on_queue_event(self, event_loop):
+        pass
+
+    def close(self):
+        # SyncManager queue proxies do not have a `close()` method, they rely
+        # on a callback on garbage collection to release resources. For our
+        # purposes the queue is invalid after closing, so it's ok to release it
+        # here.
+        self.queue = None
diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index d6e0755f2..faf3d3c52 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -18,6 +18,7 @@
 # Authors:
 #        Tristan Maat <tristan.maat@codethink.co.uk>
 
+import multiprocessing
 import os
 import platform
 import sys
@@ -27,6 +28,8 @@ import psutil
 from .._exceptions import PlatformError, ImplError, SandboxError
 from .. import utils
 
+from .multiprocessing import QueueManager, PicklableQueueManager
+
 
 class Platform():
     # Platform()
@@ -172,6 +175,25 @@ class Platform():
         uname_machine = platform.uname().machine
         return Platform.canonicalize_arch(uname_machine)
 
+    def make_queue_manager(self):
+        if self.does_multiprocessing_start_require_pickling():
+            return PicklableQueueManager()
+        else:
+            return QueueManager()
+
+    # does_multiprocessing_start_require_pickling():
+    #
+    # Returns True if the multiprocessing start method will pickle arguments
+    # to new processes.
+    #
+    # Returns:
+    #    (bool): Whether pickling is required or not
+    #
+    def does_multiprocessing_start_require_pickling(self):
+        # Note that if the start method has not been set before now, it will
+        # be set to the platform default by `get_start_method`.
+        return multiprocessing.get_start_method() != 'fork'
+
     ##################################################################
     #                        Sandbox functions                       #
     ##################################################################
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7cbde2d39..2c4883756 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,6 +21,7 @@
 
 # System imports
 import os
+import pickle
 import sys
 import signal
 import datetime
@@ -86,6 +87,37 @@ class _MessageType(FastEnum):
     SUBCLASS_CUSTOM_MESSAGE = 5
 
 
+# _do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process,
+# as such it will fixup any globals to be in the expected state.
+#
+# Args:
+#    pickled     (BytesIO): The pickled job to execute.
+#    *child_args (any)    : Any parameters to be passed to `child_action`.
+#
+def _do_pickled_child_job(pickled, *child_args):
+
+    utils._is_main_process = _not_main_process
+
+    child_job = pickle.load(pickled)
+    return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked - the inheritance of the main process id
+# will not work in this case.
+#
+# Note that we'll always not be the main process by definition.
+#
+def _not_main_process():
+    return False
+
+
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -136,7 +168,7 @@ class Job():
         # Private members
         #
         self._scheduler = scheduler            # The scheduler
-        self._queue = None                     # A message passing queue
+        self._queue_wrapper = None             # A wrapper of a message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
         self._listening = False                # Whether the parent is currently listening
@@ -162,8 +194,7 @@ class Job():
     # Starts the job.
     #
     def start(self):
-
-        self._queue = multiprocessing.Queue()
+        self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper()
 
         self._tries += 1
         self._parent_start_listening()
@@ -179,12 +210,18 @@ class Job():
             self._message_element_key
         )
 
-        # Make sure that picklability doesn't break, by exercising it during
-        # our test suite.
-        if self._scheduler.context.is_running_in_test_suite:
-            pickle_child_job(child_job, self._scheduler.context.get_projects())
-
-        self._process = Process(target=child_job.child_action, args=[self._queue])
+        if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+            pickled = pickle_child_job(
+                child_job, self._scheduler.context.get_projects())
+            self._process = Process(
+                target=_do_pickled_child_job,
+                args=[pickled, self._queue_wrapper],
+            )
+        else:
+            self._process = Process(
+                target=child_job.child_action,
+                args=[self._queue_wrapper],
+            )
 
         # Block signals which are handled in the main process such that
         # the child process does not inherit the parent's state, but the main
@@ -478,7 +515,7 @@ class Job():
         self._scheduler.job_completed(self, status)
 
         # Force the deletion of the queue and process objects to try and clean up FDs
-        self._queue = self._process = None
+        self._queue_wrapper = self._process = None
 
     # _parent_process_envelope()
     #
@@ -523,8 +560,8 @@ class Job():
     # in the parent process.
     #
     def _parent_process_queue(self):
-        while not self._queue.empty():
-            envelope = self._queue.get_nowait()
+        while not self._queue_wrapper.queue.empty():
+            envelope = self._queue_wrapper.queue.get_nowait()
             self._parent_process_envelope(envelope)
 
     # _parent_recv()
@@ -540,20 +577,9 @@ class Job():
     # Starts listening on the message queue
     #
     def _parent_start_listening(self):
-        # Warning: Platform specific code up ahead
-        #
-        # The multiprocessing.Queue object does not tell us how
-        # to receive io events in the receiving process, so we
-        # need to sneak in and get its file descriptor.
-        #
-        # The _reader member of the Queue is currently private
-        # but well known, perhaps it will become public:
-        #
-        #    http://bugs.python.org/issue3831
-        #
         if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
+            self._queue_wrapper.set_potential_callback_on_queue_event(
+                self._scheduler.loop, self._parent_recv)
             self._listening = True
 
     # _parent_stop_listening()
@@ -562,7 +588,8 @@ class Job():
     #
     def _parent_stop_listening(self):
         if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+            self._queue_wrapper.clear_potential_callback_on_queue_event(
+                self._scheduler.loop)
             self._listening = False
@@ -605,7 +632,7 @@ class ChildJob():
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._queue = None
+        self._queue_wrapper = None
 
     # message():
     #
@@ -692,7 +719,7 @@ class ChildJob():
     # Args:
     #    queue (multiprocessing.Queue): The message queue for IPC
     #
-    def child_action(self, queue):
+    def child_action(self, queue_wrapper):
 
         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -710,7 +737,7 @@ class ChildJob():
         #
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._queue = queue
+        self._queue_wrapper = queue_wrapper
         self._messenger.set_message_handler(self._child_message_handler)
 
         starttime = datetime.datetime.now()
@@ -808,7 +835,7 @@ class ChildJob():
     #    instances). This is sent to the parent Job.
     #
     def _send_message(self, message_type, message_data):
-        self._queue.put(_Envelope(message_type, message_data))
+        self._queue_wrapper.queue.put(_Envelope(message_type, message_data))
 
     # _child_send_error()
     #
@@ -854,7 +881,7 @@ class ChildJob():
     #    exit_code (_ReturnCode): The exit code to exit with
     #
     def _child_shutdown(self, exit_code):
-        self._queue.close()
+        self._queue_wrapper.close()
 
         assert isinstance(exit_code, _ReturnCode)
         sys.exit(exit_code.value)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d09..398e52e74 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -80,6 +80,11 @@ class Scheduler():
         self.terminated = False        # Whether the scheduler was asked to terminate or has terminated
         self.suspended = False         # Whether the scheduler is currently suspended
 
+        # A manager for creating and monitoring IPC queues, note that this
+        # can't be part of the platform or context as it is not always
+        # picklable.
+        self.ipc_queue_manager = self.context.platform.make_queue_manager()
+
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None               # Shared for Job access to observe the message queue
         self.internal_stops = 0        # Amount of SIGSTP signals we've introduced, this is shared with job.py
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 2c57925d4..b78c25871 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -52,6 +52,9 @@ BST_ARBITRARY_TIMESTAMP = calendar.timegm([2011, 11, 11, 11, 11, 11])
 _ALIAS_SEPARATOR = ':'
 _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
 
+# Main process pid
+_MAIN_PID = os.getpid()
+
 
 class UtilError(BstError):
     """Raised by utility functions when system calls fail.
@@ -699,17 +702,13 @@ def _pretty_size(size, dec_places=0):
     return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
 
 
-# Main process pid
-_main_pid = os.getpid()
-
-
 # _is_main_process()
 #
 # Return whether we are in the main process or not.
 #
 def _is_main_process():
-    assert _main_pid is not None
-    return os.getpid() == _main_pid
+    assert _MAIN_PID is not None
+    return os.getpid() == _MAIN_PID
 
 
 # Recursively remove directories, ignoring file permissions as much as
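
Finally, a small sketch (again hypothetical, not BuildStream code) of why `utils.py` pins `_MAIN_PID` at import time and why `job.py` installs `_not_main_process` in spawned children: under 'spawn' the child re-imports the module, so a pid captured at import time becomes the child's own pid, and a naive main-process check silently passes in the child:

```python
import multiprocessing
import os

# Re-executed on import in every spawned child, so each child records its
# *own* pid here - exactly the trap the patch works around by installing
# _not_main_process in the child instead of trusting the pid comparison.
_MAIN_PID = os.getpid()


def report():
    print("child pid:", os.getpid(),
          "looks like the main process?", os.getpid() == _MAIN_PID)


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    print("parent pid:", os.getpid())
    process = multiprocessing.Process(target=report)
    process.start()
    process.join()  # prints: looks like the main process? True
```

Under 'fork' the child inherits the parent's module globals, so the comparison stays correct; only the re-importing start methods need the override.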