Diffstat (limited to 'tests/artifactcache/pull.py')
-rw-r--r--  tests/artifactcache/pull.py  185
1 file changed, 39 insertions(+), 146 deletions(-)
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 6003cea41..71db3e338 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -1,13 +1,11 @@
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
-import multiprocessing
import os
-import signal
import pytest
-from buildstream import _yaml, _signals, utils
+from buildstream import _yaml
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -22,17 +20,6 @@ DATA_DIR = os.path.join(
)
-# Since parent processes wait for queue events, we need
-# to put something on it if the called process raises an
-# exception.
-def _queue_wrapper(target, queue, *args):
- try:
- target(*args, queue=queue)
- except Exception as e:
- queue.put(str(e))
- raise
-
-
def tree_maker(cas, tree, directory):
if tree.root.ByteSize() == 0:
tree.root.CopyFrom(directory)
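The deleted _queue_wrapper helper existed because the parent blocks on queue.get(): if the child raised before putting anything on the queue, the parent would hang forever, so the wrapper reported the exception text before re-raising. The subprocess isolation itself was there because gRPC's background threads do not survive fork() (see the fork_support document referenced in the removed comments below). A minimal, self-contained sketch of the same pattern; run_isolated and worker are illustrative names, not BuildStream API:

import multiprocessing


def worker(queue, x):
    # Anything that would spawn gRPC threads belongs here, in the child.
    try:
        queue.put(('ok', x * 2))
    except Exception as e:
        # Always put *something* on the queue, or the parent hangs.
        queue.put(('error', str(e)))
        raise


def run_isolated(x):
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=worker, args=(queue, x))
    process.start()
    status, payload = queue.get()   # parent blocks until the child reports
    process.join()
    if status == 'error':
        raise RuntimeError(payload)
    return payload


if __name__ == '__main__':
    assert run_isolated(21) == 42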
@@ -46,6 +33,7 @@ def tree_maker(cas, tree, directory):
tree_maker(cas, tree, child_directory)
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_pull(cli, tmpdir, datafiles):
project_dir = str(datafiles)
@@ -96,58 +84,28 @@ def test_pull(cli, tmpdir, datafiles):
element_key = cli.get_element_key(project_dir, 'target.bst')
assert not cli.artifact.is_cached(cache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_pull, queue, user_config_file, project_dir,
- cache_dir, 'target.bst', element_key))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert not error
- assert cli.artifact.is_cached(cache_dir, element, element_key)
-
-
-def _test_pull(user_config_file, project_dir, cache_dir,
- element_name, element_key, queue):
- with dummy_context(config=user_config_file) as context:
- context.cachedir = cache_dir
- context.casdir = os.path.join(cache_dir, 'cas')
- context.tmpdir = os.path.join(cache_dir, 'tmp')
+ context.cachedir = cache_dir
+ context.casdir = os.path.join(cache_dir, 'cas')
+ context.tmpdir = os.path.join(cache_dir, 'tmp')
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ # Load the project manually
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
+ # Create a local artifact cache handle
+ artifactcache = context.artifactcache
- # Load the target element
- element = project.load_elements([element_name])[0]
+ # Manually setup the CAS remote
+ artifactcache.setup_remotes(use_config=True)
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
+ assert artifactcache.has_push_remotes(plugin=element), \
+ "No remote configured for element target.bst"
+ assert artifactcache.pull(element, element_key), "Pull operation failed"
- if artifactcache.has_push_remotes(plugin=element):
- # Push the element's artifact
- if not artifactcache.pull(element, element_key):
- queue.put("Pull operation failed")
- else:
- queue.put(None)
- else:
- queue.put("No remote configured for element {}".format(element_name))
+ assert cli.artifact.is_cached(cache_dir, element, element_key)
+@pytest.mark.in_subprocess
@pytest.mark.datafiles(DATA_DIR)
def test_pull_tree(cli, tmpdir, datafiles):
project_dir = str(datafiles)
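The diff replaces all of that hand-rolled process management with a @pytest.mark.in_subprocess marker, so each marked test body runs in a child process and gRPC threads never touch the main BuildStream process. Below is a hypothetical conftest.py sketch of how such a marker can be wired up; BuildStream's actual implementation may differ, and pytest-forked's @pytest.mark.forked offers similar off-the-shelf behaviour:

import multiprocessing

import pytest


def pytest_configure(config):
    config.addinivalue_line(
        "markers", "in_subprocess: run the test body in a child process")


def _child(runtest, queue):
    # Runs in the child: execute the test body and report failure text, if any.
    try:
        runtest()
        queue.put(None)
    except BaseException as e:
        queue.put("{}: {}".format(type(e).__name__, e))


def pytest_collection_modifyitems(items):
    for item in items:
        if item.get_closest_marker("in_subprocess") is None:
            continue
        original = item.runtest

        def runtest(original=original):
            # Assumes the 'fork' start method (the Linux default), so the
            # child inherits the bound `original` method without pickling.
            queue = multiprocessing.Queue()
            process = multiprocessing.Process(target=_child,
                                              args=(original, queue))
            process.start()
            error = queue.get()
            process.join()
            if error is not None:
                pytest.fail(error, pytrace=False)

        item.runtest = runtest

Under the spawn start method this sketch would not work, since a bound test-item method is not picklable; the fork assumption matches the Linux environments these tests target.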
@@ -196,76 +154,11 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Retrieve the Directory object from the cached artifact
artifact_digest = cli.artifact.get_digest(rootcache_dir, element, element_key)
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_push_tree, queue, user_config_file, project_dir,
- artifact_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- tree_hash, tree_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- assert tree_hash and tree_size
-
- # Now delete the artifact locally
- cli.remove_artifact_from_cache(project_dir, 'target.bst')
-
- # Assert that we are not cached locally anymore
- assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-
- tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
- size_bytes=tree_size)
-
- queue = multiprocessing.Queue()
- # Use subprocess to avoid creation of gRPC threads in main BuildStream process
- process = multiprocessing.Process(target=_queue_wrapper,
- args=(_test_pull_tree, queue, user_config_file, project_dir,
- tree_digest))
-
- try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- directory_hash, directory_size = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
-
- # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
- assert directory_hash and not directory_size
-
- directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
- size_bytes=directory_size)
-
- # Ensure the entire Tree structure has been pulled
- assert os.path.exists(cas.objpath(directory_digest))
-
-
-def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
-
- # Create a local artifact cache and cas handle
- artifactcache = context.artifactcache
- cas = context.get_cascache()
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
+ artifactcache = context.artifactcache
+ # Manually setup the CAS remote
+ artifactcache.setup_remotes(use_config=True)
+ assert artifactcache.has_push_remotes()
- if artifactcache.has_push_remotes():
directory = remote_execution_pb2.Directory()
with open(cas.objpath(artifact_digest), 'rb') as f:
@@ -277,27 +170,27 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue):
# Push the Tree as a regular message
tree_digest = artifactcache.push_message(project, tree)
+ tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes
+ assert tree_hash and tree_size
- queue.put((tree_digest.hash, tree_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ # Now delete the artifact locally
+ cli.remove_artifact_from_cache(project_dir, 'target.bst')
+ # Assert that we are not cached locally anymore
+ assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
-def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue):
- with dummy_context(config=user_config_file) as context:
- # Load the project manually
- project = Project(project_dir, context)
- project.ensure_fully_loaded()
+ tree_digest = remote_execution_pb2.Digest(hash=tree_hash,
+ size_bytes=tree_size)
- # Create a local artifact cache handle
- artifactcache = context.artifactcache
-
- # Manually setup the CAS remote
- artifactcache.setup_remotes(use_config=True)
-
- if artifactcache.has_push_remotes():
# Pull the artifact using the Tree object
directory_digest = artifactcache.pull_tree(project, artifact_digest)
- queue.put((directory_digest.hash, directory_digest.size_bytes))
- else:
- queue.put("No remote configured")
+ directory_hash, directory_size = directory_digest.hash, directory_digest.size_bytes
+
+ # Directory size now zero with AaaP and stack element commit #1cbc5e63dc
+ assert directory_hash and not directory_size
+
+ directory_digest = remote_execution_pb2.Digest(hash=directory_hash,
+ size_bytes=directory_size)
+
+ # Ensure the entire Tree structure has been pulled
+ assert os.path.exists(cas.objpath(directory_digest))
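For reference, the Tree message used throughout these tests (from the REAPI protos already imported at the top of the file) wraps a root Directory plus all of its transitive child Directory messages, which is exactly what the tree_maker helper assembles. A Digest pairs the hash of a message's serialized bytes with their length, so an empty Directory legitimately has size_bytes == 0 (an empty protobuf message serializes to zero bytes), which is why the test asserts directory_hash and not directory_size. A small illustrative sketch, assuming SHA-256 as the digest function; digest_of is a hypothetical helper, not BuildStream API:

import hashlib

from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2


def digest_of(message):
    # REAPI Digest: hash of the serialized message, plus its size in bytes.
    data = message.SerializeToString()
    return remote_execution_pb2.Digest(hash=hashlib.sha256(data).hexdigest(),
                                       size_bytes=len(data))


root = remote_execution_pb2.Directory()        # empty: serializes to b''
tree = remote_execution_pb2.Tree(root=root)    # children would list subdirectories

assert digest_of(root).size_bytes == 0         # matches the assertion above
print(digest_of(tree).hash, digest_of(tree).size_bytes)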