summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-10-28 09:03:37 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-10-29 14:17:17 +0000
commitd16c1f3e48db6638abde3a793fdd24775b010cdc (patch)
tree1eb716f58771d0b9ed2ce2a02c8aa5f5198c40ab
parent5f48e8aa6b6a1ddf6a9d5d93fac74207f0946b30 (diff)
downloadbuildstream-d16c1f3e48db6638abde3a793fdd24775b010cdc.tar.gz
_scheduler/jobs: mv pickle details into jobpickler
Move pickle_child_job and do_pickled_child_job into jobpickler.py, to keep details like saving and restoring global state out of job.py.
-rw-r--r--src/buildstream/_scheduler/jobs/job.py61
-rw-r--r--src/buildstream/_scheduler/jobs/jobpickler.py58
2 files changed, 59 insertions, 60 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b36167a8f..1d7697b02 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@ import asyncio
import datetime
import multiprocessing
import os
-import pickle
import signal
import sys
import traceback
@@ -33,9 +32,9 @@ import traceback
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals, utils, node
+from ... import _signals, utils
-from .jobpickler import pickle_child_job_data
+from .jobpickler import pickle_child_job, do_pickled_child_job
# Return code values shutdown of job handling child processes
@@ -87,58 +86,6 @@ class _MessageType(FastEnum):
SUBCLASS_CUSTOM_MESSAGE = 5
-# _pickle_child_job()
-#
-# Perform the special case pickling required to pickle a child job for
-# unpickling in a child process.
-#
-# Args:
-# child_job (ChildJob): The job to pickle.
-# projects (List[Project]): The list of loaded projects, so we can get the
-# relevant factories.
-#
-def _pickle_child_job(child_job, projects):
- # Note that we need to consider all the state of the program that's
- # necessary for the job, this includes e.g. the global state of the node
- # module.
- node_module_state = node._get_state_for_pickling()
- return pickle_child_job_data(
- (child_job, node_module_state),
- projects,
- )
-
-
-# _do_pickled_child_job()
-#
-# Unpickle the supplied 'pickled' job and call 'child_action' on it.
-#
-# This is expected to be run in a subprocess started from the main process, as
-# such it will fixup any globals to be in the expected state.
-#
-# Args:
-# pickled (BytesIO): The pickled data, and job to execute.
-# *child_args (any) : Any parameters to be passed to `child_action`.
-#
-def _do_pickled_child_job(pickled, *child_args):
- utils._is_main_process = _not_main_process
-
- child_job, node_module_state = pickle.load(pickled)
- node._set_state_from_pickling(node_module_state)
- return child_job.child_action(*child_args)
-
-
-# _not_main_process()
-#
-# A function to replace `utils._is_main_process` when we're running in a
-# subprocess that was not forked - the inheritance of the main process id will
-# not work in this case.
-#
-# Note that we'll always not be the main process by definition.
-#
-def _not_main_process():
- return False
-
-
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -233,12 +180,12 @@ class Job():
)
if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
- pickled = _pickle_child_job(
+ pickled = pickle_child_job(
child_job,
self._scheduler.context.get_projects(),
)
self._process = Process(
- target=_do_pickled_child_job,
+ target=do_pickled_child_job,
args=[pickled, self._queue],
)
else:
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
index 9cc8c5e06..b0465ec9e 100644
--- a/src/buildstream/_scheduler/jobs/jobpickler.py
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -28,7 +28,7 @@ from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Dige
# BuildStream toplevel imports
from ..._loader import Loader
from ..._messenger import Messenger
-
+from ... import utils, node
# Note that `str(type(proto_class))` results in `GeneratedProtocolMessageType`
# instead of the concrete type, so we come up with our own names here.
@@ -42,7 +42,59 @@ _PROTO_CLASS_TO_NAME = {
}
-# pickle_child_job_data()
+# pickle_child_job()
+#
+# Perform the special case pickling required to pickle a child job for
+# unpickling in a child process.
+#
+# Args:
+# child_job (ChildJob): The job to pickle.
+# projects (List[Project]): The list of loaded projects, so we can get the
+# relevant factories.
+#
+def pickle_child_job(child_job, projects):
+ # Note that we need to consider all the state of the program that's
+ # necessary for the job, this includes e.g. the global state of the node
+ # module.
+ node_module_state = node._get_state_for_pickling()
+ return _pickle_child_job_data(
+ (child_job, node_module_state),
+ projects,
+ )
+
+
+# do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process, as
+# such it will fixup any globals to be in the expected state.
+#
+# Args:
+# pickled (BytesIO): The pickled data, and job to execute.
+# *child_args (any) : Any parameters to be passed to `child_action`.
+#
+def do_pickled_child_job(pickled, *child_args):
+ utils._is_main_process = _not_main_process
+
+ child_job, node_module_state = pickle.load(pickled)
+ node._set_state_from_pickling(node_module_state)
+ return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked - the inheritance of the main process id will
+# not work in this case.
+#
+# Note that we'll always not be the main process by definition.
+#
+def _not_main_process():
+ return False
+
+
+# _pickle_child_job_data()
#
# Perform the special case pickling required to pickle a child job for
# unpickling in a child process.
@@ -80,7 +132,7 @@ _PROTO_CLASS_TO_NAME = {
# below. Some state in plugins is not necessary for child jobs, and comes
# with a heavy cost; we also need to remove this before pickling.
#
-def pickle_child_job_data(child_job_data, projects):
+def _pickle_child_job_data(child_job_data, projects):
factory_list = [
factory