From d16c1f3e48db6638abde3a793fdd24775b010cdc Mon Sep 17 00:00:00 2001 From: Angelos Evripiotis Date: Mon, 28 Oct 2019 09:03:37 +0000 Subject: _scheduler/jobs: mv pickle details into jobpickler Move pickle_child_job and do_pickled_child_job into jobpickler.py, to keep details like saving and restoring global state out of job.py. --- src/buildstream/_scheduler/jobs/job.py | 61 ++------------------------- src/buildstream/_scheduler/jobs/jobpickler.py | 58 +++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 60 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index b36167a8f..1d7697b02 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -24,7 +24,6 @@ import asyncio import datetime import multiprocessing import os -import pickle import signal import sys import traceback @@ -33,9 +32,9 @@ import traceback from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum -from ... import _signals, utils, node +from ... import _signals, utils -from .jobpickler import pickle_child_job_data +from .jobpickler import pickle_child_job, do_pickled_child_job # Return code values shutdown of job handling child processes @@ -87,58 +86,6 @@ class _MessageType(FastEnum): SUBCLASS_CUSTOM_MESSAGE = 5 -# _pickle_child_job() -# -# Perform the special case pickling required to pickle a child job for -# unpickling in a child process. -# -# Args: -# child_job (ChildJob): The job to pickle. -# projects (List[Project]): The list of loaded projects, so we can get the -# relevant factories. -# -def _pickle_child_job(child_job, projects): - # Note that we need to consider all the state of the program that's - # necessary for the job, this includes e.g. the global state of the node - # module. 
- node_module_state = node._get_state_for_pickling() - return pickle_child_job_data( - (child_job, node_module_state), - projects, - ) - - -# _do_pickled_child_job() -# -# Unpickle the supplied 'pickled' job and call 'child_action' on it. -# -# This is expected to be run in a subprocess started from the main process, as -# such it will fixup any globals to be in the expected state. -# -# Args: -# pickled (BytesIO): The pickled data, and job to execute. -# *child_args (any) : Any parameters to be passed to `child_action`. -# -def _do_pickled_child_job(pickled, *child_args): - utils._is_main_process = _not_main_process - - child_job, node_module_state = pickle.load(pickled) - node._set_state_from_pickling(node_module_state) - return child_job.child_action(*child_args) - - -# _not_main_process() -# -# A function to replace `utils._is_main_process` when we're running in a -# subprocess that was not forked - the inheritance of the main process id will -# not work in this case. -# -# Note that we'll always not be the main process by definition. -# -def _not_main_process(): - return False - - # Job() # # The Job object represents a task that will run in parallel to the main @@ -233,12 +180,12 @@ class Job(): ) if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): - pickled = _pickle_child_job( + pickled = pickle_child_job( child_job, self._scheduler.context.get_projects(), ) self._process = Process( - target=_do_pickled_child_job, + target=do_pickled_child_job, args=[pickled, self._queue], ) else: diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py index 9cc8c5e06..b0465ec9e 100644 --- a/src/buildstream/_scheduler/jobs/jobpickler.py +++ b/src/buildstream/_scheduler/jobs/jobpickler.py @@ -28,7 +28,7 @@ from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Dige # BuildStream toplevel imports from ..._loader import Loader from ..._messenger import Messenger - +from ... 
import utils, node # Note that `str(type(proto_class))` results in `GeneratedProtocolMessageType` # instead of the concrete type, so we come up with our own names here. @@ -42,7 +42,59 @@ _PROTO_CLASS_TO_NAME = { } -# pickle_child_job_data() +# pickle_child_job() +# +# Perform the special case pickling required to pickle a child job for +# unpickling in a child process. +# +# Args: +# child_job (ChildJob): The job to pickle. +# projects (List[Project]): The list of loaded projects, so we can get the +# relevant factories. +# +def pickle_child_job(child_job, projects): + # Note that we need to consider all the state of the program that's + # necessary for the job; this includes e.g. the global state of the node + # module. + node_module_state = node._get_state_for_pickling() + return _pickle_child_job_data( + (child_job, node_module_state), + projects, + ) + + +# do_pickled_child_job() +# +# Unpickle the supplied 'pickled' job and call 'child_action' on it. +# +# This is expected to be run in a subprocess started from the main process, as +# such it will fix up any globals to be in the expected state. +# +# Args: +# pickled (BytesIO): The pickled data, and job to execute. +# *child_args (any) : Any parameters to be passed to `child_action`. +# +def do_pickled_child_job(pickled, *child_args): + utils._is_main_process = _not_main_process + + child_job, node_module_state = pickle.load(pickled) + node._set_state_from_pickling(node_module_state) + return child_job.child_action(*child_args) + + +# _not_main_process() +# +# A function to replace `utils._is_main_process` when we're running in a +# subprocess that was not forked - the inheritance of the main process id will +# not work in this case. +# +# Note that, by definition, we are never the main process here. +# +def _not_main_process(): + return False + + +# _pickle_child_job_data() # # Perform the special case pickling required to pickle a child job for # unpickling in a child process. 
@@ -80,7 +132,7 @@ _PROTO_CLASS_TO_NAME = { # below. Some state in plugins is not necessary for child jobs, and comes # with a heavy cost; we also need to remove this before pickling. # -def pickle_child_job_data(child_job_data, projects): +def _pickle_child_job_data(child_job_data, projects): factory_list = [ factory -- cgit v1.2.1