author       | Finn <finn.ball@codethink.co.uk> | 2018-11-28 13:50:43 +0000
committer    | Finn <finn.ball@codethink.co.uk> | 2018-12-07 15:05:25 +0000
commit       | 4bebfa0474bb49346f83e66f7688311ca4cbc9a8 (patch)
tree         | 89b65d280cab4f8b1052740bb4dbb0a5bd3baebf
parent       | 6e3e34f73c68e36a3eaab6b6dbb8d3b7972561f7 (diff)
download     | buildstream-finn/cas-error-timeouts.tar.gz
working copy | finn/cas-error-timeouts
-rw-r--r-- | tests/artifactcache/pull.py      | 103
-rw-r--r-- | tests/testutils/__init__.py      |   1
-rw-r--r-- | tests/testutils/timeoutserver.py | 236
3 files changed, 339 insertions, 1 deletions
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 4c332bf36..bec21d1cc 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -10,7 +10,7 @@ from buildstream._context import Context
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
-from tests.testutils import cli, create_artifact_share
+from tests.testutils import cli, create_artifact_share, create_timeout_artifact_share
 
 
 # Project directory
@@ -327,3 +327,104 @@ def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest
         queue.put((directory_digest.hash, directory_digest.size_bytes))
     else:
         queue.put("No remote configured")
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_pully_pull(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_timeout_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+        # Ensure CLI calls will use it
+        cli.configure(user_config)
+
+        # First build the project with the artifact cache configured
+        result = cli.run(project=project_dir, args=['build', 'target.bst'])
+        result.assert_success()
+
+        # Assert that we are now cached locally
+        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+        # Assert that we shared/pushed the cached artifact
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+        # Delete the artifact locally
+        cli.remove_artifact_from_cache(project_dir, 'target.bst')
+
+        # Assert that we are not cached locally anymore
+        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = context.artifactcache
+
+        # Assert that the element's artifact is **not** cached
+        element = project.load_elements(['target.bst'])[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert not cas.contains(element, element_key)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_pull,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                'target.bst', element_key, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            error = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert not error
+        assert cas.contains(element, element_key)
+
+    # Fake minimal context
+    context = Context()
+    context.load(config=user_config_file)
+    context.artifactdir = artifact_dir
+    context.set_message_handler(message_handler)
+
+    # Load the project manually
+    project = Project(project_dir, context)
+    project.ensure_fully_loaded()
+
+    # Create a local CAS cache handle
+    cas = context.artifactcache
+
+    # Manually setup the CAS remote
+    cas.setup_remotes(use_config=True)
+
+    if cas.has_push_remotes():
+        # Pull the artifact using the Tree object
+        directory_digest = cas.pull_tree(project, artifact_digest)
+        queue.put((directory_digest.hash, directory_digest.size_bytes))
+    else:
+        queue.put("No remote configured")
diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py
index eb7211ea8..6b6418309 100644
--- a/tests/testutils/__init__.py
+++ b/tests/testutils/__init__.py
@@ -30,3 +30,4 @@ from .element_generators import create_element_size, update_element_size
 from .junction import generate_junction
 from .runner_integration import wait_for_cache_granularity
 from .python_repo import setup_pypi_repo
+from .timeoutserver import create_timeout_artifact_share
diff --git a/tests/testutils/timeoutserver.py b/tests/testutils/timeoutserver.py
new file mode 100644
index 000000000..3cc4aed18
--- /dev/null
+++ b/tests/testutils/timeoutserver.py
@@ -0,0 +1,236 @@
+import string
+import pytest
+import subprocess
+import os
+import shutil
+import signal
+from collections import namedtuple
+from concurrent import futures
+
+from contextlib import contextmanager
+from multiprocessing import Process, Queue
+import grpc
+import pytest_cov
+
+from buildstream import _yaml
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._artifactcache import casserver
+from buildstream._exceptions import CASError
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from buildstream._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+
+
+class TimeoutCasCache(CASCache):
+    pass
+
+
+# TimeoutArtifactShare()
+#
+# A class to mimic the behaviour of an artifact share for testing,
+# whose CAS server can be made to stall requests so that client-side
+# timeouts can be exercised
+#
+# Args:
+#     directory (str): The base temp directory for the test
+#     total_space (int): Mock total disk space on artifact server
+#     free_space (int): Mock free disk space on artifact server
+#
+class TimeoutArtifactShare():
+
+    def __init__(self, directory, *, total_space=None, free_space=None):
+
+        # The working directory for the artifact share (in case it
+        # needs to do something outside of its backend's storage folder).
+        #
+        self.directory = os.path.abspath(directory)
+
+        # The directory the actual repo will be stored in.
+        #
+        # Unless this gets more complicated, just use this directly
+        # in tests as a remote artifact push/pull configuration
+        #
+        self.repodir = os.path.join(self.directory, 'repo')
+
+        os.makedirs(self.repodir)
+
+        self.cas = CASCache(self.repodir)
+
+        self.total_space = total_space
+        self.free_space = free_space
+
+        q = Queue()
+
+        self.process = Process(target=self.run, args=(q,))
+        self.process.start()
+
+        # Retrieve port from server subprocess
+        port = q.get(timeout=1)
+
+        self.repo = 'http://localhost:{}'.format(port)
+
+    # run():
+    #
+    # Run the artifact server.
+    #
+    def run(self, q):
+        pytest_cov.embed.cleanup_on_sigterm()
+
+        # Optionally mock statvfs
+        if self.total_space:
+            if self.free_space is None:
+                self.free_space = self.total_space
+            os.statvfs = self._mock_statvfs
+
+        server = create_timeout_server(self.repodir, enable_push=True)
+        port = server.add_insecure_port('localhost:0')
+
+        server.start()
+
+        # Send port to parent
+        q.put(port)
+
+        # Sleep until termination by signal
+        signal.pause()
+
+    # has_object():
+    #
+    # Checks whether the object is present in the share
+    #
+    # Args:
+    #     digest (str): The object's digest
+    #
+    # Returns:
+    #     (bool): True if the object exists in the share, otherwise false.
+    def has_object(self, digest):
+
+        assert isinstance(digest, remote_execution_pb2.Digest)
+
+        object_path = self.cas.objpath(digest)
+
+        return os.path.exists(object_path)
+
+    # has_artifact():
+    #
+    # Checks whether the artifact is present in the share
+    #
+    # Args:
+    #     project_name (str): The project name
+    #     element_name (str): The element name
+    #     cache_key (str): The cache key
+    #
+    # Returns:
+    #     (str): artifact digest if the artifact exists in the share, otherwise None.
+    def has_artifact(self, project_name, element_name, cache_key):
+
+        # NOTE: This should be kept in line with our artifact cache code,
+        #       the below is the same algorithm for creating an artifact reference
+        #
+
+        # Replace path separator and chop off the .bst suffix
+        element_name = os.path.splitext(element_name.replace(os.sep, '-'))[0]
+
+        valid_chars = string.digits + string.ascii_letters + '-._'
+        element_name = ''.join([
+            x if x in valid_chars else '_'
+            for x in element_name
+        ])
+        artifact_key = '{0}/{1}/{2}'.format(project_name, element_name, cache_key)
+
+        try:
+            tree = self.cas.resolve_ref(artifact_key)
+            return tree
+        except CASError:
+            return None
+
+    # close():
+    #
+    # Remove the artifact share.
+    #
+    def close(self):
+        self.process.terminate()
+        self.process.join()
+
+        shutil.rmtree(self.directory)
+
+    def _mock_statvfs(self, path):
+        repo_size = 0
+        for root, _, files in os.walk(self.repodir):
+            for filename in files:
+                repo_size += os.path.getsize(os.path.join(root, filename))
+
+        return statvfs_result(f_blocks=self.total_space,
+                              f_bfree=self.free_space - repo_size,
+                              f_bavail=self.free_space - repo_size,
+                              f_bsize=1)
+
+
+# create_timeout_artifact_share()
+#
+# Create a TimeoutArtifactShare for use in a test case
+#
+@contextmanager
+def create_timeout_artifact_share(directory, *, total_space=None, free_space=None):
+    share = TimeoutArtifactShare(directory, total_space=total_space, free_space=free_space)
+    try:
+        yield share
+    finally:
+        share.close()
+
+
+statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
+
+
+# create_timeout_server():
+#
+# Create a gRPC CAS artifact server as specified in the Remote Execution API.
+#
+# Args:
+#     repo (str): Path to CAS repository
+#     enable_push (bool): Whether to allow blob uploads and artifact updates
+#
+def create_timeout_server(repo, *, enable_push):
+    cas = TimeoutCasCache(os.path.abspath(repo))
+
+    # Use max_workers default from Python 3.5+
+    max_workers = (os.cpu_count() or 1) * 5
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+
+    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+        _ByteStreamServicer(cas, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+        _CapabilitiesServicer(), server)
+
+    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+
+    return server
+
+
+class _ByteStreamServicer(casserver._ByteStreamServicer):
+    pass
+
+
+class _ContentAddressableStorageServicer(casserver._ContentAddressableStorageServicer):
+
+    def __init__(self, cas, *, enable_push):
+        self.__read_count = 0
+        super().__init__(cas=cas, enable_push=enable_push)
+
+    def BatchReadBlobs(self, request, context):
+        # Placeholder for stalling batch reads, left disabled in this commit:
+        # self.__read_count += 1
+        # import time
+        # time.sleep(5)
+        return super().BatchReadBlobs(request, context)
+
+
+class _CapabilitiesServicer(casserver._CapabilitiesServicer):
+    pass
+
+
+class _ReferenceStorageServicer(casserver._ReferenceStorageServicer):
+    pass
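
The commented-out lines in BatchReadBlobs above are the hook this branch is
building towards. Below is a minimal sketch of what they might look like once
enabled; the _SlowContentAddressableStorageServicer name and the delay knob
are hypothetical additions, not part of this patch, and it assumes the
upstream servicer keeps the (cas, *, enable_push) constructor used above:

import time

from buildstream._artifactcache import casserver


class _SlowContentAddressableStorageServicer(casserver._ContentAddressableStorageServicer):

    def __init__(self, cas, *, enable_push, delay=5.0):
        super().__init__(cas=cas, enable_push=enable_push)
        self._read_count = 0  # number of BatchReadBlobs calls served so far
        self._delay = delay   # seconds to stall each call (hypothetical knob)

    def BatchReadBlobs(self, request, context):
        # Stall before delegating to the real servicer; a client whose gRPC
        # deadline is shorter than self._delay times out during this sleep.
        self._read_count += 1
        time.sleep(self._delay)
        return super().BatchReadBlobs(request, context)

Registering this subclass in create_timeout_server() in place of the
pass-through _ContentAddressableStorageServicer would make the pull in
test_pully_pull run against a server that stalls every batch read.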
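
For completeness, a client-side sketch of the failure mode such a server
produces; the localhost:50051 address, the one-second deadline and the empty
request are illustrative only and assume a stalling server is listening there:

import grpc

from buildstream._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2, remote_execution_pb2_grpc)

channel = grpc.insecure_channel('localhost:50051')  # illustrative address
stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

try:
    # A deadline shorter than the server's stall fails with DEADLINE_EXCEEDED
    stub.BatchReadBlobs(remote_execution_pb2.BatchReadBlobsRequest(), timeout=1.0)
except grpc.RpcError as e:
    assert e.code() == grpc.StatusCode.DEADLINE_EXCEEDED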