diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-03 10:35:00 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-03 10:35:00 +0000 |
commit | e485a0643ff5ec96750f778eb0d373efa2fe69c5 (patch) | |
tree | b472768f0bc8fa9a678ea77c1f74dfe3ff3960ea | |
parent | 7dbcf42dfb0f7d612304c491c844355c58682e93 (diff) | |
parent | 3221f3511cbfe6a748b542627801acf8d0fcb379 (diff) | |
download | buildstream-e485a0643ff5ec96750f778eb0d373efa2fe69c5.tar.gz |
Merge branch 'juerg/fork' into 'master'
Replace safeguard for fork with multiple threads
See merge request BuildStream/buildstream!1577
-rw-r--r-- | requirements/requirements.txt | 2 | ||||
-rw-r--r-- | src/buildstream/_basecache.py | 22 | ||||
-rw-r--r-- | src/buildstream/_cas/cascache.py | 19 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 12 | ||||
-rw-r--r-- | src/buildstream/_cas/casserver.py | 4 | ||||
-rw-r--r-- | src/buildstream/_context.py | 29 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 2 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 5 | ||||
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 146 | ||||
-rw-r--r-- | src/buildstream/testing/_fixtures.py | 31 | ||||
-rw-r--r-- | src/buildstream/testing/_forked.py | 6 | ||||
-rw-r--r-- | src/buildstream/testing/_sourcetests/conftest.py | 17 | ||||
-rw-r--r-- | src/buildstream/utils.py | 20 | ||||
-rw-r--r-- | tests/artifactcache/artifactservice.py | 100 | ||||
-rwxr-xr-x | tests/conftest.py | 1 | ||||
-rw-r--r-- | tests/testutils/http_server.py | 2 |
16 files changed, 268 insertions, 150 deletions
diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 0f1f5599f..370316a68 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,5 +1,5 @@ Click==7.0 -grpcio==1.17.1 +grpcio==1.23.0 Jinja2==2.10 pluginbase==0.7 protobuf==3.6.1 diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 46de29f7b..9ad6c1277 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -59,6 +59,28 @@ class BaseCache(): self._has_fetch_remotes = False self._has_push_remotes = False + # has_open_grpc_channels(): + # + # Return whether there are gRPC channel instances. This is used to safeguard + # against fork() with open gRPC channels. + # + def has_open_grpc_channels(self): + for project_remotes in self._remotes.values(): + for remote in project_remotes: + if remote.channel: + return True + return False + + # release_resources(): + # + # Release resources used by BaseCache. + # + def release_resources(self): + # Close all remotes and their gRPC channels + for project_remotes in self._remotes.values(): + for remote in project_remotes: + remote.close() + # specs_from_config_node() # # Parses the configuration of remote artifact caches from a config block. diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 03d7e5dda..21c9f0ad6 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -86,7 +86,6 @@ class CASCache(): self._casd_channel = None self._local_cas = None - self._fork_disabled = False def __getstate__(self): state = self.__dict__.copy() @@ -102,9 +101,6 @@ class CASCache(): assert self._casd_process, "CASCache was instantiated without buildbox-casd" if not self._local_cas: - # gRPC doesn't support fork without exec, which is used in the main process. - assert self._fork_disabled or not utils._is_main_process() - self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path) self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) @@ -136,13 +132,13 @@ class CASCache(): if not (os.path.isdir(headdir) and os.path.isdir(objdir)): raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir)) - # notify_fork_disabled(): + # has_open_grpc_channels(): # - # Called by Context when fork() is disabled. This will enable communication - # with casd via gRPC in the main process. + # Return whether there are gRPC channel instances. This is used to safeguard + # against fork() with open gRPC channels. # - def notify_fork_disabled(self): - self._fork_disabled = True + def has_open_grpc_channels(self): + return bool(self._casd_channel) # release_resources(): # @@ -150,6 +146,11 @@ class CASCache(): # def release_resources(self, messenger=None): if self._casd_process: + if self._casd_channel: + self._local_cas = None + self._casd_channel.close() + self._casd_channel = None + self._casd_process.terminate() try: # Don't print anything if buildbox-casd terminates quickly diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 9cc1a5488..05b57d3d4 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -183,6 +183,18 @@ class CASRemote(): self._initialized = True + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False + + def close(self): + if self.channel: + self.channel.close() + self.channel = None + # check_remote # # Used when checking whether remote_specs work in the buildstream main diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 1835d118e..5a4c2b7ac 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -60,10 +60,6 @@ def create_server(repo, *, enable_push, quota): cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) try: - # Allow gRPC communication in main process as bst-artifact-server - # doesn't use forked subprocesses. - cas.notify_fork_disabled() - artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 0d250eb56..ecdf6f97b 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -150,8 +150,6 @@ class Context(): # Whether file contents are required for all artifacts in the local cache self.require_artifact_files = True - self.fork_allowed = True - # Whether elements must be rebuilt when their dependencies have changed self._strict_build_plan = None @@ -182,6 +180,12 @@ class Context(): # Called when exiting the with-statement context. # def __exit__(self, exc_type, exc_value, traceback): + if self._artifactcache: + self._artifactcache.release_resources() + + if self._sourcecache: + self._sourcecache.release_resources() + if self._cascache: self._cascache.release_resources(self.messenger) @@ -500,12 +504,19 @@ class Context(): self._cascache = CASCache(self.cachedir, cache_quota=self.config_cache_quota) return self._cascache - # disable_fork(): + # is_fork_allowed(): # - # This will prevent the scheduler from running but will allow communication - # with casd in the main process. + # Return whether fork without exec is allowed. This is a safeguard against + # fork issues with multiple threads and gRPC connections. # - def disable_fork(self): - self.fork_allowed = False - cascache = self.get_cascache() - cascache.notify_fork_disabled() + def is_fork_allowed(self): + # Do not allow fork if there are background threads. + if not utils._is_single_threaded(): + return False + + # Do not allow fork if there are open gRPC channels. + for cache in [self._cascache, self._artifactcache, self._sourcecache]: + if cache and cache.has_open_grpc_channels(): + return False + + return True diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index b29bc8841..37295b285 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -110,7 +110,7 @@ class Scheduler(): # def run(self, queues): - assert self.context.fork_allowed + assert self.context.is_fork_allowed() # Hold on to the queues to process self.queues = queues diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 9ff93fdc3..f0e891dcf 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -223,8 +223,6 @@ class Stream(): else: buildtree = True - self._context.disable_fork() - return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree) @@ -551,8 +549,6 @@ class Stream(): self._enqueue_plan(uncached_elts) self._run() - self._context.disable_fork() - # Stage deps into a temporary sandbox first if isinstance(target, ArtifactElement): try: @@ -623,7 +619,6 @@ class Stream(): load_refs=True) if self._artifacts.has_fetch_remotes(): - self._context.disable_fork() self._pipeline.check_remotes(target_objects) # XXX: We need to set the name of an ArtifactElement to its ref in order diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 4308d662b..e9e145d1f 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -281,10 +281,11 @@ class SandboxRemote(Sandbox): context = self._get_context() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec, cascache) - # Now do a pull to ensure we have the full directory structure. - dir_digest = cascache.pull_tree(casremote, tree_digest) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + # Now do a pull to ensure we have the full directory structure. + dir_digest = cascache.pull_tree(casremote, tree_digest) + if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes: raise SandboxError("Output directory structure pulling from remote failed.") @@ -300,7 +301,6 @@ class SandboxRemote(Sandbox): project = self._get_project() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec, cascache) # Fetch the file blobs if needed if self._output_files_required or artifactcache.has_push_remotes(): @@ -319,7 +319,9 @@ class SandboxRemote(Sandbox): # artifact servers. blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs) - remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + if remote_missing_blobs: raise SandboxError("{} output files are missing on the CAS server" .format(len(remote_missing_blobs))) @@ -351,65 +353,66 @@ class SandboxRemote(Sandbox): input_root_digest=input_root_digest) action_digest = utils._message_digest(action.SerializeToString()) - # Next, try to create a communication channel to the BuildGrid server. - url = urlparse(self.exec_url) - if not url.port: - raise SandboxError("You must supply a protocol and port number in the execution-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - elif url.scheme == 'https': - channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) - else: - raise SandboxError("Remote execution currently only supports the 'http' protocol " - "and '{}' was supplied.".format(url.scheme)) - # check action cache download and download if there action_result = self._check_action_cache(action_digest) if not action_result: - casremote = CASRemote(self.storage_remote_spec, cascache) - try: - casremote.init() - except grpc.RpcError as e: - raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" - .format(self.storage_url, e)) from e - - # Determine blobs missing on remote - try: - missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) - except grpc.RpcError as e: - raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - - # Check if any blobs are also missing locally (partial artifact) - # and pull them from the artifact cache. - try: - local_missing_blobs = cascache.local_missing_blobs(missing_blobs) - if local_missing_blobs: - artifactcache.fetch_missing_blobs(project, local_missing_blobs) - except (grpc.RpcError, BstError) as e: - raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e - - # Now, push the missing blobs to the remote. - try: - cascache.send_blobs(casremote, missing_blobs) - except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e - - # Push command and action - try: - casremote.push_message(command_proto) - except grpc.RpcError as e: - raise SandboxError("Failed to push command to remote: {}".format(e)) - - try: - casremote.push_message(action) - except grpc.RpcError as e: - raise SandboxError("Failed to push action to remote: {}".format(e)) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + try: + casremote.init() + except grpc.RpcError as e: + raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" + .format(self.storage_url, e)) from e + + # Determine blobs missing on remote + try: + missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) + except grpc.RpcError as e: + raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e + + # Check if any blobs are also missing locally (partial artifact) + # and pull them from the artifact cache. + try: + local_missing_blobs = cascache.local_missing_blobs(missing_blobs) + if local_missing_blobs: + artifactcache.fetch_missing_blobs(project, local_missing_blobs) + except (grpc.RpcError, BstError) as e: + raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e + + # Now, push the missing blobs to the remote. + try: + cascache.send_blobs(casremote, missing_blobs) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + + # Push command and action + try: + casremote.push_message(command_proto) + except grpc.RpcError as e: + raise SandboxError("Failed to push command to remote: {}".format(e)) + + try: + casremote.push_message(action) + except grpc.RpcError as e: + raise SandboxError("Failed to push action to remote: {}".format(e)) + + # Next, try to create a communication channel to the BuildGrid server. + url = urlparse(self.exec_url) + if not url.port: + raise SandboxError("You must supply a protocol and port number in the execution-service url, " + "for example: http://buildservice:50051.") + if url.scheme == 'http': + channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) + elif url.scheme == 'https': + channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) + else: + raise SandboxError("Remote execution currently only supports the 'http' protocol " + "and '{}' was supplied.".format(url.scheme)) # Now request to execute the action - operation = self.run_remote_command(channel, action_digest) - action_result = self._extract_action_result(operation) + with channel: + operation = self.run_remote_command(channel, action_digest) + action_result = self._extract_action_result(operation) # Get output of build self.process_job_output(action_result.output_directories, action_result.output_files, @@ -445,20 +448,21 @@ class SandboxRemote(Sandbox): elif url.scheme == 'https': channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials) - request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, - action_digest=action_digest) - stub = remote_execution_pb2_grpc.ActionCacheStub(channel) - try: - result = stub.GetActionResult(request) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError("Failed to query action cache: {} ({})" - .format(e.code(), e.details())) + with channel: + request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, + action_digest=action_digest) + stub = remote_execution_pb2_grpc.ActionCacheStub(channel) + try: + result = stub.GetActionResult(request) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})" + .format(e.code(), e.details())) + else: + return None else: - return None - else: - self.info("Action result found in action cache") - return result + self.info("Action result found in action cache") + return result def _create_command(self, command, working_directory, environment): # Creates a command proto diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py new file mode 100644 index 000000000..862cebe87 --- /dev/null +++ b/src/buildstream/testing/_fixtures.py @@ -0,0 +1,31 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import psutil +import pytest + +from buildstream import utils + +# Catch tests that don't shut down background threads, which could then lead +# to other tests hanging when BuildStream uses fork(). +@pytest.fixture(autouse=True) +def thread_check(): + # xdist/execnet has its own helper thread. + # Ignore that for `utils._is_single_threaded` checks. + utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads() + + yield + assert utils._is_single_threaded() diff --git a/src/buildstream/testing/_forked.py b/src/buildstream/testing/_forked.py index af5e9c070..164906b0c 100644 --- a/src/buildstream/testing/_forked.py +++ b/src/buildstream/testing/_forked.py @@ -28,6 +28,8 @@ import pytest # XXX Using pytest private internals here from _pytest import runner +from buildstream import utils + EXITSTATUS_TESTEXIT = 4 @@ -48,6 +50,10 @@ def serialize_report(rep): def forked_run_report(item): def runforked(): + # This process is now the main BuildStream process + # for the duration of this test. + utils._MAIN_PID = os.getpid() + try: reports = runner.runtestprotocol(item, log=False) except KeyboardInterrupt: diff --git a/src/buildstream/testing/_sourcetests/conftest.py b/src/buildstream/testing/_sourcetests/conftest.py new file mode 100644 index 000000000..f16c1e9ad --- /dev/null +++ b/src/buildstream/testing/_sourcetests/conftest.py @@ -0,0 +1,17 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from .._fixtures import thread_check # pylint: disable=unused-import diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 998b77a71..872b5bd59 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -57,6 +57,10 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"] # Main process pid _MAIN_PID = os.getpid() +# The number of threads in the main process at startup. +# This is 1 except for certain test environments (xdist/execnet). +_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1 + class UtilError(BstError): """Raised by utility functions when system calls fail. @@ -1391,3 +1395,19 @@ def _get_compression(tar): else: # Assume just an unconventional name was provided, default to uncompressed return '' + + +# _is_single_threaded() +# +# Return whether the current Process is single-threaded. Don't count threads +# in the main process that were created by a test environment (xdist/execnet) +# before BuildStream was executed. +# +def _is_single_threaded(): + # Use psutil as threading.active_count() doesn't include gRPC threads. + process = psutil.Process() + num_threads = process.num_threads() + if process.pid == _MAIN_PID: + return num_threads == _INITIAL_NUM_THREADS_IN_MAIN_PROCESS + else: + return num_threads == 1 diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py index b3bc7c218..00d14b45d 100644 --- a/tests/artifactcache/artifactservice.py +++ b/tests/artifactcache/artifactservice.py @@ -38,19 +38,19 @@ def test_artifact_get_not_found(tmpdir): with create_artifact_share(sharedir) as share: # set up artifact service stub url = urlparse(share.repo) - channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) - artifact_stub = ArtifactServiceStub(channel) + with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel: + artifact_stub = ArtifactServiceStub(channel) - # Run GetArtifact and check it throws a not found error - request = GetArtifactRequest() - request.cache_key = "@artifact/something/not_there" - try: - artifact_stub.GetArtifact(request) - except grpc.RpcError as e: - assert e.code() == grpc.StatusCode.NOT_FOUND - assert e.details() == "Artifact proto not found" - else: - assert False + # Run GetArtifact and check it throws a not found error + request = GetArtifactRequest() + request.cache_key = "@artifact/something/not_there" + try: + artifact_stub.GetArtifact(request) + except grpc.RpcError as e: + assert e.code() == grpc.StatusCode.NOT_FOUND + assert e.details() == "Artifact proto not found" + else: + assert False # Successfully getting the artifact @@ -70,42 +70,42 @@ def test_update_artifact(tmpdir, files): url = urlparse(share.repo) - channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) - artifact_stub = ArtifactServiceStub(channel) - - # initialise an artifact - artifact = Artifact() - artifact.version = 0 - artifact.build_success = True - artifact.strong_key = "abcdefghijklmnop" - artifact.files.hash = "hashymchashash" - artifact.files.size_bytes = 10 - - artifact.files.CopyFrom(digest) - - # Put it in the artifact share with an UpdateArtifactRequest - request = UpdateArtifactRequest() - request.artifact.CopyFrom(artifact) - request.cache_key = "a-cache-key" - - # should return the same artifact back - if files == "present": - response = artifact_stub.UpdateArtifact(request) + with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel: + artifact_stub = ArtifactServiceStub(channel) + + # initialise an artifact + artifact = Artifact() + artifact.version = 0 + artifact.build_success = True + artifact.strong_key = "abcdefghijklmnop" + artifact.files.hash = "hashymchashash" + artifact.files.size_bytes = 10 + + artifact.files.CopyFrom(digest) + + # Put it in the artifact share with an UpdateArtifactRequest + request = UpdateArtifactRequest() + request.artifact.CopyFrom(artifact) + request.cache_key = "a-cache-key" + + # should return the same artifact back + if files == "present": + response = artifact_stub.UpdateArtifact(request) + assert response == artifact + else: + try: + artifact_stub.UpdateArtifact(request) + except grpc.RpcError as e: + assert e.code() == grpc.StatusCode.FAILED_PRECONDITION + if files == "absent": + assert e.details() == "Artifact files specified but no files found" + elif files == "invalid": + assert e.details() == "Artifact files specified but directory not found" + return + + # If we uploaded the artifact check GetArtifact + request = GetArtifactRequest() + request.cache_key = "a-cache-key" + + response = artifact_stub.GetArtifact(request) assert response == artifact - else: - try: - artifact_stub.UpdateArtifact(request) - except grpc.RpcError as e: - assert e.code() == grpc.StatusCode.FAILED_PRECONDITION - if files == "absent": - assert e.details() == "Artifact files specified but no files found" - elif files == "invalid": - assert e.details() == "Artifact files specified but directory not found" - return - - # If we uploaded the artifact check GetArtifact - request = GetArtifactRequest() - request.cache_key = "a-cache-key" - - response = artifact_stub.GetArtifact(request) - assert response == artifact diff --git a/tests/conftest.py b/tests/conftest.py index 7728fb5c8..0e39943d3 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,7 @@ import os import pytest from buildstream.testing import register_repo_kind, sourcetests_collection_hook +from buildstream.testing._fixtures import thread_check # pylint: disable=unused-import from buildstream.testing._forked import forked_run_report from buildstream.testing.integration import integration_cache # pylint: disable=unused-import diff --git a/tests/testutils/http_server.py b/tests/testutils/http_server.py index 049769e72..6ecb7b5b3 100644 --- a/tests/testutils/http_server.py +++ b/tests/testutils/http_server.py @@ -106,6 +106,8 @@ class SimpleHttpServer(multiprocessing.Process): self.__stop.put(None) self.terminate() self.join() + self.__stop.close() + self.__stop.join_thread() def allow_anonymous(self, cwd): self.server.anonymous_dir = cwd |