diff options
Diffstat (limited to 'buildstream/_scheduler/jobs/job.py')
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 63 |
1 files changed, 17 insertions, 46 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index b8b4a2c76..adb520088 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -1,5 +1,6 @@ # # Copyright (C) 2018 Codethink Limited +# Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -32,6 +33,7 @@ import multiprocessing from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils +from .. import _multiprocessing # Return code values shutdown of job handling child processes # @@ -64,15 +66,6 @@ class _Envelope(): self.message = message -# Process class that doesn't call waitpid on its own. -# This prevents conflicts with the asyncio child watcher. -class Process(multiprocessing.Process): - # pylint: disable=attribute-defined-outside-init - def start(self): - self._popen = self._Popen(self) - self._sentinel = self._popen.sentinel - - # Job() # # The Job object represents a parallel task, when calling Job.spawn(), @@ -127,39 +120,23 @@ class Job(): self._parent_start_listening() # Spawn the process - self._process = Process(target=self._child_action, args=[self._queue]) + self._process = _multiprocessing.AsyncioSafeProcess(target=self._child_action, args=[self._queue]) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main # process will be notified of any signal after we launch the child. # with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - self._process.start() + with asyncio.get_child_watcher() as watcher: + self._process.start() + # Register the process to call `_parent_child_completed` once it is done - # Wait for the child task to complete. - # - # This is a tricky part of python which doesnt seem to - # make it to the online docs: - # - # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance - # which is the default type of watcher, and the instance belongs to the - # "event loop policy" in use (so there is only one in the main process). - # - # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio - # loop, and will selectively reap any child pids which have been - # terminated. - # - # o At registration time, the process will immediately be checked with - # `os.waitpid()` and will be reaped immediately, before add_child_handler() - # returns. - # - # The self._parent_child_completed callback passed here will normally - # be called after the child task has been reaped with `os.waitpid()`, in - # an event loop callback. Otherwise, if the job completes too fast, then - # the callback is called immediately. - # - self._watcher = asyncio.get_child_watcher() - self._watcher.add_child_handler(self._process.pid, self._parent_child_completed) + # Here we delay the call to the next loop tick. This is in order to be running + # in the main thread, as the callback itself must be thread safe. + def on_completion(pid, returncode): + asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode) + + watcher.add_child_handler(self._process.pid, on_completion) # terminate() # @@ -182,21 +159,15 @@ class Job(): self._terminated = True - # terminate_wait() + # get_terminated() # - # Wait for terminated jobs to complete - # - # Args: - # timeout (float): Seconds to wait + # Check if a job has been terminated. # # Returns: - # (bool): True if the process terminated cleanly, otherwise False + # (bool): True in the main process if Job.terminate() was called. # - def terminate_wait(self, timeout): - - # Join the child process after sending SIGTERM - self._process.join(timeout) - return self._process.exitcode is not None + def get_terminated(self): + return self._terminated # kill() # |