summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-01 10:32:14 +0100
committerAngelos Evripiotis <jevripiotis@bloomberg.net>2019-08-13 15:08:04 +0100
commitaffa26ae150d6eacc7a67b1d3b733e9d904566e7 (patch)
tree2d402b6771701a0825535f68850179b6460edf9e
parentb3c1b2265d1b1219a01f98923fca8328c8ba296b (diff)
downloadbuildstream-affa26ae150d6eacc7a67b1d3b733e9d904566e7.tar.gz
Support pickling jobs if the platform requires it
Add support for using `multiprocessing.Manager` and the associated queues. Downgrade the queue event callback guarantees accordingly. In later work we may be able to support callbacks in all scenarios. Pickle and unpickle the child job if the platform requires it.
-rw-r--r--src/buildstream/_platform/multiprocessing.py43
-rw-r--r--src/buildstream/_platform/platform.py21
-rw-r--r--src/buildstream/_scheduler/jobs/job.py50
3 files changed, 106 insertions, 8 deletions
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py
index 1f7be8338..c036651e7 100644
--- a/src/buildstream/_platform/multiprocessing.py
+++ b/src/buildstream/_platform/multiprocessing.py
@@ -31,6 +31,27 @@ class QueueManager:
return _PlainQueueWrapper(multiprocessing.Queue())
+# PicklableQueueManager()
+#
+# A QueueManager that creates picklable types.
+#
+# Note that the requirement of being picklable adds extra runtime burden, as we
+# must create and maintain a `SyncManager` process that will create and manage
+# the real objects.
+#
+class PicklableQueueManager(QueueManager):
+ def __init__(self):
+ super().__init__()
+ self._manager = None
+
+ def make_queue_wrapper(self):
+ # Only SyncManager Queues are picklable, so we must make those. Avoid
+ # creating multiple expensive SyncManagers, by keeping this one around.
+ if self._manager is None:
+ self._manager = multiprocessing.Manager()
+ return _SyncManagerQueueWrapper(self._manager.Queue())
+
+
# QueueWrapper()
#
# This abstracts our choice of using picklable or non-picklable Queues.
@@ -63,3 +84,25 @@ class _PlainQueueWrapper(QueueWrapper):
def close(self):
self.queue.close()
+
+
+class _SyncManagerQueueWrapper(QueueWrapper):
+ def __init__(self, queue):
+ super().__init__()
+ self.queue = queue
+
+ def set_potential_callback_on_queue_event(self, event_loop, callback):
+ # We can't easily support these callbacks for Queues managed by a
+ # SyncManager, so don't support them for now. In later work we should
+ # be able to support them with threading.
+ pass
+
+ def clear_potential_callback_on_queue_event(self, event_loop):
+ pass
+
+ def close(self):
+ # SyncManager queue proxies do not have a `close()` method, they rely
+ # on a callback on garbage collection to release resources. For our
+ # purposes the queue is invalid after closing, so it's ok to release it
+ # here.
+ self.queue = None
diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py
index 18e26a9a1..faf3d3c52 100644
--- a/src/buildstream/_platform/platform.py
+++ b/src/buildstream/_platform/platform.py
@@ -18,6 +18,7 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import multiprocessing
import os
import platform
import sys
@@ -27,7 +28,7 @@ import psutil
from .._exceptions import PlatformError, ImplError, SandboxError
from .. import utils
-from .multiprocessing import QueueManager
+from .multiprocessing import QueueManager, PicklableQueueManager
class Platform():
@@ -175,7 +176,23 @@ class Platform():
return Platform.canonicalize_arch(uname_machine)
def make_queue_manager(self):
- return QueueManager()
+ if self.does_multiprocessing_start_require_pickling():
+ return PicklableQueueManager()
+ else:
+ return QueueManager()
+
+ # does_multiprocessing_start_require_pickling():
+ #
+ # Returns True if the multiprocessing start method will pickle arguments
+ # to new processes.
+ #
+ # Returns:
+ # (bool): Whether pickling is required or not
+ #
+ def does_multiprocessing_start_require_pickling(self):
+ # Note that if the start method has not been set before now, it will be
+ # set to the platform default by `get_start_method`.
+ return multiprocessing.get_start_method() != 'fork'
##################################################################
# Sandbox functions #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index ff3cba46b..0d4db477f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,6 +21,7 @@
# System imports
import os
+import pickle
import sys
import signal
import datetime
@@ -86,6 +87,37 @@ class _MessageType(FastEnum):
SUBCLASS_CUSTOM_MESSAGE = 5
+# _do_pickled_child_job()
+#
+# Unpickle the supplied 'pickled' job and call 'child_action' on it.
+#
+# This is expected to be run in a subprocess started from the main process, as
+# such it will fixup any globals to be in the expected state.
+#
+# Args:
+# pickled (BytesIO): The pickled job to execute.
+# *child_args (any) : Any parameters to be passed to `child_action`.
+#
+def _do_pickled_child_job(pickled, *child_args):
+
+ utils._is_main_process = _not_main_process
+
+ child_job = pickle.load(pickled)
+ return child_job.child_action(*child_args)
+
+
+# _not_main_process()
+#
+# A function to replace `utils._is_main_process` when we're running in a
+# subprocess that was not forked - the inheritance of the main process id will
+# not work in this case.
+#
+# Note that we'll always not be the main process by definition.
+#
+def _not_main_process():
+ return False
+
+
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -178,12 +210,18 @@ class Job():
self._message_element_key
)
- # Make sure that picklability doesn't break, by exercising it during
- # our test suite.
- if self._scheduler.context.is_running_in_test_suite:
- pickle_child_job(child_job, self._scheduler.context.get_projects())
-
- self._process = Process(target=child_job.child_action, args=[self._queue])
+ if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
+ pickled = pickle_child_job(
+ child_job, self._scheduler.context.get_projects())
+ self._process = Process(
+ target=_do_pickled_child_job,
+ args=[pickled, self._queue_wrapper],
+ )
+ else:
+ self._process = Process(
+ target=child_job.child_action,
+ args=[self._queue_wrapper],
+ )
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main