summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/jobs/job.py37
-rw-r--r--src/buildstream/_scheduler/jobs/jobpickler.py27
2 files changed, 45 insertions, 19 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 940b7d2cd..b36167a8f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -33,9 +33,9 @@ import traceback
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals, utils
+from ... import _signals, utils, node
-from .jobpickler import pickle_child_job
+from .jobpickler import pickle_child_job_data
# Return code values shutdown of job handling child processes
@@ -87,6 +87,27 @@ class _MessageType(FastEnum):
SUBCLASS_CUSTOM_MESSAGE = 5
+# _pickle_child_job()
+#
+# Perform the special case pickling required to pickle a child job for
+# unpickling in a child process.
+#
+# Args:
+# child_job (ChildJob): The job to pickle.
+# projects (List[Project]): The list of loaded projects, so we can get the
+# relevant factories.
+#
+def _pickle_child_job(child_job, projects):
+ # Note that we need to consider all the state of the program that's
+ # necessary for the job, this includes e.g. the global state of the node
+ # module.
+ node_module_state = node._get_state_for_pickling()
+ return pickle_child_job_data(
+ (child_job, node_module_state),
+ projects,
+ )
+
+
# _do_pickled_child_job()
#
# Unpickle the supplied 'pickled' job and call 'child_action' on it.
@@ -95,14 +116,14 @@ class _MessageType(FastEnum):
# such it will fixup any globals to be in the expected state.
#
# Args:
-# pickled (BytesIO): The pickled job to execute.
+# pickled (BytesIO): The pickled job and node module state to execute.
# *child_args (any) : Any parameters to be passed to `child_action`.
#
def _do_pickled_child_job(pickled, *child_args):
-
utils._is_main_process = _not_main_process
- child_job = pickle.load(pickled)
+ child_job, node_module_state = pickle.load(pickled)
+ node._set_state_from_pickling(node_module_state)
return child_job.child_action(*child_args)
@@ -212,8 +233,10 @@ class Job():
)
if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
- pickled = pickle_child_job(
- child_job, self._scheduler.context.get_projects())
+ pickled = _pickle_child_job(
+ child_job,
+ self._scheduler.context.get_projects(),
+ )
self._process = Process(
target=_do_pickled_child_job,
args=[pickled, self._queue],
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
index d514e87ae..9cc8c5e06 100644
--- a/src/buildstream/_scheduler/jobs/jobpickler.py
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -42,18 +42,21 @@ _PROTO_CLASS_TO_NAME = {
}
-# pickle_child_job()
+# pickle_child_job_data()
#
# Perform the special case pickling required to pickle a child job for
# unpickling in a child process.
#
-# Note that we don't need an `unpickle_child_job`, as regular `pickle.load()`
-# will do everything required.
+# Note that this just enables the pickling of things that contain ChildJob-s,
+# the thing to be pickled doesn't have to be a ChildJob.
+#
+# Note that we don't need an `unpickle_child_job_data`, as regular
+# `pickle.load()` will do everything required.
#
# Args:
-# child_job (ChildJob): The job to be pickled.
-# projects (List[Project]): The list of loaded projects, so we can get the
-# relevant factories.
+# child_job_data (any): The data to be pickled (need not itself be a ChildJob).
+# projects (List[Project]): The list of loaded projects, so we can get the
+# relevant factories.
#
# Returns:
# An `io.BytesIO`, with the pickled contents of the ChildJob and everything it
@@ -77,7 +80,7 @@ _PROTO_CLASS_TO_NAME = {
# below. Some state in plugins is not necessary for child jobs, and comes
# with a heavy cost; we also need to remove this before pickling.
#
-def pickle_child_job(child_job, projects):
+def pickle_child_job_data(child_job_data, projects):
factory_list = [
factory
@@ -97,8 +100,8 @@ def pickle_child_job(child_job, projects):
for cls, _ in factory.all_loaded_plugins()
}
- data = io.BytesIO()
- pickler = pickle.Pickler(data)
+ pickled_data = io.BytesIO()
+ pickler = pickle.Pickler(pickled_data)
pickler.dispatch_table = copyreg.dispatch_table.copy()
def reduce_plugin(plugin):
@@ -111,10 +114,10 @@ def pickle_child_job(child_job, projects):
pickler.dispatch_table[Loader] = _reduce_object
pickler.dispatch_table[Messenger] = _reduce_object
- pickler.dump(child_job)
- data.seek(0)
+ pickler.dump(child_job_data)
+ pickled_data.seek(0)
- return data
+ return pickled_data
def _reduce_object(instance):