summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-06-05 16:55:11 +0100
committerAngelos Evripiotis <jevripiotis@bloomberg.net>2019-06-12 16:18:43 +0100
commit2934bbc54bd58093d33f04eb8573671f17c33742 (patch)
tree1ada92f44f079f414f215e573a348f9b0d1bf8a1
parent183f1aa6549444f0c9e328461de00b3879fed877 (diff)
downloadbuildstream-2934bbc54bd58093d33f04eb8573671f17c33742.tar.gz
WIP: spawn instead of fork
-rw-r--r--src/buildstream/_context.py12
-rw-r--r--src/buildstream/_scheduler/jobs/job.py49
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py5
-rw-r--r--src/buildstream/_scheduler/scheduler.py4
4 files changed, 33 insertions, 37 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index d9ef1f173..2e1d0ee89 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -584,7 +584,9 @@ class Context():
# we also do not allow it in the main process.
assert self._log_handle is None
assert self._log_filename is None
- assert not utils._is_main_process()
+
+ # Need to deal with global _main_pid var.
+ # assert not utils._is_main_process()
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
@@ -679,10 +681,10 @@ class Context():
# If this message is associated with a plugin, print what
# we know about the plugin.
plugin_name = ""
- if message.unique_id:
- template += " {plugin}"
- plugin = Plugin._lookup(message.unique_id)
- plugin_name = plugin.name
+ # if message.unique_id:
+ # template += " {plugin}"
+ # plugin = Plugin._lookup(message.unique_id)
+ # plugin_name = plugin.name
template += ": {message}"
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 307ca7f25..7bca17d14 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -230,7 +230,7 @@ class Job():
#
def spawn(self):
- self._queue = multiprocessing.Queue()
+ self._queue = self._scheduler.manager.Queue()
self._tries += 1
self._parent_start_listening()
@@ -246,32 +246,18 @@ class Job():
)
# Spawn the process
- self._process = Process(target=child_job.child_action, args=[self._queue])
-
- import contextlib
- import time
- @contextlib.contextmanager
- def timer(message):
- then = time.time()
- yield
- now = time.time()
- print(f"({now - then:,.2}s):", message)
-
- import buildstream.testpickle
- with timer(f"Pickle {self._child_action}"):
- pickled_process = buildstream.testpickle.test_pickle_direct(self._child_action)
- print(f"Size of pickled data: {len(pickled_process.getbuffer()):,}")
- import pickle
- pickled_process.seek(0)
- # unpickled_process = pickle.load(pickled_process)
+ pickled = _pickle_child_job(child_job, self._scheduler.context)
+ self._process = Process(
+ target=_do_pickled_child_job,
+ args=[pickled, self._queue],
+ )
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
# process will be notified of any signal after we launch the child.
#
- with timer(f"process.start {self}"):
- with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- self._process.start()
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+ self._process.start()
# Wait for the child task to complete.
#
@@ -617,19 +603,22 @@ class Job():
#
# http://bugs.python.org/issue3831
#
- if not self._listening:
- self._scheduler.loop.add_reader(
- self._queue._reader.fileno(), self._parent_recv)
- self._listening = True
+
+ # if not self._listening:
+ # self._scheduler.loop.add_reader(
+ # self._queue._reader.fileno(), self._parent_recv)
+ # self._listening = True
+ pass
# _parent_stop_listening()
#
# Stops listening on the message queue
#
def _parent_stop_listening(self):
- if self._listening:
- self._scheduler.loop.remove_reader(self._queue._reader.fileno())
- self._listening = False
+ # if self._listening:
+ # self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ # self._listening = False
+ pass
# ChildJob()
@@ -898,7 +887,7 @@ class ChildJob():
# exit_code (int): The exit code to exit with
#
def _child_shutdown(self, exit_code):
- self._queue.close()
+ # self._queue.close()
sys.exit(exit_code)
# _child_message_handler()
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index aa489f381..caa4bc27e 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -113,5 +113,6 @@ class BuildQueue(Queue):
# artifact cache size for a successful build even though we know a
# failed build also grows the artifact cache size.
#
- if status == JobStatus.OK:
- self._check_cache_size(job, element, result)
+
+ # if status == JobStatus.OK:
+ # self._check_cache_size(job, element, result)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 50ad7f07a..12acdd515 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -75,6 +75,10 @@ class Scheduler():
job_start_callback=None,
job_complete_callback=None):
+ import multiprocessing
+ multiprocessing.set_start_method('spawn')
+ self.manager = multiprocessing.Manager()
+
#
# Public members
#