diff options
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 36 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 6 |
2 files changed, 15 insertions, 27 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index e7866bcd4..61eff88a6 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -1,5 +1,6 @@ # # Copyright (C) 2018 Codethink Limited +# Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -131,7 +132,6 @@ class Job: self._scheduler = scheduler # The scheduler self._queue = None # A message passing queue self._process = None # The Process object - self._watcher = None # Child process watcher self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries @@ -185,32 +185,16 @@ class Job: # process will be notified of any signal after we launch the child. # with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - self._process.start() + with asyncio.get_child_watcher() as watcher: + self._process.start() + # Register the process to call `_parent_child_completed` once it is done - # Wait for the child task to complete. - # - # This is a tricky part of python which doesnt seem to - # make it to the online docs: - # - # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance - # which is the default type of watcher, and the instance belongs to the - # "event loop policy" in use (so there is only one in the main process). - # - # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio - # loop, and will selectively reap any child pids which have been - # terminated. - # - # o At registration time, the process will immediately be checked with - # `os.waitpid()` and will be reaped immediately, before add_child_handler() - # returns. - # - # The self._parent_child_completed callback passed here will normally - # be called after the child task has been reaped with `os.waitpid()`, in - # an event loop callback. Otherwise, if the job completes too fast, then - # the callback is called immediately. - # - self._watcher = asyncio.get_child_watcher() - self._watcher.add_child_handler(self._process.pid, self._parent_child_completed) + # Here we delay the call to the next loop tick. This is in order to be running + # in the main thread, as the callback itself must be thread safe. + def on_completion(pid, returncode): + asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode) + + watcher.add_child_handler(self._process.pid, on_completion) # terminate() # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index d9a0984da..a45da82f9 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -188,7 +188,11 @@ class Scheduler: # Watch casd while running to ensure it doesn't die self._casd_process = casd_process_manager.process _watcher = asyncio.get_child_watcher() - _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) + + def abort_casd(pid, returncode): + asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode) + + _watcher.add_child_handler(self._casd_process.pid, abort_casd) # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): |