From 3b91d1b02cb62730c640b3688f2cfc8486f008d7 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Fri, 6 Dec 2019 11:01:24 +0000 Subject: job.py: Remove '_watcher' attribute, it is not needed We don't need to keep a reference to the watcher, let's remove it. --- src/buildstream/_scheduler/jobs/job.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index e7866bcd4..c5c14ff0f 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -131,7 +131,6 @@ class Job: self._scheduler = scheduler # The scheduler self._queue = None # A message passing queue self._process = None # The Process object - self._watcher = None # Child process watcher self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries @@ -209,8 +208,8 @@ class Job: # an event loop callback. Otherwise, if the job completes too fast, then # the callback is called immediately. # - self._watcher = asyncio.get_child_watcher() - self._watcher.add_child_handler(self._process.pid, self._parent_child_completed) + watcher = asyncio.get_child_watcher() + watcher.add_child_handler(self._process.pid, self._parent_child_completed) # terminate() # -- cgit v1.2.1 From 1c081410de2250690d6a1cd05efd0aa20343e5fd Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Fri, 6 Dec 2019 11:17:02 +0000 Subject: job.py: Only start new jobs in a `with watcher:` block The documentation (https://docs.python.org/3/library/asyncio-policy.html#asyncio.AbstractChildWatcher) is apparently missing this part, but the code mentions that new processes should only ever be called inside a with block: https://github.com/python/cpython/blob/99eb70a9eb9493602ff6ad8bb92df4318cf05a3e/Lib/asyncio/unix_events.py#L808 --- src/buildstream/_scheduler/jobs/job.py | 31 +++++-------------------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index c5c14ff0f..71c40f397 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -1,5 +1,6 @@ # # Copyright (C) 2018 Codethink Limited +# Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -184,32 +185,10 @@ class Job: # process will be notified of any signal after we launch the child. # with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): - self._process.start() - - # Wait for the child task to complete. - # - # This is a tricky part of python which doesnt seem to - # make it to the online docs: - # - # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance - # which is the default type of watcher, and the instance belongs to the - # "event loop policy" in use (so there is only one in the main process). - # - # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio - # loop, and will selectively reap any child pids which have been - # terminated. - # - # o At registration time, the process will immediately be checked with - # `os.waitpid()` and will be reaped immediately, before add_child_handler() - # returns. - # - # The self._parent_child_completed callback passed here will normally - # be called after the child task has been reaped with `os.waitpid()`, in - # an event loop callback. Otherwise, if the job completes too fast, then - # the callback is called immediately. - # - watcher = asyncio.get_child_watcher() - watcher.add_child_handler(self._process.pid, self._parent_child_completed) + with asyncio.get_child_watcher() as watcher: + self._process.start() + # Register the process to call `_parent_child_completed` once it is done + watcher.add_child_handler(self._process.pid, self._parent_child_completed) # terminate() # -- cgit v1.2.1 From ffa39d2cfd0744c7c1e0caf719cd900efd9e0533 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Fri, 6 Dec 2019 11:43:22 +0000 Subject: scheduler.py: Only run thread-safe code in callbacks from watchers Per https://docs.python.org/3/library/asyncio-policy.html#asyncio.AbstractChildWatcher.add_child_handler, the callback from a child handler must be thread safe. Not all our callbacks were. This changes all our callbacks to schedule a call for the next loop iteration instead of executing it directly. --- src/buildstream/_scheduler/jobs/job.py | 8 +++++++- src/buildstream/_scheduler/scheduler.py | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 71c40f397..61eff88a6 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -188,7 +188,13 @@ class Job: with asyncio.get_child_watcher() as watcher: self._process.start() # Register the process to call `_parent_child_completed` once it is done - watcher.add_child_handler(self._process.pid, self._parent_child_completed) + + # Here we delay the call to the next loop tick. This is in order to be running + # in the main thread, as the callback itself must be thread safe. + def on_completion(pid, returncode): + asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode) + + watcher.add_child_handler(self._process.pid, on_completion) # terminate() # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index d9a0984da..a45da82f9 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -188,7 +188,11 @@ class Scheduler: # Watch casd while running to ensure it doesn't die self._casd_process = casd_process_manager.process _watcher = asyncio.get_child_watcher() - _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) + + def abort_casd(pid, returncode): + asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode) + + _watcher.add_child_handler(self._casd_process.pid, abort_casd) # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): -- cgit v1.2.1