summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2019-11-20 08:17:02 +0100
committerJürg Billeter <j@bitron.ch>2019-11-20 16:42:41 +0100
commit0b4ec4b4b6bdec4ca5051518c3af6a87a43f467d (patch)
tree293a9180396c4829d854797770d409cc8220c301
parentda165c5bd7d3965b10e07495fb0f4af419706792 (diff)
downloadbuildstream-juerg/wsl.tar.gz
WIP: job.py: Queue -> Pipejuerg/wsl
-rw-r--r--src/buildstream/_scheduler/jobs/job.py23
1 files changed, 15 insertions, 8 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 8e909977f..ef5a895ab 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -155,7 +155,7 @@ class Job:
#
def start(self):
- self._queue = multiprocessing.Queue()
+ self._queue, queue_writer = multiprocessing.Pipe(duplex=False)
self._tries += 1
self._parent_start_listening()
@@ -174,10 +174,10 @@ class Job:
if self._scheduler.context.platform.does_multiprocessing_start_require_pickling():
pickled = pickle_child_job(child_job, self._scheduler.context.get_projects(),)
self._process = _multiprocessing.AsyncioSafeProcess(
- target=do_pickled_child_job, args=[pickled, self._queue],
+ target=do_pickled_child_job, args=[pickled, queue_writer],
)
else:
- self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[self._queue],)
+ self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[queue_writer],)
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -186,6 +186,9 @@ class Job:
with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
self._process.start()
+ queue_writer.close()
+ queue_writer = None
+
# Wait for the child task to complete.
#
# This is a tricky part of python which doesnt seem to
@@ -508,8 +511,12 @@ class Job:
# in the parent process.
#
def _parent_process_queue(self):
- while not self._queue.empty():
- envelope = self._queue.get_nowait()
+ while self._queue.poll():
+ try:
+ envelope = self._queue.recv()
+ except EOFError:
+ self._parent_stop_listening()
+ break
self._parent_process_envelope(envelope)
# _parent_recv()
@@ -537,7 +544,7 @@ class Job:
# http://bugs.python.org/issue3831
#
if not self._listening:
- self._scheduler.loop.add_reader(self._queue._reader.fileno(), self._parent_recv)
+ self._scheduler.loop.add_reader(self._queue.fileno(), self._parent_recv)
self._listening = True
# _parent_stop_listening()
@@ -546,7 +553,7 @@ class Job:
#
def _parent_stop_listening(self):
if self._listening:
- self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ self._scheduler.loop.remove_reader(self._queue.fileno())
self._listening = False
@@ -797,7 +804,7 @@ class ChildJob:
# instances). This is sent to the parent Job.
#
def _send_message(self, message_type, message_data):
- self._queue.put(_Envelope(message_type, message_data))
+ self._queue.send(_Envelope(message_type, message_data))
# _child_send_error()
#