diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-08-01 10:32:14 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-16 15:42:35 +0000 |
commit | 9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d (patch) | |
tree | e4f7718d507baa90b5fabae60908576006319bc9 /src/buildstream/_scheduler | |
parent | 36457ecd6783d0bf529af6f948ee4de5f600e21f (diff) | |
download | buildstream-9ab14764081bf88fdeb22e2ab7ab3eb345e1bf0d.tar.gz |
Support pickling jobs if the platform requires it
Add support for using `multiprocessing.Manager` and the associated
queues. Downgrade the queue event callback guarantees accordingly. In
later work we may be able to support callbacks in all scenarios.
Pickle and unpickle the child job if the platform requires it.
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 50 |
1 files changed, 44 insertions, 6 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 4104ddc73..2c4883756 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -21,6 +21,7 @@ # System imports import os +import pickle import sys import signal import datetime @@ -86,6 +87,37 @@ class _MessageType(FastEnum): SUBCLASS_CUSTOM_MESSAGE = 5 +# _do_pickled_child_job() +# +# Unpickle the supplied 'pickled' job and call 'child_action' on it. +# +# This is expected to be run in a subprocess started from the main process, as +# such it will fixup any globals to be in the expected state. +# +# Args: +# pickled (BytesIO): The pickled job to execute. +# *child_args (any) : Any parameters to be passed to `child_action`. +# +def _do_pickled_child_job(pickled, *child_args): + + utils._is_main_process = _not_main_process + + child_job = pickle.load(pickled) + return child_job.child_action(*child_args) + + +# _not_main_process() +# +# A function to replace `utils._is_main_process` when we're running in a +# subprocess that was not forked - the inheritance of the main process id will +# not work in this case. +# +# Note that we'll always not be the main process by definition. +# +def _not_main_process(): + return False + + # Job() # # The Job object represents a task that will run in parallel to the main @@ -178,12 +210,18 @@ class Job(): self._message_element_key ) - # Make sure that picklability doesn't break, by exercising it during - # our test suite. - if self._scheduler.context.is_running_in_test_suite: - pickle_child_job(child_job, self._scheduler.context.get_projects()) - - self._process = Process(target=child_job.child_action, args=[self._queue]) + if self._scheduler.context.platform.does_multiprocessing_start_require_pickling(): + pickled = pickle_child_job( + child_job, self._scheduler.context.get_projects()) + self._process = Process( + target=_do_pickled_child_job, + args=[pickled, self._queue_wrapper], + ) + else: + self._process = Process( + target=child_job.child_action, + args=[self._queue_wrapper], + ) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main |