summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/_asyncio.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/_asyncio.py')
-rw-r--r--src/buildstream/_scheduler/_asyncio.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/src/buildstream/_scheduler/_asyncio.py b/src/buildstream/_scheduler/_asyncio.py
new file mode 100644
index 000000000..b9da163ab
--- /dev/null
+++ b/src/buildstream/_scheduler/_asyncio.py
@@ -0,0 +1,73 @@
+#
+# Copyright (C) 2019 Bloomberg LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import multiprocessing
+from asyncio import *
+
+
+class _ForkableUnixSelectorEventLoop(SelectorEventLoop):
+
+ def _cleanup_on_fork(self):
+ parent_selector = self._selector
+ self._selector = type(parent_selector)()
+
+ for key in parent_selector.get_map().values():
+ self._selector.register(key.fileobj, key.events, key.data)
+ parent_selector.close()
+ self._close_self_pipe()
+ self._make_self_pipe()
+
+
+class _DefaultEventLoopPolicy(DefaultEventLoopPolicy):
+ _loop_factory = _ForkableUnixSelectorEventLoop
+
+ def cleanup_on_fork(self):
+ if self._local._loop is None:
+ # We don't have a loop, we're fine
+ return
+
+ assert not Task.all_tasks(self._local._loop), "Some asyncio tasks leaked to a child"
+ assert Task.current_task(self._local._loop) is None, "We are in the child, we should have no running task"
+ self._local._loop._cleanup_on_fork()
+ self._local._loop.stop()
+ del self._local._loop
+
+
+class _AsyncioSafeProcess(multiprocessing.Process):
+ # pylint: disable=attribute-defined-outside-init
+ def start(self):
+ self._popen = self._Popen(self)
+ self._sentinel = self._popen.sentinel
+
+
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
+class _AsyncioForkAwareProcess(_AsyncioSafeProcess):
+ def run(self):
+ get_event_loop_policy().cleanup_on_fork()
+ super().run()
+
+
+if multiprocessing.get_start_method(False) == "fork":
+ # Set the default event loop policy to automatically close our asyncio loop in child processes
+ DefaultEventLoopPolicy = _DefaultEventLoopPolicy
+ set_event_loop_policy(DefaultEventLoopPolicy())
+
+ AsyncioSafeProcess = _AsyncioForkAwareProcess
+
+else:
+ AsyncioSafeProcess = _AsyncioSafeProcess