summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/jobs
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-01 10:32:14 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-08-16 15:42:35 +0000
commit9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d (patch)
treee4f7718d507baa90b5fabae60908576006319bc9 /src/buildstream/_scheduler/jobs
parent36457ecd6783d0bf529af6f948ee4de5f600e21f (diff)
downloadbuildstream-9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d.tar.gz
Support pickling jobs if the platform requires it
Add support for using `multiprocessing.Manager` and the associated queues. Downgrade the queue event callback guarantees accordingly. In later work we may be able to support callbacks in all scenarios. Pickle and unpickle the child job if the platform requires it.
Diffstat (limited to 'src/buildstream/_scheduler/jobs')
-rw-r--r--src/buildstream/_scheduler/jobs/job.py50
1 files changed, 44 insertions, 6 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 4104ddc73..2c4883756 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,6 +21,7 @@
# System imports
import os
+import pickle
import sys
import signal
import datetime
@@ -86,6 +87,37 @@ class _MessageType(FastEnum):
SUBCLASS_CUSTOM_MESSAGE = 5
+# _do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process, as
+# such it will fixup any globals to be in the expected state.
+#
+# Args:
+# pickled (BytesIO): The pickled job to execute.
+# *child_args (any) : Any parameters to be passed to `child_action`.
+#
+def _do_pickled_child_job(pickled, *child_args):
+
+ utils._is_main_process = _not_main_process
+
+ child_job = pickle.load(pickled)
+ return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked - the inheritance of the main process id will
+# not work in this case.
+#
+# Note that we'll always not be the main process by definition.
+#
+def _not_main_process():
+ return False
+
+
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -178,12 +210,18 @@ class Job():
self._message_element_key
)
- # Make sure that picklability doesn't break, by exercising it during
- # our test suite.
- if self._scheduler.context.is_running_in_test_suite:
- pickle_child_job(child_job, self._scheduler.context.get_projects())
-
- self._process = Process(target=child_job.child_action, args=[self._queue])
+ if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+ pickled = pickle_child_job(
+ child_job, self._scheduler.context.get_projects())
+ self._process = Process(
+ target=_do_pickled_child_job,
+ args=[pickled, self._queue_wrapper],
+ )
+ else:
+ self._process = Process(
+ target=child_job.child_action,
+ args=[self._queue_wrapper],
+ )
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main