diff options
-rw-r--r-- | .gitlab-ci.yml | 3 | ||||
-rw-r--r-- | src/buildstream/_messenger.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 40 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/jobpickler.py | 77 | ||||
-rw-r--r-- | src/buildstream/node.pyx | 27 | ||||
-rw-r--r-- | src/buildstream/testing/runcli.py | 11 | ||||
-rwxr-xr-x | tests/conftest.py | 8 | ||||
-rw-r--r-- | tests/testutils/context.py | 31 |
8 files changed, 148 insertions, 56 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 927db6e36..21de1f2de 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -199,8 +199,7 @@ tests-spawn-multiprocessing-start-method: - mkdir -p "${INTEGRATION_CACHE}" - useradd -Um buildstream - chown -R buildstream:buildstream . - - su buildstream -c "tox -- ${PYTEST_ARGS} tests/{cachekey,format,plugins,internals,sourcecache}" - + - su buildstream -c "tox -- ${PYTEST_ARGS} tests/{cachekey,elements,format,plugins,internals,sourcecache}" # Run type checkers mypy: diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index f5f570319..20c327728 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -405,6 +405,13 @@ class Messenger(): # del state['_render_status_cb'] + # The "simple_task" context manager is not needed outside the main + # process. During testing we override it to something that cannot + # pickle, so just drop it when pickling to a child job. Note that it + # will only appear in 'state' if it has been overridden. + # + state.pop('simple_task', None) + # The State object is not needed outside the main process del state['_state'] diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 940b7d2cd..1d7697b02 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -24,7 +24,6 @@ import asyncio import datetime import multiprocessing import os -import pickle import signal import sys import traceback @@ -35,7 +34,7 @@ from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum from ... 
import _signals, utils -from .jobpickler import pickle_child_job +from .jobpickler import pickle_child_job, do_pickled_child_job # Return code values shutdown of job handling child processes @@ -87,37 +86,6 @@ class _MessageType(FastEnum): SUBCLASS_CUSTOM_MESSAGE = 5 -# _do_pickled_child_job() -# -# Unpickle the supplied 'pickled' job and call 'child_action' on it. -# -# This is expected to be run in a subprocess started from the main process, as -# such it will fixup any globals to be in the expected state. -# -# Args: -# pickled (BytesIO): The pickled job to execute. -# *child_args (any) : Any parameters to be passed to `child_action`. -# -def _do_pickled_child_job(pickled, *child_args): - - utils._is_main_process = _not_main_process - - child_job = pickle.load(pickled) - return child_job.child_action(*child_args) - - -# _not_main_process() -# -# A function to replace `utils._is_main_process` when we're running in a -# subprocess that was not forked - the inheritance of the main process id will -# not work in this case. -# -# Note that we'll always not be the main process by definition. 
-# -def _not_main_process(): - return False - - # Job() # # The Job object represents a task that will run in parallel to the main @@ -213,9 +181,11 @@ class Job(): if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): pickled = pickle_child_job( - child_job, self._scheduler.context.get_projects()) + child_job, + self._scheduler.context.get_projects(), + ) self._process = Process( - target=_do_pickled_child_job, + target=do_pickled_child_job, args=[pickled, self._queue], ) else: diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py index d514e87ae..b0465ec9e 100644 --- a/src/buildstream/_scheduler/jobs/jobpickler.py +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -28,7 +28,7 @@ from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Dige # BuildStream toplevel imports from ..._loader import Loader from ..._messenger import Messenger - +from ... import utils, node # Note that `str(type(proto_class))` results in `GeneratedProtocolMessageType` # instead of the concrete type, so we come up with our own names here. @@ -47,14 +47,69 @@ _PROTO_CLASS_TO_NAME = { # Perform the special case pickling required to pickle a child job for # unpickling in a child process. # -# Note that we don't need an `unpickle_child_job`, as regular `pickle.load()` -# will do everything required. -# # Args: -# child_job (ChildJob): The job to be pickled. +# child_job (ChildJob): The job to pickle. # projects (List[Project]): The list of loaded projects, so we can get the # relevant factories. # +def pickle_child_job(child_job, projects): + # Note that we need to consider all the state of the program that's + # necessary for the job, this includes e.g. the global state of the node + # module. 
+ node_module_state = node._get_state_for_pickling() + return _pickle_child_job_data( + (child_job, node_module_state), + projects, + ) + + +# do_pickled_child_job() +# +# Unpickle the supplied 'pickled' job and call 'child_action' on it. +# +# This is expected to be run in a subprocess started from the main process, as +# such it will fixup any globals to be in the expected state. +# +# Args: +# pickled (BytesIO): The pickled data, and job to execute. +# *child_args (any) : Any parameters to be passed to `child_action`. +# +def do_pickled_child_job(pickled, *child_args): + utils._is_main_process = _not_main_process + + child_job, node_module_state = pickle.load(pickled) + node._set_state_from_pickling(node_module_state) + return child_job.child_action(*child_args) + + +# _not_main_process() +# +# A function to replace `utils._is_main_process` when we're running in a +# subprocess that was not forked - the inheritance of the main process id will +# not work in this case. +# +# Note that we'll always not be the main process by definition. +# +def _not_main_process(): + return False + + +# _pickle_child_job_data() +# +# Perform the special case pickling required to pickle a child job for +# unpickling in a child process. +# +# Note that this just enables the pickling of things that contain ChildJob-s, +# the thing to be pickled doesn't have to be a ChildJob. +# +# Note that we don't need an `unpickle_child_job_data`, as regular +# `pickle.load()` will do everything required. +# +# Args: +# child_job_data (ChildJob): The job to be pickled. +# projects (List[Project]): The list of loaded projects, so we can get the +# relevant factories. +# # Returns: # An `io.BytesIO`, with the pickled contents of the ChildJob and everything it # transitively refers to. @@ -77,7 +132,7 @@ _PROTO_CLASS_TO_NAME = { # below. Some state in plugins is not necessary for child jobs, and comes # with a heavy cost; we also need to remove this before pickling. 
# -def pickle_child_job(child_job, projects): +def _pickle_child_job_data(child_job_data, projects): factory_list = [ factory @@ -97,8 +152,8 @@ def pickle_child_job(child_job, projects): for cls, _ in factory.all_loaded_plugins() } - data = io.BytesIO() - pickler = pickle.Pickler(data) + pickled_data = io.BytesIO() + pickler = pickle.Pickler(pickled_data) pickler.dispatch_table = copyreg.dispatch_table.copy() def reduce_plugin(plugin): @@ -111,10 +166,10 @@ def pickle_child_job(child_job, projects): pickler.dispatch_table[Loader] = _reduce_object pickler.dispatch_table[Messenger] = _reduce_object - pickler.dump(child_job) - data.seek(0) + pickler.dump(child_job_data) + pickled_data.seek(0) - return data + return pickled_data def _reduce_object(instance): diff --git a/src/buildstream/node.pyx b/src/buildstream/node.pyx index bf6ce3a32..58fd0f33d 100644 --- a/src/buildstream/node.pyx +++ b/src/buildstream/node.pyx @@ -1596,6 +1596,33 @@ def _reset_global_state(): __counter = 0 +# _get_state_for_pickling() +# +# This gets the global variables necessary to preserve in a child process when +# e.g. running a ChildJob. Things that are pickled from the parent process to +# the child process will expect this module to be in the same state as in the +# parent. +# +# Returns: +# (object): The state to supply to a call of _set_state_from_pickling(). +# +def _get_state_for_pickling(): + return __FILE_LIST, __counter + + +# _set_state_from_pickling() +# +# This restores the global variables saved from _get_state_for_pickling(). See +# that function for more details. +# +# Args: +# state (object): The result of calling _get_state_for_pickling().
+# +def _set_state_from_pickling(state): + global __FILE_LIST, __counter + __FILE_LIST, __counter = state + + ############################################################# # Module local helper Methods # ############################################################# diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py index 6c3ab3496..36426c8af 100644 --- a/src/buildstream/testing/runcli.py +++ b/src/buildstream/testing/runcli.py @@ -51,7 +51,7 @@ from _pytest.capture import MultiCapture, FDCapture, FDCaptureBinary # Import the main cli entrypoint from buildstream._frontend import cli as bst_cli -from buildstream import _yaml +from buildstream import _yaml, node from buildstream._cas import CASCache from buildstream.element import _get_normal_name, _compose_artifact_name @@ -315,6 +315,15 @@ class Cli(): # def run(self, configure=True, project=None, silent=False, env=None, cwd=None, options=None, args=None, binary_capture=False): + + # We don't want to carry the state of one bst invocation into another + # bst invocation. Since node _FileInfo objects hold onto BuildStream + # projects, this means that they would be also carried forward. This + # becomes a problem when spawning new processes - when pickling the + # state of the node module we will also be pickling elements from + # previous bst invocations. + node._reset_global_state() + if args is None: args = [] if options is None: diff --git a/tests/conftest.py b/tests/conftest.py index 4bd226afb..216c83893 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -141,10 +141,10 @@ def pytest_configure(config): # possible. Note that some tests implicitly set the start method by using # multiprocessing. If we wait for bst to do it, it will already be too # late. 
- print( - "Multiprocessing method:", - multiprocessing.get_start_method(allow_none=True), - ) if 'BST_FORCE_START_METHOD' in os.environ: start_method = os.environ['BST_FORCE_START_METHOD'] multiprocessing.set_start_method(start_method) + print( + "Multiprocessing method set to:", + start_method, + ) diff --git a/tests/testutils/context.py b/tests/testutils/context.py index 849895e92..821adef0a 100644 --- a/tests/testutils/context.py +++ b/tests/testutils/context.py @@ -15,13 +15,11 @@ # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os -from unittest.mock import MagicMock from types import MethodType from contextlib import contextmanager from buildstream._context import Context -from buildstream._state import _Task # Handle messages from the pipeline @@ -29,9 +27,36 @@ def _dummy_message_handler(message, is_silenced): pass +class _DummyTask: + # Note that unittest.mock.MagicMock doesn't pickle, so we must make our + # _DummyTask manually here. + def __init__(self, state, action_name, full_name, elapsed_offset): + self._state = state + self.action_name = action_name + self.full_name = full_name + self.elapsed_offset = elapsed_offset + self.current_progress = None + self.maximum_progress = None + + def set_render_cb(self, callback): + pass + + def set_current_progress(self, progress): + pass + + def set_maximum_progress(self, progress): + pass + + def add_current_progress(self): + pass + + def add_maximum_progress(self): + pass + + @contextmanager def _get_dummy_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False): - yield MagicMock(spec=_Task("state", activity_name, full_name, 0)) + yield _DummyTask("state", activity_name, full_name, 0) # dummy_context() |