summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2019-12-06 11:43:22 +0000
committerBenjamin Schubert <contact@benschubert.me>2019-12-07 18:19:57 +0000
commitffa39d2cfd0744c7c1e0caf719cd900efd9e0533 (patch)
treea99549909bc83cdb2c8a6aa2a8ec8d177d751cf8
parent1c081410de2250690d6a1cd05efd0aa20343e5fd (diff)
downloadbuildstream-bschubert/stricter-asyncio-handling.tar.gz
scheduler.py: Only run thread-safe code in callbacks from watchersbschubert/stricter-asyncio-handling
Per https://docs.python.org/3/library/asyncio-policy.html#asyncio.AbstractChildWatcher.add_child_handler, the callback from a child handler must be thread safe. Not all our callbacks were. This changes all our callbacks to schedule a call for the next loop iteration instead of executing it directly.
-rw-r--r--src/buildstream/_scheduler/jobs/job.py8
-rw-r--r--src/buildstream/_scheduler/scheduler.py6
2 files changed, 12 insertions, 2 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 71c40f397..61eff88a6 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -188,7 +188,13 @@ class Job:
with asyncio.get_child_watcher() as watcher:
self._process.start()
# Register the process to call `_parent_child_completed` once it is done
- watcher.add_child_handler(self._process.pid, self._parent_child_completed)
+
+ # Here we delay the call to the next loop tick. This is in order to be running
+ # in the main thread, as the callback itself must be thread safe.
+ def on_completion(pid, returncode):
+ asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+
+ watcher.add_child_handler(self._process.pid, on_completion)
# terminate()
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d9a0984da..a45da82f9 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -188,7 +188,11 @@ class Scheduler:
# Watch casd while running to ensure it doesn't die
self._casd_process = casd_process_manager.process
_watcher = asyncio.get_child_watcher()
- _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
+
+ def abort_casd(pid, returncode):
+ asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode)
+
+ _watcher.add_child_handler(self._casd_process.pid, abort_casd)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):