diff options
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 37 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/jobpickler.py | 27 |
2 files changed, 45 insertions, 19 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 940b7d2cd..b36167a8f 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -33,9 +33,9 @@ import traceback from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum -from ... import _signals, utils +from ... import _signals, utils, node -from .jobpickler import pickle_child_job +from .jobpickler import pickle_child_job_data # Return code values shutdown of job handling child processes @@ -87,6 +87,27 @@ class _MessageType(FastEnum): SUBCLASS_CUSTOM_MESSAGE = 5 +# _pickle_child_job() +# +# Perform the special case pickling required to pickle a child job for +# unpickling in a child process. +# +# Args: +# child_job (ChildJob): The job to pickle. +# projects (List[Project]): The list of loaded projects, so we can get the +# relevant factories. +# +def _pickle_child_job(child_job, projects): + # Note that we need to consider all the state of the program that's + # necessary for the job, this includes e.g. the global state of the node + # module. + node_module_state = node._get_state_for_pickling() + return pickle_child_job_data( + (child_job, node_module_state), + projects, + ) + + # _do_pickled_child_job() # # Unpickle the supplied 'pickled' job and call 'child_action' on it. @@ -95,14 +116,14 @@ class _MessageType(FastEnum): # such it will fixup any globals to be in the expected state. # # Args: -# pickled (BytesIO): The pickled job to execute. +# pickled (BytesIO): The pickled data, and job to execute. # *child_args (any) : Any parameters to be passed to `child_action`. # def _do_pickled_child_job(pickled, *child_args): - utils._is_main_process = _not_main_process - child_job = pickle.load(pickled) + child_job, node_module_state = pickle.load(pickled) + node._set_state_from_pickling(node_module_state) return child_job.child_action(*child_args) @@ -212,8 +233,10 @@ class Job(): ) if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): - pickled = pickle_child_job( - child_job, self._scheduler.context.get_projects()) + pickled = _pickle_child_job( + child_job, + self._scheduler.context.get_projects(), + ) self._process = Process( target=_do_pickled_child_job, args=[pickled, self._queue], diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py index d514e87ae..9cc8c5e06 100644 --- a/src/buildstream/_scheduler/jobs/jobpickler.py +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -42,18 +42,21 @@ _PROTO_CLASS_TO_NAME = { } -# pickle_child_job() +# pickle_child_job_data() # # Perform the special case pickling required to pickle a child job for # unpickling in a child process. # -# Note that we don't need an `unpickle_child_job`, as regular `pickle.load()` -# will do everything required. +# Note that this just enables the pickling of things that contain ChildJob-s, +# the thing to be pickled doesn't have to be a ChildJob. +# +# Note that we don't need an `unpickle_child_job_data`, as regular +# `pickle.load()` will do everything required. # # Args: -# child_job (ChildJob): The job to be pickled. -# projects (List[Project]): The list of loaded projects, so we can get the -# relevant factories. +# child_job_data (ChildJob): The job to be pickled. +# projects (List[Project]): The list of loaded projects, so we can get the +# relevant factories. # # Returns: # An `io.BytesIO`, with the pickled contents of the ChildJob and everything it @@ -77,7 +80,7 @@ _PROTO_CLASS_TO_NAME = { # below. Some state in plugins is not necessary for child jobs, and comes # with a heavy cost; we also need to remove this before pickling. # -def pickle_child_job(child_job, projects): +def pickle_child_job_data(child_job_data, projects): factory_list = [ factory @@ -97,8 +100,8 @@ def pickle_child_job(child_job, projects): for cls, _ in factory.all_loaded_plugins() } - data = io.BytesIO() - pickler = pickle.Pickler(data) + pickled_data = io.BytesIO() + pickler = pickle.Pickler(pickled_data) pickler.dispatch_table = copyreg.dispatch_table.copy() def reduce_plugin(plugin): @@ -111,10 +114,10 @@ def pickle_child_job(child_job, projects): pickler.dispatch_table[Loader] = _reduce_object pickler.dispatch_table[Messenger] = _reduce_object - pickler.dump(child_job) - data.seek(0) + pickler.dump(child_job_data) + pickled_data.seek(0) - return data + return pickled_data def _reduce_object(instance): |