diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-06-05 16:55:11 +0100 |
---|---|---|
committer | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-06-12 16:18:43 +0100 |
commit | 2934bbc54bd58093d33f04eb8573671f17c33742 (patch) | |
tree | 1ada92f44f079f414f215e573a348f9b0d1bf8a1 | |
parent | 183f1aa6549444f0c9e328461de00b3879fed877 (diff) | |
download | buildstream-2934bbc54bd58093d33f04eb8573671f17c33742.tar.gz |
WIP: spawn instead of fork
-rw-r--r-- | src/buildstream/_context.py | 12 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 49 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 5 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 4 |
4 files changed, 33 insertions, 37 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index d9ef1f173..2e1d0ee89 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -584,7 +584,9 @@ class Context(): # we also do not allow it in the main process. assert self._log_handle is None assert self._log_filename is None - assert not utils._is_main_process() + + # Need to deal with global _main_pid var. + # assert not utils._is_main_process() # Create the fully qualified logfile in the log directory, # appending the pid and .log extension at the end. @@ -679,10 +681,10 @@ class Context(): # If this message is associated with a plugin, print what # we know about the plugin. plugin_name = "" - if message.unique_id: - template += " {plugin}" - plugin = Plugin._lookup(message.unique_id) - plugin_name = plugin.name + # if message.unique_id: + # template += " {plugin}" + # plugin = Plugin._lookup(message.unique_id) + # plugin_name = plugin.name template += ": {message}" diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 307ca7f25..7bca17d14 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -230,7 +230,7 @@ class Job(): # def spawn(self): - self._queue = multiprocessing.Queue() + self._queue = self._scheduler.manager.Queue() self._tries += 1 self._parent_start_listening() @@ -246,32 +246,18 @@ class Job(): ) # Spawn the process - self._process = Process(target=child_job.child_action, args=[self._queue]) - - import contextlib - import time - @contextlib.contextmanager - def timer(message): - then = time.time() - yield - now = time.time() - print(f"({now - then:,.2}s):", message) - - import buildstream.testpickle - with timer(f"Pickle {self._child_action}"): - pickled_process = buildstream.testpickle.test_pickle_direct(self._child_action) - print(f"Size of pickled data: {len(pickled_process.getbuffer()):,}") - import pickle - pickled_process.seek(0) - # unpickled_process = pickle.load(pickled_process) + pickled = _pickle_child_job(child_job, self._scheduler.context) + self._process = Process( + target=_do_pickled_child_job, + args=[pickled, self._queue], + ) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main # process will be notified of any signal after we launch the child. # - with timer(f"process.start {self}"): - with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - self._process.start() + with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): + self._process.start() # Wait for the child task to complete. # @@ -617,19 +603,22 @@ class Job(): # # http://bugs.python.org/issue3831 # - if not self._listening: - self._scheduler.loop.add_reader( - self._queue._reader.fileno(), self._parent_recv) - self._listening = True + + # if not self._listening: - self._scheduler.loop.add_reader( - self._queue._reader.fileno(), self._parent_recv) - self._listening = True + # if not self._listening: + # self._scheduler.loop.add_reader( + # self._queue._reader.fileno(), self._parent_recv) + # self._listening = True + pass # _parent_stop_listening() # # Stops listening on the message queue # def _parent_stop_listening(self): - if self._listening: - self._scheduler.loop.remove_reader(self._queue._reader.fileno()) - self._listening = False + # if self._listening: + # self._scheduler.loop.remove_reader(self._queue._reader.fileno()) + # self._listening = False + pass # ChildJob() @@ -898,7 +887,7 @@ class ChildJob(): # exit_code (int): The exit code to exit with # def _child_shutdown(self, exit_code): - self._queue.close() + # self._queue.close() sys.exit(exit_code) # _child_message_handler() diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index aa489f381..caa4bc27e 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -113,5 +113,6 @@ class BuildQueue(Queue): # artifact cache size for a successful build even though we know a # failed build also grows the artifact cache size. # - if status == JobStatus.OK: - self._check_cache_size(job, element, result) + + # if status == JobStatus.OK: + # self._check_cache_size(job, element, result) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 50ad7f07a..12acdd515 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -75,6 +75,10 @@ class Scheduler(): job_start_callback=None, job_complete_callback=None): + import multiprocessing + multiprocessing.set_start_method('spawn') + self.manager = multiprocessing.Manager() + # # Public members # |