author    bst-marge-bot <marge-bot@buildstream.build>  2019-10-29 15:47:03 +0000
committer bst-marge-bot <marge-bot@buildstream.build>  2019-10-29 15:47:03 +0000
commit    8b5905bcdcc59e8575004a5fde7f2817dd9d01a7 (patch)
tree      a715d486814f6e5ab297c5972c362ebed1ae6f14
parent    bc6bffeff7314add8df839917dcb744d04c90950 (diff)
parent    f3a3d3695a11147a6ae44e3158c93f1c32d78556 (diff)
Merge branch 'aevri/enable_spawn_ci_5' into 'master'

job pickling: also pickle global state in node.pyx

See merge request BuildStream/buildstream!1657
-rw-r--r--  .gitlab-ci.yml                                 |  3
-rw-r--r--  src/buildstream/_messenger.py                  |  7
-rw-r--r--  src/buildstream/_scheduler/jobs/job.py         | 40
-rw-r--r--  src/buildstream/_scheduler/jobs/jobpickler.py  | 77
-rw-r--r--  src/buildstream/node.pyx                       | 27
-rw-r--r--  src/buildstream/testing/runcli.py              | 11
-rwxr-xr-x  tests/conftest.py                              |  8
-rw-r--r--  tests/testutils/context.py                     | 31
8 files changed, 148 insertions(+), 56 deletions(-)
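The branch being merged here makes job pickling carry the global state of node.pyx into child processes. That matters for the 'spawn' multiprocessing start method: unlike 'fork', a spawned child re-imports every module instead of inheriting the parent's memory, so module-level globals mutated in the parent silently revert to their initial values. A minimal, hypothetical demonstration of the problem (not BuildStream code):

    import multiprocessing

    COUNTER = 0  # module-level global, mutated by the parent before spawning

    def report():
        # Runs in the child process; prints whatever value its module has.
        print("child sees COUNTER =", COUNTER)

    if __name__ == "__main__":
        COUNTER = 42  # the parent mutates the global ...
        ctx = multiprocessing.get_context("spawn")
        child = ctx.Process(target=report)
        child.start()
        child.join()  # ... but the spawned child prints "COUNTER = 0"

Under 'fork' the child would print 42; under 'spawn' it prints 0, which is why the snapshot/restore helpers added below exist.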
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 927db6e36..21de1f2de 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -199,8 +199,7 @@ tests-spawn-multiprocessing-start-method:
- mkdir -p "${INTEGRATION_CACHE}"
- useradd -Um buildstream
- chown -R buildstream:buildstream .
- - su buildstream -c "tox -- ${PYTEST_ARGS} tests/{cachekey,format,plugins,internals,sourcecache}"
-
+ - su buildstream -c "tox -- ${PYTEST_ARGS} tests/{cachekey,elements,format,plugins,internals,sourcecache}"
# Run type checkers
mypy:
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index f5f570319..20c327728 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -405,6 +405,13 @@ class Messenger():
#
del state['_render_status_cb']
+ # The "simple_task" context manager is not needed outside the main
+ # process. During testing we override it to something that cannot
+ # pickle, so just drop it when pickling to a child job. Note that it
+ # will only appear in 'state' if it has been overridden.
+ #
+ state.pop('simple_task', None)
+
# The State object is not needed outside the main process
del state['_state']
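The Messenger change extends a standard __getstate__ pattern: copy self.__dict__, strip anything that cannot or should not cross the process boundary, and return the rest. A rough sketch of the pattern with hypothetical names, not the real class:

    import pickle

    class Example:
        def __init__(self):
            self._render_status_cb = lambda: None  # lambdas cannot be pickled
            self.log_level = "info"                # plain data, safe to pickle

        def __getstate__(self):
            state = self.__dict__.copy()
            # Main-process-only attributes are dropped before pickling;
            # pop() with a default tolerates the attribute being absent.
            state.pop("_render_status_cb", None)
            return state

    blob = pickle.dumps(Example())  # succeeds despite the unpicklable callback
    restored = pickle.loads(blob)
    assert not hasattr(restored, "_render_status_cb")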
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 940b7d2cd..1d7697b02 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@ import asyncio
import datetime
import multiprocessing
import os
-import pickle
import signal
import sys
import traceback
@@ -35,7 +34,7 @@ from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
from ... import _signals, utils
-from .jobpickler import pickle_child_job
+from .jobpickler import pickle_child_job, do_pickled_child_job
# Return code values for shutdown of job handling child processes
@@ -87,37 +86,6 @@ class _MessageType(FastEnum):
SUBCLASS_CUSTOM_MESSAGE = 5
-# _do_pickled_child_job()
-#
-# Unpickle the supplied 'pickled' job and call 'child_action' on it.
-#
-# This is expected to be run in a subprocess started from the main process, as
-# such it will fixup any globals to be in the expected state.
-#
-# Args:
-# pickled (BytesIO): The pickled job to execute.
-# *child_args (any) : Any parameters to be passed to `child_action`.
-#
-def _do_pickled_child_job(pickled, *child_args):
-
- utils._is_main_process = _not_main_process
-
- child_job = pickle.load(pickled)
- return child_job.child_action(*child_args)
-
-
-# _not_main_process()
-#
-# A function to replace `utils._is_main_process` when we're running in a
-# subprocess that was not forked - the inheritance of the main process id will
-# not work in this case.
-#
-# Note that we'll always not be the main process by definition.
-#
-def _not_main_process():
- return False
-
-
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -213,9 +181,11 @@ class Job():
if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
pickled = pickle_child_job(
- child_job, self._scheduler.context.get_projects())
+ child_job,
+ self._scheduler.context.get_projects(),
+ )
self._process = Process(
- target=_do_pickled_child_job,
+ target=do_pickled_child_job,
args=[pickled, self._queue],
)
else:
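On platforms where starting a process requires pickling (spawn rather than fork), the parent serializes the job into an io.BytesIO and hands the buffer to the child's entry point, which unpickles it and runs the job. A simplified, hypothetical sketch of that round trip; this ChildJob and do_pickled_child_job are bare stand-ins that omit the module-state handling shown in the next file:

    import io
    import multiprocessing
    import pickle

    class ChildJob:
        def __init__(self, name):
            self.name = name

        def child_action(self, queue):
            queue.put("ran " + self.name)

    def do_pickled_child_job(pickled, queue):
        # Runs in the child: rewindable buffer in, live object out.
        child_job = pickle.load(pickled)
        return child_job.child_action(queue)

    if __name__ == "__main__":
        pickled = io.BytesIO()
        pickle.dump(ChildJob("build"), pickled)
        pickled.seek(0)  # rewind so the child reads from the start
        ctx = multiprocessing.get_context("spawn")
        queue = ctx.Queue()
        child = ctx.Process(target=do_pickled_child_job, args=[pickled, queue])
        child.start()
        print(queue.get())  # "ran build"
        child.join()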
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
index d514e87ae..b0465ec9e 100644
--- a/src/buildstream/_scheduler/jobs/jobpickler.py
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -28,7 +28,7 @@ from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
# BuildStream toplevel imports
from ..._loader import Loader
from ..._messenger import Messenger
-
+from ... import utils, node
# Note that `str(type(proto_class))` results in `GeneratedProtocolMessageType`
# instead of the concrete type, so we come up with our own names here.
@@ -47,14 +47,69 @@ _PROTO_CLASS_TO_NAME = {
# Perform the special case pickling required to pickle a child job for
# unpickling in a child process.
#
-# Note that we don't need an `unpickle_child_job`, as regular `pickle.load()`
-# will do everything required.
-#
# Args:
-# child_job (ChildJob): The job to be pickled.
+# child_job (ChildJob): The job to pickle.
# projects (List[Project]): The list of loaded projects, so we can get the
# relevant factories.
#
+def pickle_child_job(child_job, projects):
+ # Note that we need to consider all the state of the program that's
+ # necessary for the job, this includes e.g. the global state of the node
+ # module.
+ node_module_state = node._get_state_for_pickling()
+ return _pickle_child_job_data(
+ (child_job, node_module_state),
+ projects,
+ )
+
+
+# do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process, as
+# such it will fixup any globals to be in the expected state.
+#
+# Args:
+# pickled (BytesIO): The pickled data, and job to execute.
+# *child_args (any) : Any parameters to be passed to `child_action`.
+#
+def do_pickled_child_job(pickled, *child_args):
+ utils._is_main_process = _not_main_process
+
+ child_job, node_module_state = pickle.load(pickled)
+ node._set_state_from_pickling(node_module_state)
+ return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked - the inheritance of the main process id will
+# not work in this case.
+#
+# Note that we'll always not be the main process by definition.
+#
+def _not_main_process():
+ return False
+
+
+# _pickle_child_job_data()
+#
+# Perform the special case pickling required to pickle a child job for
+# unpickling in a child process.
+#
+# Note that this just enables the pickling of things that contain ChildJob-s;
+# the thing to be pickled doesn't have to be a ChildJob.
+#
+# Note that we don't need an `unpickle_child_job_data`, as regular
+# `pickle.load()` will do everything required.
+#
+# Args:
+# child_job_data (ChildJob): The job to be pickled.
+# projects (List[Project]): The list of loaded projects, so we can get the
+# relevant factories.
+#
# Returns:
# An `io.BytesIO`, with the pickled contents of the ChildJob and everything it
# transitively refers to.
@@ -77,7 +132,7 @@ _PROTO_CLASS_TO_NAME = {
# below. Some state in plugins is not necessary for child jobs, and comes
# with a heavy cost; we also need to remove this before pickling.
#
-def pickle_child_job(child_job, projects):
+def _pickle_child_job_data(child_job_data, projects):
factory_list = [
factory
@@ -97,8 +152,8 @@ def pickle_child_job(child_job, projects):
for cls, _ in factory.all_loaded_plugins()
}
- data = io.BytesIO()
- pickler = pickle.Pickler(data)
+ pickled_data = io.BytesIO()
+ pickler = pickle.Pickler(pickled_data)
pickler.dispatch_table = copyreg.dispatch_table.copy()
def reduce_plugin(plugin):
@@ -111,10 +166,10 @@ def pickle_child_job(child_job, projects):
pickler.dispatch_table[Loader] = _reduce_object
pickler.dispatch_table[Messenger] = _reduce_object
- pickler.dump(child_job)
- data.seek(0)
+ pickler.dump(child_job_data)
+ pickled_data.seek(0)
- return data
+ return pickled_data
def _reduce_object(instance):
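The renamed _pickle_child_job_data() keeps the interesting mechanics intact: a per-pickler dispatch_table, seeded from copyreg.dispatch_table, lets specific classes such as Loader and Messenger be reduced by custom functions without patching pickle globally. A minimal, hypothetical illustration of that mechanism (the real Loader is reduced differently, via _reduce_object):

    import copyreg
    import io
    import pickle

    class Loader:
        # Hypothetical stand-in for the classes registered above.
        def __init__(self, project):
            self.project = project

    def _reduce_loader(instance):
        # A reduce function returns (callable, args); unpickling calls
        # Loader(instance.project) to rebuild the object.
        return (Loader, (instance.project,))

    data = io.BytesIO()
    pickler = pickle.Pickler(data)
    # Copying copyreg.dispatch_table keeps any existing registrations,
    # while the Loader entry applies only to this pickler instance.
    pickler.dispatch_table = copyreg.dispatch_table.copy()
    pickler.dispatch_table[Loader] = _reduce_loader

    pickler.dump(Loader("base-project"))
    data.seek(0)
    print(pickle.load(data).project)  # "base-project"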
diff --git a/src/buildstream/node.pyx b/src/buildstream/node.pyx
index bf6ce3a32..58fd0f33d 100644
--- a/src/buildstream/node.pyx
+++ b/src/buildstream/node.pyx
@@ -1596,6 +1596,33 @@ def _reset_global_state():
__counter = 0
+# _get_state_for_pickling()
+#
+# This gets the global variables necessary to preserve in a child process when
+# e.g. running a ChildJob. Things that are pickled from the parent process to
+# the child process will expect this module to be in the same state as in the
+# parent.
+#
+# Returns:
+# (object): The state to supply to a call of _set_state_from_pickling().
+#
+def _get_state_for_pickling():
+ return __FILE_LIST, __counter
+
+
+# _set_state_from_pickling()
+#
+# This restores the global variables saved from _get_state_for_pickling(). See
+# that function for more details.
+#
+# Args:
+# state (object): The result of calling _get_state_for_pickling().
+#
+def _set_state_from_pickling(state):
+ global __FILE_LIST, __counter
+ __FILE_LIST, __counter = state
+
+
#############################################################
# Module local helper Methods #
#############################################################
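Because the snapshot returned by _get_state_for_pickling() travels in the same pickle stream as the ChildJob (see pickle_child_job() above), the child can restore the module's globals before running anything that depends on them. A compressed, hypothetical pure-Python sketch of that round trip, standing in for the Cython module:

    import io
    import pickle

    _file_list = ["project.conf"]  # stand-ins for __FILE_LIST / __counter
    _counter = 7

    def get_state():
        return _file_list, _counter

    def set_state(state):
        global _file_list, _counter
        _file_list, _counter = state

    payload = {"element": "base.bst"}         # stand-in for the ChildJob
    buf = io.BytesIO()
    pickle.dump((payload, get_state()), buf)  # job and snapshot travel together
    buf.seek(0)

    job, snapshot = pickle.load(buf)  # in the child: unpack ...
    set_state(snapshot)               # ... restore the globals first ...
    assert get_state() == (["project.conf"], 7)  # ... then run the job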
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
index 6c3ab3496..36426c8af 100644
--- a/src/buildstream/testing/runcli.py
+++ b/src/buildstream/testing/runcli.py
@@ -51,7 +51,7 @@ from _pytest.capture import MultiCapture, FDCapture, FDCaptureBinary
# Import the main cli entrypoint
from buildstream._frontend import cli as bst_cli
-from buildstream import _yaml
+from buildstream import _yaml, node
from buildstream._cas import CASCache
from buildstream.element import _get_normal_name, _compose_artifact_name
@@ -315,6 +315,15 @@ class Cli():
#
def run(self, configure=True, project=None, silent=False, env=None,
cwd=None, options=None, args=None, binary_capture=False):
+
+ # We don't want to carry the state of one bst invocation into another
+ # bst invocation. Since node _FileInfo objects hold onto BuildStream
+ # projects, this means that they would also be carried forward. This
+ # becomes a problem when spawning new processes - when pickling the
+ # state of the node module we will also be pickling elements from
+ # previous bst invocations.
+ node._reset_global_state()
+
if args is None:
args = []
if options is None:
diff --git a/tests/conftest.py b/tests/conftest.py
index 4bd226afb..216c83893 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -141,10 +141,10 @@ def pytest_configure(config):
# possible. Note that some tests implicitly set the start method by using
# multiprocessing. If we wait for bst to do it, it will already be too
# late.
- print(
- "Multiprocessing method:",
- multiprocessing.get_start_method(allow_none=True),
- )
if 'BST_FORCE_START_METHOD' in os.environ:
start_method = os.environ['BST_FORCE_START_METHOD']
multiprocessing.set_start_method(start_method)
+ print(
+ "Multiprocessing method set to:",
+ start_method,
+ )
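The reordering above also reflects a hard constraint: multiprocessing.set_start_method() may be called at most once per process, and only before the start method is first used, so forcing it from BST_FORCE_START_METHOD has to happen this early; a later attempt by bst itself would raise. A small demonstration of the constraint:

    import multiprocessing

    if __name__ == "__main__":
        multiprocessing.set_start_method("spawn")
        try:
            multiprocessing.set_start_method("fork")  # second call must fail
        except RuntimeError as exc:
            print("cannot set it twice:", exc)  # "context has already been set"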
diff --git a/tests/testutils/context.py b/tests/testutils/context.py
index 849895e92..821adef0a 100644
--- a/tests/testutils/context.py
+++ b/tests/testutils/context.py
@@ -15,13 +15,11 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
-from unittest.mock import MagicMock
from types import MethodType
from contextlib import contextmanager
from buildstream._context import Context
-from buildstream._state import _Task
# Handle messages from the pipeline
@@ -29,9 +27,36 @@ def _dummy_message_handler(message, is_silenced):
pass
+class _DummyTask:
+ # Note that unittest.mock.MagicMock doesn't pickle, so we must make our
+ # _DummyTask manually here.
+ def __init__(self, state, action_name, full_name, elapsed_offset):
+ self._state = state
+ self.action_name = action_name
+ self.full_name = full_name
+ self.elapsed_offset = elapsed_offset
+ self.current_progress = None
+ self.maximum_progress = None
+
+ def set_render_cb(self, callback):
+ pass
+
+ def set_current_progress(self, progress):
+ pass
+
+ def set_maximum_progress(self, progress):
+ pass
+
+ def add_current_progress(self):
+ pass
+
+ def add_maximum_progress(self):
+ pass
+
+
@contextmanager
def _get_dummy_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False):
- yield MagicMock(spec=_Task("state", activity_name, full_name, 0))
+ yield _DummyTask("state", activity_name, full_name, 0)
# dummy_context()
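The hand-written _DummyTask exists because of the pickle limitation the new comment cites: MagicMock instances do not survive pickle.dumps(), while a plain class with only picklable attributes round-trips cleanly. A quick, hypothetical demonstration:

    import pickle
    from unittest.mock import MagicMock

    class Task:
        # A plain class with only picklable attributes round-trips fine.
        def __init__(self, action_name):
            self.action_name = action_name

    try:
        pickle.dumps(MagicMock())  # expected to fail
    except Exception as exc:
        print("MagicMock does not pickle:", type(exc).__name__)

    copy = pickle.loads(pickle.dumps(Task("build")))
    print(copy.action_name)  # "build"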