diff options
author | Martin Blanchard <martin.blanchard@codethink.co.uk> | 2018-09-04 14:47:34 +0100 |
---|---|---|
committer | Martin Blanchard <martin.blanchard@codethink.co.uk> | 2018-09-07 14:22:48 +0100 |
commit | 923d443b1493e0745ff5878904ddf7fa8b2afb75 (patch) | |
tree | 1a1ce548180279582db2362d08b773500a0ecc70 /tests | |
parent | d4c515d0efa11dc980f284492ed6722c7962e398 (diff) | |
download | buildstream-923d443b1493e0745ff5878904ddf7fa8b2afb75.tar.gz |
tests/artifactcache: Add push unit-tests
https://gitlab.com/BuildStream/buildstream/issues/454
Diffstat (limited to 'tests')
-rw-r--r-- | tests/artifactcache/push.py | 312 | ||||
-rw-r--r-- | tests/testutils/artifactshare.py | 18 |
2 files changed, 330 insertions, 0 deletions
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py new file mode 100644 index 000000000..bdeb86862 --- /dev/null +++ b/tests/artifactcache/push.py @@ -0,0 +1,312 @@ +import multiprocessing +import os +import signal + +import pytest + +from pluginbase import PluginBase +from buildstream import _yaml, _signals, utils +from buildstream._artifactcache.cascache import CASCache +from buildstream._context import Context +from buildstream._project import Project +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 +from buildstream.storage._casbaseddirectory import CasBasedDirectory + +from tests.testutils import cli, create_artifact_share + + +# Project directory +DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "project", +) + + +# Handle messages from the pipeline +def message_handler(message, context): + pass + + +@pytest.mark.datafiles(DATA_DIR) +def test_push(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # First build the project without the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' + + # Set up an artifact cache. + with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Assert that the element's artifact is cached + element = project.load_elements(['target.bst'], cas)[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert cas.contains(element, element_key) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push, + args=(user_config_file, project_dir, artifact_dir, + 'target.bst', element_key, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert not error + assert share.has_artifact('test', 'target.bst', element_key) + + +def _test_push(user_config_file, project_dir, artifact_dir, + element_name, element_key, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Load the target element + element = project.load_elements([element_name], cas)[0] + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(element=element): + # Push the element's artifact + if not cas.push(element, [element_key]): + queue.put("Push operation failed") + else: + queue.put(None) + else: + queue.put("No remote configured for element {}".format(element_name)) + + +@pytest.mark.datafiles(DATA_DIR) +def test_push_directory(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # First build the project without the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' + + # Set up an artifact cache. + with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts') + context.set_message_handler(message_handler) + + # Load the project and CAS cache + project = Project(project_dir, context) + project.ensure_fully_loaded() + cas = CASCache(context) + + # Assert that the element's artifact is cached + element = project.load_elements(['target.bst'], cas)[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert cas.contains(element, element_key) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + assert cas.has_push_remotes(element=element) + + # Recreate the CasBasedDirectory object from the cached artifact + artifact_ref = cas.get_artifact_fullname(element, element_key) + artifact_digest = cas.resolve_ref(artifact_ref) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push_directory, + args=(user_config_file, project_dir, artifact_dir, + artifact_digest, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + directory_hash = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert directory_hash + assert artifact_digest.hash == directory_hash + assert share.has_object(artifact_digest) + + +def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_digest, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(): + # Create a CasBasedDirectory from local CAS cache content + directory = CasBasedDirectory(context, ref=artifact_digest) + + # Push the CasBasedDirectory object + directory_digest = cas.push_directory(project, directory) + + queue.put(directory_digest.hash) + else: + queue.put("No remote configured") + + +@pytest.mark.datafiles(DATA_DIR) +def test_push_message(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # Set up an artifact cache. + with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push_message, + args=(user_config_file, project_dir, artifact_dir, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + message_hash, message_size = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert message_hash and message_size + message_digest = remote_execution_pb2.Digest(hash=message_hash, + size_bytes=message_size) + assert share.has_object(message_digest) + + +def _test_push_message(user_config_file, project_dir, artifact_dir, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(): + # Create an example message object + command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'], + working_directory='/buildstream-build', + output_directories=['/buildstream-install']) + + # Push the message object + command_digest = cas.push_message(project, command) + + queue.put((command_digest.hash, command_digest.size_bytes)) + else: + queue.put("No remote configured") diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index 05e87a499..e3f709b0a 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache from buildstream._artifactcache.casserver import create_server from buildstream._context import Context from buildstream._exceptions import ArtifactError +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 # ArtifactShare() @@ -87,6 +88,23 @@ class ArtifactShare(): # Sleep until termination by signal signal.pause() + # has_object(): + # + # Checks whether the object is present in the share + # + # Args: + # digest (str): The object's digest + # + # Returns: + # (bool): True if the object exists in the share, otherwise false. + def has_object(self, digest): + + assert isinstance(digest, remote_execution_pb2.Digest) + + object_path = self.cas.objpath(digest) + + return os.path.exists(object_path) + # has_artifact(): # # Checks whether the artifact is present in the share |