summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2019-12-07 19:21:11 +0000
committerBenjamin Schubert <contact@benschubert.me>2019-12-07 19:21:11 +0000
commitf47bc0d2831359372ccab2c038c604257a7165ab (patch)
treea99549909bc83cdb2c8a6aa2a8ec8d177d751cf8
parent0e4cbd55020d835782bb42d6e543d489d6ea0924 (diff)
parentffa39d2cfd0744c7c1e0caf719cd900efd9e0533 (diff)
downloadbuildstream-f47bc0d2831359372ccab2c038c604257a7165ab.tar.gz
Merge branch 'bschubert/stricter-asyncio-handling' into 'master'
Fixes for asyncio loops were we don't follow the documentation See merge request BuildStream/buildstream!1756
-rw-r--r--src/buildstream/_scheduler/jobs/job.py36
-rw-r--r--src/buildstream/_scheduler/scheduler.py6
2 files changed, 15 insertions, 27 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e7866bcd4..61eff88a6 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -1,5 +1,6 @@
#
# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -131,7 +132,6 @@ class Job:
self._scheduler = scheduler # The scheduler
self._queue = None # A message passing queue
self._process = None # The Process object
- self._watcher = None # Child process watcher
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
@@ -185,32 +185,16 @@ class Job:
# process will be notified of any signal after we launch the child.
#
with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- self._process.start()
+ with asyncio.get_child_watcher() as watcher:
+ self._process.start()
+ # Register the process to call `_parent_child_completed` once it is done
- # Wait for the child task to complete.
- #
- # This is a tricky part of python which doesnt seem to
- # make it to the online docs:
- #
- # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance
- # which is the default type of watcher, and the instance belongs to the
- # "event loop policy" in use (so there is only one in the main process).
- #
- # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio
- # loop, and will selectively reap any child pids which have been
- # terminated.
- #
- # o At registration time, the process will immediately be checked with
- # `os.waitpid()` and will be reaped immediately, before add_child_handler()
- # returns.
- #
- # The self._parent_child_completed callback passed here will normally
- # be called after the child task has been reaped with `os.waitpid()`, in
- # an event loop callback. Otherwise, if the job completes too fast, then
- # the callback is called immediately.
- #
- self._watcher = asyncio.get_child_watcher()
- self._watcher.add_child_handler(self._process.pid, self._parent_child_completed)
+ # Here we delay the call to the next loop tick. This is in order to be running
+ # in the main thread, as the callback itself must be thread safe.
+ def on_completion(pid, returncode):
+ asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+
+ watcher.add_child_handler(self._process.pid, on_completion)
# terminate()
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d9a0984da..a45da82f9 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -188,7 +188,11 @@ class Scheduler:
# Watch casd while running to ensure it doesn't die
self._casd_process = casd_process_manager.process
_watcher = asyncio.get_child_watcher()
- _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
+
+ def abort_casd(pid, returncode):
+ asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode)
+
+ _watcher.add_child_handler(self._casd_process.pid, abort_casd)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):