author     bst-marge-bot <marge-bot@buildstream.build>   2019-09-03 10:35:00 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>   2019-09-03 10:35:00 +0000
commit     e485a0643ff5ec96750f778eb0d373efa2fe69c5 (patch)
tree       b472768f0bc8fa9a678ea77c1f74dfe3ff3960ea
parent     7dbcf42dfb0f7d612304c491c844355c58682e93 (diff)
parent     3221f3511cbfe6a748b542627801acf8d0fcb379 (diff)
download   buildstream-e485a0643ff5ec96750f778eb0d373efa2fe69c5.tar.gz
Merge branch 'juerg/fork' into 'master'
Replace safeguard for fork with multiple threads

See merge request BuildStream/buildstream!1577
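The hazard being guarded against: POSIX fork() duplicates only the calling thread, so a child forked while gRPC worker threads hold internal locks can deadlock. Instead of a flag that disables fork ahead of time, this merge checks the actual conditions at the moment the scheduler wants to fork: the process must be single-threaded and no gRPC channel may be open. A minimal standalone sketch of that style of guard (illustration only, not BuildStream code):

    import os
    import psutil

    def guarded_fork():
        # A forked child inherits only the calling thread; any lock held
        # by another thread at fork time stays locked forever in the child.
        if psutil.Process().num_threads() != 1:
            raise RuntimeError("refusing to fork() with background threads")
        return os.fork()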
-rw-r--r--  requirements/requirements.txt                    |   2
-rw-r--r--  src/buildstream/_basecache.py                    |  22
-rw-r--r--  src/buildstream/_cas/cascache.py                 |  19
-rw-r--r--  src/buildstream/_cas/casremote.py                |  12
-rw-r--r--  src/buildstream/_cas/casserver.py                |   4
-rw-r--r--  src/buildstream/_context.py                      |  29
-rw-r--r--  src/buildstream/_scheduler/scheduler.py          |   2
-rw-r--r--  src/buildstream/_stream.py                       |   5
-rw-r--r--  src/buildstream/sandbox/_sandboxremote.py        | 146
-rw-r--r--  src/buildstream/testing/_fixtures.py             |  31
-rw-r--r--  src/buildstream/testing/_forked.py               |   6
-rw-r--r--  src/buildstream/testing/_sourcetests/conftest.py |  17
-rw-r--r--  src/buildstream/utils.py                         |  20
-rw-r--r--  tests/artifactcache/artifactservice.py           | 100
-rwxr-xr-x  tests/conftest.py                                |   1
-rw-r--r--  tests/testutils/http_server.py                   |   2
16 files changed, 268 insertions(+), 150 deletions(-)
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index 0f1f5599f..370316a68 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -1,5 +1,5 @@
Click==7.0
-grpcio==1.17.1
+grpcio==1.23.0
Jinja2==2.10
pluginbase==0.7
protobuf==3.6.1
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 46de29f7b..9ad6c1277 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -59,6 +59,28 @@ class BaseCache():
self._has_fetch_remotes = False
self._has_push_remotes = False
+ # has_open_grpc_channels():
+ #
+ # Return whether there are gRPC channel instances. This is used to safeguard
+ # against fork() with open gRPC channels.
+ #
+ def has_open_grpc_channels(self):
+ for project_remotes in self._remotes.values():
+ for remote in project_remotes:
+ if remote.channel:
+ return True
+ return False
+
+ # release_resources():
+ #
+ # Release resources used by BaseCache.
+ #
+ def release_resources(self):
+ # Close all remotes and their gRPC channels
+ for project_remotes in self._remotes.values():
+ for remote in project_remotes:
+ remote.close()
+
# specs_from_config_node()
#
# Parses the configuration of remote artifact caches from a config block.
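A hedged sketch of how these two BaseCache helpers are driven (the caller here is hypothetical; the real call sites are Context.is_fork_allowed() and Context.__exit__ further down):

    # Before forking: any live channel makes fork() unsafe.
    if cache.has_open_grpc_channels():
        raise RuntimeError("cannot fork with open gRPC channels")

    # On teardown: close every remote's channel.
    cache.release_resources()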
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 03d7e5dda..21c9f0ad6 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -86,7 +86,6 @@ class CASCache():
self._casd_channel = None
self._local_cas = None
- self._fork_disabled = False
def __getstate__(self):
state = self.__dict__.copy()
@@ -102,9 +101,6 @@ class CASCache():
assert self._casd_process, "CASCache was instantiated without buildbox-casd"
if not self._local_cas:
- # gRPC doesn't support fork without exec, which is used in the main process.
- assert self._fork_disabled or not utils._is_main_process()
-
self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
@@ -136,13 +132,13 @@ class CASCache():
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
- # notify_fork_disabled():
+ # has_open_grpc_channels():
#
- # Called by Context when fork() is disabled. This will enable communication
- # with casd via gRPC in the main process.
+ # Return whether there are gRPC channel instances. This is used to safeguard
+ # against fork() with open gRPC channels.
#
- def notify_fork_disabled(self):
- self._fork_disabled = True
+ def has_open_grpc_channels(self):
+ return bool(self._casd_channel)
# release_resources():
#
@@ -150,6 +146,11 @@ class CASCache():
#
def release_resources(self, messenger=None):
if self._casd_process:
+ if self._casd_channel:
+ self._local_cas = None
+ self._casd_channel.close()
+ self._casd_channel = None
+
self._casd_process.terminate()
try:
# Don't print anything if buildbox-casd terminates quickly
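The ordering in release_resources() matters: the stub and channel are dropped before buildbox-casd is terminated, so no RPC is left in flight against a dying daemon. A condensed sketch of the same sequence, assuming a gRPC channel and a subprocess handle:

    def shutdown(channel, casd_process):
        # Tear down gRPC first so nothing races the terminating daemon...
        if channel is not None:
            channel.close()
        # ...then stop buildbox-casd and reap it.
        casd_process.terminate()
        casd_process.wait()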
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 9cc1a5488..05b57d3d4 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -183,6 +183,18 @@ class CASRemote():
self._initialized = True
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+ return False
+
+ def close(self):
+ if self.channel:
+ self.channel.close()
+ self.channel = None
+
# check_remote
#
# Used when checking whether remote_specs work in the buildstream main
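With __enter__/__exit__ defined, a CASRemote scopes its channel to a with-block, which is how the sandbox code below uses it. Usage sketch (spec and cascache stand in for a real remote spec and CASCache instance):

    with CASRemote(spec, cascache) as remote:
        remote.init()  # opens the channel
        # ... gRPC calls through the remote ...
    # close() has run here, even if an exception escaped the block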
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 1835d118e..5a4c2b7ac 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -60,10 +60,6 @@ def create_server(repo, *, enable_push, quota):
cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
try:
- # Allow gRPC communication in main process as bst-artifact-server
- # doesn't use forked subprocesses.
- cas.notify_fork_disabled()
-
artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 0d250eb56..ecdf6f97b 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -150,8 +150,6 @@ class Context():
# Whether file contents are required for all artifacts in the local cache
self.require_artifact_files = True
- self.fork_allowed = True
-
# Whether elements must be rebuilt when their dependencies have changed
self._strict_build_plan = None
@@ -182,6 +180,12 @@ class Context():
# Called when exiting the with-statement context.
#
def __exit__(self, exc_type, exc_value, traceback):
+ if self._artifactcache:
+ self._artifactcache.release_resources()
+
+ if self._sourcecache:
+ self._sourcecache.release_resources()
+
if self._cascache:
self._cascache.release_resources(self.messenger)
@@ -500,12 +504,19 @@ class Context():
self._cascache = CASCache(self.cachedir, cache_quota=self.config_cache_quota)
return self._cascache
- # disable_fork():
+ # is_fork_allowed():
#
- # This will prevent the scheduler from running but will allow communication
- # with casd in the main process.
+ # Return whether fork without exec is allowed. This is a safeguard against
+ # fork issues with multiple threads and gRPC connections.
#
- def disable_fork(self):
- self.fork_allowed = False
- cascache = self.get_cascache()
- cascache.notify_fork_disabled()
+ def is_fork_allowed(self):
+ # Do not allow fork if there are background threads.
+ if not utils._is_single_threaded():
+ return False
+
+ # Do not allow fork if there are open gRPC channels.
+ for cache in [self._cascache, self._artifactcache, self._sourcecache]:
+ if cache and cache.has_open_grpc_channels():
+ return False
+
+ return True
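The design shift, in miniature (context stands in for a live Context instance): callers no longer pre-declare that fork will not happen; the scheduler asks whether it is currently safe.

    # Old model, removed by this merge: commit up front.
    context.disable_fork()

    # New model: check the live conditions at the point of fork.
    assert context.is_fork_allowed()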
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index b29bc8841..37295b285 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -110,7 +110,7 @@ class Scheduler():
#
def run(self, queues):
- assert self.context.fork_allowed
+ assert self.context.is_fork_allowed()
# Hold on to the queues to process
self.queues = queues
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 9ff93fdc3..f0e891dcf 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -223,8 +223,6 @@ class Stream():
else:
buildtree = True
- self._context.disable_fork()
-
return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
usebuildtree=buildtree)
@@ -551,8 +549,6 @@ class Stream():
self._enqueue_plan(uncached_elts)
self._run()
- self._context.disable_fork()
-
# Stage deps into a temporary sandbox first
if isinstance(target, ArtifactElement):
try:
@@ -623,7 +619,6 @@ class Stream():
load_refs=True)
if self._artifacts.has_fetch_remotes():
- self._context.disable_fork()
self._pipeline.check_remotes(target_objects)
# XXX: We need to set the name of an ArtifactElement to its ref in order
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 4308d662b..e9e145d1f 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -281,10 +281,11 @@ class SandboxRemote(Sandbox):
context = self._get_context()
cascache = context.get_cascache()
artifactcache = context.artifactcache
- casremote = CASRemote(self.storage_remote_spec, cascache)
- # Now do a pull to ensure we have the full directory structure.
- dir_digest = cascache.pull_tree(casremote, tree_digest)
+ with CASRemote(self.storage_remote_spec, cascache) as casremote:
+ # Now do a pull to ensure we have the full directory structure.
+ dir_digest = cascache.pull_tree(casremote, tree_digest)
+
if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
raise SandboxError("Output directory structure pulling from remote failed.")
@@ -300,7 +301,6 @@ class SandboxRemote(Sandbox):
project = self._get_project()
cascache = context.get_cascache()
artifactcache = context.artifactcache
- casremote = CASRemote(self.storage_remote_spec, cascache)
# Fetch the file blobs if needed
if self._output_files_required or artifactcache.has_push_remotes():
@@ -319,7 +319,9 @@ class SandboxRemote(Sandbox):
# artifact servers.
blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
- remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
+ with CASRemote(self.storage_remote_spec, cascache) as casremote:
+ remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
+
if remote_missing_blobs:
raise SandboxError("{} output files are missing on the CAS server"
.format(len(remote_missing_blobs)))
@@ -351,65 +353,66 @@ class SandboxRemote(Sandbox):
input_root_digest=input_root_digest)
action_digest = utils._message_digest(action.SerializeToString())
- # Next, try to create a communication channel to the BuildGrid server.
- url = urlparse(self.exec_url)
- if not url.port:
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
- "for example: http://buildservice:50051.")
- if url.scheme == 'http':
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
- elif url.scheme == 'https':
- channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials)
- else:
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
- "and '{}' was supplied.".format(url.scheme))
-
# check action cache download and download if there
action_result = self._check_action_cache(action_digest)
if not action_result:
- casremote = CASRemote(self.storage_remote_spec, cascache)
- try:
- casremote.init()
- except grpc.RpcError as e:
- raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
- .format(self.storage_url, e)) from e
-
- # Determine blobs missing on remote
- try:
- missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
- except grpc.RpcError as e:
- raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
-
- # Check if any blobs are also missing locally (partial artifact)
- # and pull them from the artifact cache.
- try:
- local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
- if local_missing_blobs:
- artifactcache.fetch_missing_blobs(project, local_missing_blobs)
- except (grpc.RpcError, BstError) as e:
- raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
-
- # Now, push the missing blobs to the remote.
- try:
- cascache.send_blobs(casremote, missing_blobs)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
-
- # Push command and action
- try:
- casremote.push_message(command_proto)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push command to remote: {}".format(e))
-
- try:
- casremote.push_message(action)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push action to remote: {}".format(e))
+ with CASRemote(self.storage_remote_spec, cascache) as casremote:
+ try:
+ casremote.init()
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
+ .format(self.storage_url, e)) from e
+
+ # Determine blobs missing on remote
+ try:
+ missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
+
+ # Check if any blobs are also missing locally (partial artifact)
+ # and pull them from the artifact cache.
+ try:
+ local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+ if local_missing_blobs:
+ artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+ except (grpc.RpcError, BstError) as e:
+ raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+
+ # Now, push the missing blobs to the remote.
+ try:
+ cascache.send_blobs(casremote, missing_blobs)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+
+ # Push command and action
+ try:
+ casremote.push_message(command_proto)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push command to remote: {}".format(e))
+
+ try:
+ casremote.push_message(action)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push action to remote: {}".format(e))
+
+ # Next, try to create a communication channel to the BuildGrid server.
+ url = urlparse(self.exec_url)
+ if not url.port:
+ raise SandboxError("You must supply a protocol and port number in the execution-service url, "
+ "for example: http://buildservice:50051.")
+ if url.scheme == 'http':
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+ elif url.scheme == 'https':
+ channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials)
+ else:
+ raise SandboxError("Remote execution currently only supports the 'http' protocol "
+ "and '{}' was supplied.".format(url.scheme))
# Now request to execute the action
- operation = self.run_remote_command(channel, action_digest)
- action_result = self._extract_action_result(operation)
+ with channel:
+ operation = self.run_remote_command(channel, action_digest)
+ action_result = self._extract_action_result(operation)
# Get output of build
self.process_job_output(action_result.output_directories, action_result.output_files,
@@ -445,20 +448,21 @@ class SandboxRemote(Sandbox):
elif url.scheme == 'https':
channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials)
- request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance,
- action_digest=action_digest)
- stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
- try:
- result = stub.GetActionResult(request)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise SandboxError("Failed to query action cache: {} ({})"
- .format(e.code(), e.details()))
+ with channel:
+ request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance,
+ action_digest=action_digest)
+ stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
+ try:
+ result = stub.GetActionResult(request)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SandboxError("Failed to query action cache: {} ({})"
+ .format(e.code(), e.details()))
+ else:
+ return None
else:
- return None
- else:
- self.info("Action result found in action cache")
- return result
+ self.info("Action result found in action cache")
+ return result
def _create_command(self, command, working_directory, environment):
# Creates a command proto
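Both reworked hunks lean on grpc.Channel implementing the context-manager protocol, so `with channel:` closes the channel on exit. Minimal sketch (the address is a placeholder):

    import grpc

    with grpc.insecure_channel('localhost:50051') as channel:
        # stubs built on the channel are valid inside the block
        pass
    # channel.close() has been called here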
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
new file mode 100644
index 000000000..862cebe87
--- /dev/null
+++ b/src/buildstream/testing/_fixtures.py
@@ -0,0 +1,31 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import psutil
+import pytest
+
+from buildstream import utils
+
+# Catch tests that don't shut down background threads, which could then lead
+# to other tests hanging when BuildStream uses fork().
+@pytest.fixture(autouse=True)
+def thread_check():
+ # xdist/execnet has its own helper thread.
+ # Ignore that for `utils._is_single_threaded` checks.
+ utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads()
+
+ yield
+ assert utils._is_single_threaded()
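Because the fixture is autouse, a test that leaks a background thread now fails loudly in teardown instead of hanging a later fork(). A hypothetical offender the assert would catch:

    import threading
    import time

    def test_leaks_a_thread():
        # Never joined: thread_check's teardown assert fires.
        threading.Thread(target=time.sleep, args=(60,), daemon=True).start()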
diff --git a/src/buildstream/testing/_forked.py b/src/buildstream/testing/_forked.py
index af5e9c070..164906b0c 100644
--- a/src/buildstream/testing/_forked.py
+++ b/src/buildstream/testing/_forked.py
@@ -28,6 +28,8 @@ import pytest
# XXX Using pytest private internals here
from _pytest import runner
+from buildstream import utils
+
EXITSTATUS_TESTEXIT = 4
@@ -48,6 +50,10 @@ def serialize_report(rep):
def forked_run_report(item):
def runforked():
+ # This process is now the main BuildStream process
+ # for the duration of this test.
+ utils._MAIN_PID = os.getpid()
+
try:
reports = runner.runtestprotocol(item, log=False)
except KeyboardInterrupt:
diff --git a/src/buildstream/testing/_sourcetests/conftest.py b/src/buildstream/testing/_sourcetests/conftest.py
new file mode 100644
index 000000000..f16c1e9ad
--- /dev/null
+++ b/src/buildstream/testing/_sourcetests/conftest.py
@@ -0,0 +1,17 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from .._fixtures import thread_check # pylint: disable=unused-import
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 998b77a71..872b5bd59 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -57,6 +57,10 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
# Main process pid
_MAIN_PID = os.getpid()
+# The number of threads in the main process at startup.
+# This is 1 except for certain test environments (xdist/execnet).
+_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1
+
class UtilError(BstError):
"""Raised by utility functions when system calls fail.
@@ -1391,3 +1395,19 @@ def _get_compression(tar):
else:
# Assume just an unconventional name was provided, default to uncompressed
return ''
+
+
+# _is_single_threaded()
+#
+# Return whether the current Process is single-threaded. Don't count threads
+# in the main process that were created by a test environment (xdist/execnet)
+# before BuildStream was executed.
+#
+def _is_single_threaded():
+ # Use psutil as threading.active_count() doesn't include gRPC threads.
+ process = psutil.Process()
+ num_threads = process.num_threads()
+ if process.pid == _MAIN_PID:
+ return num_threads == _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
+ else:
+ return num_threads == 1
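psutil counts OS-level threads, which is the point: gRPC's worker threads are created in C and can be invisible to threading.active_count(). A rough illustration, assuming a gRPC channel that has started connecting:

    import threading
    import grpc
    import psutil

    channel = grpc.insecure_channel('localhost:50051')
    grpc.channel_ready_future(channel)  # kicks off connection threads

    print(threading.active_count())         # may still say 1
    print(psutil.Process().num_threads())   # shows the real total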
diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py
index b3bc7c218..00d14b45d 100644
--- a/tests/artifactcache/artifactservice.py
+++ b/tests/artifactcache/artifactservice.py
@@ -38,19 +38,19 @@ def test_artifact_get_not_found(tmpdir):
with create_artifact_share(sharedir) as share:
# set up artifact service stub
url = urlparse(share.repo)
- channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
- artifact_stub = ArtifactServiceStub(channel)
+ with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel:
+ artifact_stub = ArtifactServiceStub(channel)
- # Run GetArtifact and check it throws a not found error
- request = GetArtifactRequest()
- request.cache_key = "@artifact/something/not_there"
- try:
- artifact_stub.GetArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.NOT_FOUND
- assert e.details() == "Artifact proto not found"
- else:
- assert False
+ # Run GetArtifact and check it throws a not found error
+ request = GetArtifactRequest()
+ request.cache_key = "@artifact/something/not_there"
+ try:
+ artifact_stub.GetArtifact(request)
+ except grpc.RpcError as e:
+ assert e.code() == grpc.StatusCode.NOT_FOUND
+ assert e.details() == "Artifact proto not found"
+ else:
+ assert False
# Successfully getting the artifact
@@ -70,42 +70,42 @@ def test_update_artifact(tmpdir, files):
url = urlparse(share.repo)
- channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
- artifact_stub = ArtifactServiceStub(channel)
-
- # initialise an artifact
- artifact = Artifact()
- artifact.version = 0
- artifact.build_success = True
- artifact.strong_key = "abcdefghijklmnop"
- artifact.files.hash = "hashymchashash"
- artifact.files.size_bytes = 10
-
- artifact.files.CopyFrom(digest)
-
- # Put it in the artifact share with an UpdateArtifactRequest
- request = UpdateArtifactRequest()
- request.artifact.CopyFrom(artifact)
- request.cache_key = "a-cache-key"
-
- # should return the same artifact back
- if files == "present":
- response = artifact_stub.UpdateArtifact(request)
+ with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel:
+ artifact_stub = ArtifactServiceStub(channel)
+
+ # initialise an artifact
+ artifact = Artifact()
+ artifact.version = 0
+ artifact.build_success = True
+ artifact.strong_key = "abcdefghijklmnop"
+ artifact.files.hash = "hashymchashash"
+ artifact.files.size_bytes = 10
+
+ artifact.files.CopyFrom(digest)
+
+ # Put it in the artifact share with an UpdateArtifactRequest
+ request = UpdateArtifactRequest()
+ request.artifact.CopyFrom(artifact)
+ request.cache_key = "a-cache-key"
+
+ # should return the same artifact back
+ if files == "present":
+ response = artifact_stub.UpdateArtifact(request)
+ assert response == artifact
+ else:
+ try:
+ artifact_stub.UpdateArtifact(request)
+ except grpc.RpcError as e:
+ assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
+ if files == "absent":
+ assert e.details() == "Artifact files specified but no files found"
+ elif files == "invalid":
+ assert e.details() == "Artifact files specified but directory not found"
+ return
+
+ # If we uploaded the artifact check GetArtifact
+ request = GetArtifactRequest()
+ request.cache_key = "a-cache-key"
+
+ response = artifact_stub.GetArtifact(request)
assert response == artifact
- else:
- try:
- artifact_stub.UpdateArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
- if files == "absent":
- assert e.details() == "Artifact files specified but no files found"
- elif files == "invalid":
- assert e.details() == "Artifact files specified but directory not found"
- return
-
- # If we uploaded the artifact check GetArtifact
- request = GetArtifactRequest()
- request.cache_key = "a-cache-key"
-
- response = artifact_stub.GetArtifact(request)
- assert response == artifact
diff --git a/tests/conftest.py b/tests/conftest.py
index 7728fb5c8..0e39943d3 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,6 +23,7 @@ import os
import pytest
from buildstream.testing import register_repo_kind, sourcetests_collection_hook
+from buildstream.testing._fixtures import thread_check # pylint: disable=unused-import
from buildstream.testing._forked import forked_run_report
from buildstream.testing.integration import integration_cache # pylint: disable=unused-import
diff --git a/tests/testutils/http_server.py b/tests/testutils/http_server.py
index 049769e72..6ecb7b5b3 100644
--- a/tests/testutils/http_server.py
+++ b/tests/testutils/http_server.py
@@ -106,6 +106,8 @@ class SimpleHttpServer(multiprocessing.Process):
self.__stop.put(None)
self.terminate()
self.join()
+ self.__stop.close()
+ self.__stop.join_thread()
def allow_anonymous(self, cwd):
self.server.anonymous_dir = cwd
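multiprocessing.Queue starts a hidden feeder thread on the first put(); without close() plus join_thread() it lingers and would trip the new thread_check fixture. The generic cleanup pattern:

    import multiprocessing

    q = multiprocessing.Queue()
    q.put(None)       # first put() spawns the feeder thread
    q.close()         # declare that nothing more will be written
    q.join_thread()   # block until the feeder thread exits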