author | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-07-03 12:57:06 +0000 |
---|---|---|
committer | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-12-04 10:36:37 +0000 |
commit | 705d0023f65621b23b6b0828306dc5b4ee094b45 (patch) | |
tree | c021c06427c7e37508c682bc99e06d9eae090975 /tests | |
parent | be88eaec0445ff2d85b73c17a392d0e65620202b (diff) | |
download | buildstream-705d0023f65621b23b6b0828306dc5b4ee094b45.tar.gz |
scheduler.py: Use threads instead of processes for jobs
This changes how the scheduler works and adapts all the code that needs
adapting in order to be able to run in threads instead of in
subprocesses, which helps with Windows support, and will allow some
simplifications in the main pipeline.
This addresses the following issues:
* Fix #810: All CAS calls are now made in the master process, and thus
share the same connection to the CAS server
* Fix #93: We don't start as many child processes anymore, so the risk
of starving the machine is much lower
* Fix #911: We now use `forkserver` for starting processes. We also
don't use subprocesses for jobs, so we should be starting fewer
subprocesses
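As a rough illustration of the first two points, the sketch below runs jobs in a thread pool so that every job executes in the same process and reuses one lazily opened connection. `SharedConnection` and `run_jobs` are hypothetical names invented for this example; they are not BuildStream APIs, and the "connection" is only a placeholder object.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


class SharedConnection:
    """Hypothetical stand-in for a connection that should be opened only once."""

    def __init__(self):
        self._lock = threading.Lock()
        self._channel = None
        self.opened = 0  # how many times the channel was actually created

    def get(self):
        # The lock makes the lazy initialisation safe when several
        # threads ask for the connection at the same time.
        with self._lock:
            if self._channel is None:
                self._channel = object()  # placeholder for a real channel
                self.opened += 1
            return self._channel


def run_jobs(job_count, max_workers=4):
    """Run job_count trivial jobs in threads and report what each one saw."""
    connection = SharedConnection()

    def job(index):
        # Every job runs in the parent's process and sees the same channel.
        return index, os.getpid(), id(connection.get())

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(job, range(job_count)))
    return connection, results
```

With subprocesses, each job would have had its own process ID and would have needed its own connection; here the pool size also caps how many jobs run at once.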
And the following high-level changes were made:
* cascache.py: Run the CasCacheUsageMonitor in a thread instead of a
subprocess.
* casdprocessmanager.py: Ensure start and stop of the process are thread
safe.
* job.py: Run the child in a thread instead of a process, adapt how we
stop a thread, since we can't use signals anymore.
* _multiprocessing.py: Not needed anymore, we are not using `fork()`.
* scheduler.py: Run the scheduler with a threadpool, to run the child
jobs in. Also adapt how our signal handling is done, since we are not
receiving signals from our children anymore, and can't kill them the
same way.
* sandbox: Stop using blocking signals to wait on the process, and use
timeouts all the time.
* messenger.py: Use a thread-local context for the handler, to allow for
multiple parameters in the same process.
* _remote.py: Ensure the start of the connection is thread safe.
* _signal.py: Allow blocking entry into the signal's context managers
by setting an event. This is to ensure no thread runs long-running
code while we have asked the scheduler to pause. This also ensures all
the signal handlers are thread safe.
* source.py: Change check around saving the source's ref. We are now
running in the same process, and thus the ref will already have been
changed.
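Two of the items above (stopping a thread without signals, and a thread-local handler) can be sketched as follows. This is a minimal illustration under assumed names: `set_handler`, `emit` and `StoppableJob` are hypothetical and do not mirror the actual code in job.py or messenger.py.

```python
import threading

# Thread-local storage: each thread sees its own `handler` attribute.
_local = threading.local()


def set_handler(handler):
    """Install a message handler for the calling thread only."""
    _local.handler = handler


def emit(message):
    """Deliver a message to the calling thread's handler, if it has one."""
    handler = getattr(_local, "handler", None)
    if handler is not None:
        handler(message)


class StoppableJob:
    """Hypothetical job that runs in a thread and stops cooperatively."""

    def __init__(self, name):
        self.name = name
        self.messages = []
        self._stop_requested = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def start(self):
        self._thread.start()

    def stop(self, timeout=5.0):
        # A thread cannot be signalled like a process, so we set a flag
        # and wait (with a timeout) for the thread to notice it.
        self._stop_requested.set()
        self._thread.join(timeout)
        return not self._thread.is_alive()

    def _run(self):
        set_handler(self.messages.append)
        emit(f"{self.name}: started")
        # Event.wait() returns False on timeout and True once the event
        # is set, so the loop wakes up regularly instead of blocking.
        while not self._stop_requested.wait(timeout=0.01):
            pass  # one unit of work would go here
        emit(f"{self.name}: stopped")
```

Because the handler lives in thread-local storage, two jobs running at the same time in one process each collect only their own messages.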
Diffstat (limited to 'tests')
-rw-r--r-- | tests/artifactcache/pull.py | 1 |
-rw-r--r-- | tests/internals/cascache.py | 9 |
2 files changed, 10 insertions, 0 deletions
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index e6eaec960..63e6d9814 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -162,6 +162,7 @@ def test_pull_tree(cli, tmpdir, datafiles):
 
     # Assert that we are not cached locally anymore
     artifactcache.close_grpc_channels()
+    cas._casd_channel.request_shutdown()
     cas.close_grpc_channels()
     assert cli.get_element_state(project_dir, "target.bst") != "cached"
 
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 043531c24..e27e40974 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -3,6 +3,7 @@ import time
 from unittest.mock import MagicMock
 
 from buildstream._cas.cascache import CASCache
+from buildstream._cas import casdprocessmanager
 from buildstream._message import MessageType
 from buildstream._messenger import Messenger
 
@@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
     dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone")
     dummy_buildbox_casd.chmod(0o777)
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack, we should instead have a socket be created nicely
+    #        on the fake casd script. This whole test suite probably would
+    #        need some cleanup
+    monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
     cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
@@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
     dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone")
     dummy_buildbox_casd.chmod(0o777)
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack, we should instead have a socket be created nicely
+    #        on the fake casd script. This whole test suite probably would
+    #        need some cleanup
+    monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
     cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
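The test changes above shorten a module-level timeout so that a fake daemon which never creates its socket fails quickly. The sketch below shows the same idea in isolation. It uses the standard library's `unittest.mock.patch.object` rather than pytest's `monkeypatch` fixture so that it runs without pytest, and `manager`, `_TIMEOUT`, `wait_for_socket` and `fails_fast` are hypothetical names, not the real `casdprocessmanager` internals.

```python
import os
import time
import types
from unittest import mock

# Hypothetical stand-in for a module exposing a module-level timeout constant.
manager = types.SimpleNamespace(_TIMEOUT=300.0)


def wait_for_socket(path, poll_interval=0.01):
    """Poll until `path` exists; give up after manager._TIMEOUT seconds."""
    # The constant is read at call time, so patching it affects this call.
    deadline = time.monotonic() + manager._TIMEOUT
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{path} did not appear")
        time.sleep(poll_interval)


def fails_fast(path):
    """Return how long the wait took to time out with a shortened timeout."""
    with mock.patch.object(manager, "_TIMEOUT", 0.1):
        start = time.monotonic()
        try:
            wait_for_socket(path)
        except TimeoutError:
            return time.monotonic() - start
    return None  # the path existed, so nothing timed out
```

The patch is undone when the `with` block exits, much as `monkeypatch.setattr` is undone at the end of a test, so the original timeout is back in place for later tests.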