summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/jobs/job.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_scheduler/jobs/job.py')
-rw-r--r--buildstream/_scheduler/jobs/job.py63
1 files changed, 17 insertions, 46 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index b8b4a2c76..adb520088 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -1,5 +1,6 @@
#
# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -32,6 +33,7 @@ import multiprocessing
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
+from .. import _multiprocessing
# Return code values shutdown of job handling child processes
#
@@ -64,15 +66,6 @@ class _Envelope():
self.message = message
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-class Process(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
-
# Job()
#
# The Job object represents a parallel task, when calling Job.spawn(),
@@ -127,39 +120,23 @@ class Job():
self._parent_start_listening()
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+ self._process = _multiprocessing.AsyncioSafeProcess(target=self._child_action, args=[self._queue])
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
# process will be notified of any signal after we launch the child.
#
with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- self._process.start()
+ with asyncio.get_child_watcher() as watcher:
+ self._process.start()
+ # Register the process to call `_parent_child_completed` once it is done
- # Wait for the child task to complete.
- #
- # This is a tricky part of python which doesnt seem to
- # make it to the online docs:
- #
- # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance
- # which is the default type of watcher, and the instance belongs to the
- # "event loop policy" in use (so there is only one in the main process).
- #
- # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio
- # loop, and will selectively reap any child pids which have been
- # terminated.
- #
- # o At registration time, the process will immediately be checked with
- # `os.waitpid()` and will be reaped immediately, before add_child_handler()
- # returns.
- #
- # The self._parent_child_completed callback passed here will normally
- # be called after the child task has been reaped with `os.waitpid()`, in
- # an event loop callback. Otherwise, if the job completes too fast, then
- # the callback is called immediately.
- #
- self._watcher = asyncio.get_child_watcher()
- self._watcher.add_child_handler(self._process.pid, self._parent_child_completed)
+ # Here we delay the call to the next loop tick. This is in order to be running
+ # in the main thread, as the callback itself must be thread safe.
+ def on_completion(pid, returncode):
+ asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+
+ watcher.add_child_handler(self._process.pid, on_completion)
# terminate()
#
@@ -182,21 +159,15 @@ class Job():
self._terminated = True
- # terminate_wait()
+ # get_terminated()
#
- # Wait for terminated jobs to complete
- #
- # Args:
- # timeout (float): Seconds to wait
+ # Check if a job has been terminated.
#
# Returns:
- # (bool): True if the process terminated cleanly, otherwise False
+ # (bool): True in the main process if Job.terminate() was called.
#
- def terminate_wait(self, timeout):
-
- # Join the child process after sending SIGTERM
- self._process.join(timeout)
- return self._process.exitcode is not None
+ def get_terminated(self):
+ return self._terminated
# kill()
#