author    Martin Blanchard <martin.blanchard@codethink.co.uk>  2018-09-04 14:47:34 +0100
committer Martin Blanchard <martin.blanchard@codethink.co.uk>  2018-09-07 14:22:48 +0100
commit    923d443b1493e0745ff5878904ddf7fa8b2afb75 (patch)
tree      1a1ce548180279582db2362d08b773500a0ecc70
parent    d4c515d0efa11dc980f284492ed6722c7962e398 (diff)
download  buildstream-923d443b1493e0745ff5878904ddf7fa8b2afb75.tar.gz
tests/artifactcache: Add push unit-tests
https://gitlab.com/BuildStream/buildstream/issues/454
-rw-r--r--  tests/artifactcache/push.py       | 312
-rw-r--r--  tests/testutils/artifactshare.py  |  18
2 files changed, 330 insertions(+), 0 deletions(-)
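
The tests below all follow the same pattern: the actual push work happens in a forked child process, so that gRPC threads are never created in the main BuildStream process, and the child reports its result back through a multiprocessing.Queue. A minimal sketch of that pattern follows, assuming a hypothetical do_push() worker that stands in for the real push logic (it is not part of the BuildStream API):

import multiprocessing

def do_push(queue):
    # Hypothetical worker: the gRPC-based push would happen here; the child
    # reports back to the parent (None for success, a string for an error).
    queue.put(None)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_push, args=(queue,))
    process.start()
    error = queue.get()    # block until the child reports a result
    process.join()
    assert not error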
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
new file mode 100644
index 000000000..bdeb86862
--- /dev/null
+++ b/tests/artifactcache/push.py
@@ -0,0 +1,312 @@
+import multiprocessing
+import os
+import signal
+
+import pytest
+
+from pluginbase import PluginBase
+from buildstream import _yaml, _signals, utils
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._context import Context
+from buildstream._project import Project
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from buildstream.storage._casbaseddirectory import CasBasedDirectory
+
+from tests.testutils import cli, create_artifact_share
+
+
+# Project directory
+DATA_DIR = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    "project",
+)
+
+
+# Handle messages from the pipeline
+def message_handler(message, context):
+    pass
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # First build the project without the artifact cache configured
+    result = cli.run(project=project_dir, args=['build', 'target.bst'])
+    result.assert_success()
+
+    # Assert that we are now cached locally
+    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = artifact_dir
+        context.set_message_handler(message_handler)
+
+        # Load the project manually
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+
+        # Create a local CAS cache handle
+        cas = CASCache(context)
+
+        # Assert that the element's artifact is cached
+        element = project.load_elements(['target.bst'], cas)[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert cas.contains(element, element_key)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                'target.bst', element_key, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            error = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert not error
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+
+def _test_push(user_config_file, project_dir, artifact_dir,
+               element_name, element_key, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Load the target element
+    element = project.load_elements([element_name], cas)[0]
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes(element=element):
+        # Push the element's artifact
+        if not cas.push(element, [element_key]):
+            queue.put("Push operation failed")
+        else:
+            queue.put(None)
+    else:
+        queue.put("No remote configured for element {}".format(element_name))
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_directory(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # First build the project without the artifact cache configured
+    result = cli.run(project=project_dir, args=['build', 'target.bst'])
+    result.assert_success()
+
+    # Assert that we are now cached locally
+    assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = CASCache(context)
+
+        # Assert that the element's artifact is cached
+        element = project.load_elements(['target.bst'], cas)[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert cas.contains(element, element_key)
+
+        # Manually setup the CAS remote
+        cas.setup_remotes(use_config=True)
+        cas.initialize_remotes()
+        assert cas.has_push_remotes(element=element)
+
+        # Recreate the CasBasedDirectory object from the cached artifact
+        artifact_ref = cas.get_artifact_fullname(element, element_key)
+        artifact_digest = cas.resolve_ref(artifact_ref)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push_directory,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                artifact_digest, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            directory_hash = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert directory_hash
+        assert artifact_digest.hash == directory_hash
+        assert share.has_object(artifact_digest)
+
+
+def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_digest, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes():
+        # Create a CasBasedDirectory from local CAS cache content
+        directory = CasBasedDirectory(context, ref=artifact_digest)
+
+        # Push the CasBasedDirectory object
+        directory_digest = cas.push_directory(project, directory)
+
+        queue.put(directory_digest.hash)
+    else:
+        queue.put("No remote configured")
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_message(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_push_message,
+                                          args=(user_config_file, project_dir, artifact_dir, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            message_hash, message_size = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert message_hash and message_size
+        message_digest = remote_execution_pb2.Digest(hash=message_hash,
+                                                     size_bytes=message_size)
+        assert share.has_object(message_digest)
+
+
+def _test_push_message(user_config_file, project_dir, artifact_dir, queue):
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = CASCache(context)
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+    cas.initialize_remotes()
+
+    if cas.has_push_remotes():
+        # Create an example message object
+        command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'],
+                                               working_directory='/buildstream-build',
+                                               output_directories=['/buildstream-install'])
+
+        # Push the message object
+        command_digest = cas.push_message(project, command)
+
+        queue.put((command_digest.hash, command_digest.size_bytes))
+    else:
+        queue.put("No remote configured")
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 05e87a499..e3f709b0a 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache
 from buildstream._artifactcache.casserver import create_server
 from buildstream._context import Context
 from buildstream._exceptions import ArtifactError
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
 
 # ArtifactShare()
@@ -87,6 +88,23 @@ class ArtifactShare():
         # Sleep until termination by signal
         signal.pause()
 
+    # has_object():
+    #
+    # Checks whether the object is present in the share
+    #
+    # Args:
+    #     digest (str): The object's digest
+    #
+    # Returns:
+    #     (bool): True if the object exists in the share, otherwise false.
+    def has_object(self, digest):
+
+        assert isinstance(digest, remote_execution_pb2.Digest)
+
+        object_path = self.cas.objpath(digest)
+
+        return os.path.exists(object_path)
+
     # has_artifact():
     #
     # Checks whether the artifact is present in the share