diff options
Diffstat (limited to 'tests/artifactcache/pull.py')
-rw-r--r-- | tests/artifactcache/pull.py | 185 |
1 files changed, 39 insertions, 146 deletions
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index 6003cea41..71db3e338 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -1,13 +1,11 @@ # Pylint doesn't play well with fixtures and dependency injection from pytest # pylint: disable=redefined-outer-name -import multiprocessing import os -import signal import pytest -from buildstream import _yaml, _signals, utils +from buildstream import _yaml from buildstream._project import Project from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from buildstream.testing import cli # pylint: disable=unused-import @@ -22,17 +20,6 @@ DATA_DIR = os.path.join( ) -# Since parent processes wait for queue events, we need -# to put something on it if the called process raises an -# exception. -def _queue_wrapper(target, queue, *args): - try: - target(*args, queue=queue) - except Exception as e: - queue.put(str(e)) - raise - - def tree_maker(cas, tree, directory): if tree.root.ByteSize() == 0: tree.root.CopyFrom(directory) @@ -46,6 +33,7 @@ def tree_maker(cas, tree, directory): tree_maker(cas, tree, child_directory) +@pytest.mark.in_subprocess @pytest.mark.datafiles(DATA_DIR) def test_pull(cli, tmpdir, datafiles): project_dir = str(datafiles) @@ -96,58 +84,28 @@ def test_pull(cli, tmpdir, datafiles): element_key = cli.get_element_key(project_dir, 'target.bst') assert not cli.artifact.is_cached(cache_dir, element, element_key) - queue = multiprocessing.Queue() - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - process = multiprocessing.Process(target=_queue_wrapper, - args=(_test_pull, queue, user_config_file, project_dir, - cache_dir, 'target.bst', element_key)) - - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - process.start() - - error = queue.get() - process.join() - except KeyboardInterrupt: - utils._kill_process_tree(process.pid) - raise - - assert not error - assert cli.artifact.is_cached(cache_dir, element, element_key) - - -def _test_pull(user_config_file, project_dir, cache_dir, - element_name, element_key, queue): - with dummy_context(config=user_config_file) as context: - context.cachedir = cache_dir - context.casdir = os.path.join(cache_dir, 'cas') - context.tmpdir = os.path.join(cache_dir, 'tmp') + context.cachedir = cache_dir + context.casdir = os.path.join(cache_dir, 'cas') + context.tmpdir = os.path.join(cache_dir, 'tmp') - # Load the project manually - project = Project(project_dir, context) - project.ensure_fully_loaded() + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() - # Create a local artifact cache handle - artifactcache = context.artifactcache + # Create a local artifact cache handle + artifactcache = context.artifactcache - # Load the target element - element = project.load_elements([element_name])[0] + # Manually setup the CAS remote + artifactcache.setup_remotes(use_config=True) - # Manually setup the CAS remote - artifactcache.setup_remotes(use_config=True) + assert artifactcache.has_push_remotes(plugin=element), \ + "No remote configured for element target.bst" + assert artifactcache.pull(element, element_key), "Pull operation failed" - if artifactcache.has_push_remotes(plugin=element): - # Push the element's artifact - if not artifactcache.pull(element, element_key): - queue.put("Pull operation failed") - else: - queue.put(None) - else: - queue.put("No remote configured for element {}".format(element_name)) + assert cli.artifact.is_cached(cache_dir, element, element_key) +@pytest.mark.in_subprocess @pytest.mark.datafiles(DATA_DIR) def test_pull_tree(cli, tmpdir, datafiles): project_dir = str(datafiles) @@ -196,76 +154,11 @@ def test_pull_tree(cli, tmpdir, datafiles): # Retrieve the Directory object from the cached artifact artifact_digest = cli.artifact.get_digest(rootcache_dir, element, element_key) - queue = multiprocessing.Queue() - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - process = multiprocessing.Process(target=_queue_wrapper, - args=(_test_push_tree, queue, user_config_file, project_dir, - artifact_digest)) - - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - process.start() - - tree_hash, tree_size = queue.get() - process.join() - except KeyboardInterrupt: - utils._kill_process_tree(process.pid) - raise - - assert tree_hash and tree_size - - # Now delete the artifact locally - cli.remove_artifact_from_cache(project_dir, 'target.bst') - - # Assert that we are not cached locally anymore - assert cli.get_element_state(project_dir, 'target.bst') != 'cached' - - tree_digest = remote_execution_pb2.Digest(hash=tree_hash, - size_bytes=tree_size) - - queue = multiprocessing.Queue() - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - process = multiprocessing.Process(target=_queue_wrapper, - args=(_test_pull_tree, queue, user_config_file, project_dir, - tree_digest)) - - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - process.start() - - directory_hash, directory_size = queue.get() - process.join() - except KeyboardInterrupt: - utils._kill_process_tree(process.pid) - raise - - # Directory size now zero with AaaP and stack element commit #1cbc5e63dc - assert directory_hash and not directory_size - - directory_digest = remote_execution_pb2.Digest(hash=directory_hash, - size_bytes=directory_size) - - # Ensure the entire Tree stucture has been pulled - assert os.path.exists(cas.objpath(directory_digest)) - - -def _test_push_tree(user_config_file, project_dir, artifact_digest, queue): - with dummy_context(config=user_config_file) as context: - # Load the project manually - project = Project(project_dir, context) - project.ensure_fully_loaded() - - # Create a local artifact cache and cas handle - artifactcache = context.artifactcache - cas = context.get_cascache() - - # Manually setup the CAS remote - artifactcache.setup_remotes(use_config=True) + artifactcache = context.artifactcache + # Manually setup the CAS remote + artifactcache.setup_remotes(use_config=True) + assert artifactcache.has_push_remotes() - if artifactcache.has_push_remotes(): directory = remote_execution_pb2.Directory() with open(cas.objpath(artifact_digest), 'rb') as f: @@ -277,27 +170,27 @@ def _test_push_tree(user_config_file, project_dir, artifact_digest, queue): # Push the Tree as a regular message tree_digest = artifactcache.push_message(project, tree) + tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes + assert tree_hash and tree_size - queue.put((tree_digest.hash, tree_digest.size_bytes)) - else: - queue.put("No remote configured") + # Now delete the artifact locally + cli.remove_artifact_from_cache(project_dir, 'target.bst') + # Assert that we are not cached locally anymore + assert cli.get_element_state(project_dir, 'target.bst') != 'cached' -def _test_pull_tree(user_config_file, project_dir, artifact_digest, queue): - with dummy_context(config=user_config_file) as context: - # Load the project manually - project = Project(project_dir, context) - project.ensure_fully_loaded() + tree_digest = remote_execution_pb2.Digest(hash=tree_hash, + size_bytes=tree_size) - # Create a local artifact cache handle - artifactcache = context.artifactcache - - # Manually setup the CAS remote - artifactcache.setup_remotes(use_config=True) - - if artifactcache.has_push_remotes(): # Pull the artifact using the Tree object directory_digest = artifactcache.pull_tree(project, artifact_digest) - queue.put((directory_digest.hash, directory_digest.size_bytes)) - else: - queue.put("No remote configured") + directory_hash, directory_size = directory_digest.hash, directory_digest.size_bytes + + # Directory size now zero with AaaP and stack element commit #1cbc5e63dc + assert directory_hash and not directory_size + + directory_digest = remote_execution_pb2.Digest(hash=directory_hash, + size_bytes=directory_size) + + # Ensure the entire Tree stucture has been pulled + assert os.path.exists(cas.objpath(directory_digest)) |