author     Finn <finn.ball@codethink.co.uk>    2018-11-28 13:50:43 +0000
committer  Finn <finn.ball@codethink.co.uk>    2018-12-07 15:05:25 +0000
commit     4bebfa0474bb49346f83e66f7688311ca4cbc9a8 (patch)
tree       89b65d280cab4f8b1052740bb4dbb0a5bd3baebf
parent     6e3e34f73c68e36a3eaab6b6dbb8d3b7972561f7 (diff)
download   buildstream-finn/cas-error-timeouts.tar.gz
-rw-r--r--    tests/artifactcache/pull.py         103
-rw-r--r--    tests/testutils/__init__.py           1
-rw-r--r--    tests/testutils/timeoutserver.py    236
3 files changed, 339 insertions(+), 1 deletion(-)
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 4c332bf36..bec21d1cc 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -10,7 +10,7 @@ from buildstream._context import Context
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from tests.testutils import cli, create_artifact_share
+from tests.testutils import cli, create_artifact_share, create_timeout_artifact_share
# Project directory
@@ -327,3 +327,104 @@ def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest
queue.put((directory_digest.hash, directory_digest.size_bytes))
else:
queue.put("No remote configured")
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_pull_timeout(cli, tmpdir, datafiles):
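+    # Build and push an artifact, delete it locally, then pull it back from a
+    # share whose CAS server can simulate slow responses (see timeoutserver.py).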
+ project_dir = str(datafiles)
+
+    # Set up an artifact share that can simulate server timeouts.
+ with create_timeout_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+ # Configure artifact share
+ artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+ user_config_file = str(tmpdir.join('buildstream.conf'))
+ user_config = {
+ 'scheduler': {
+ 'pushers': 1
+ },
+ 'artifacts': {
+ 'url': share.repo,
+ 'push': True,
+ }
+ }
+
+ # Write down the user configuration file
+ _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+ # Ensure CLI calls will use it
+ cli.configure(user_config)
+
+ # First build the project with the artifact cache configured
+ result = cli.run(project=project_dir, args=['build', 'target.bst'])
+ result.assert_success()
+
+ # Assert that we are now cached locally
+ assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+ # Assert that we shared/pushed the cached artifact
+ element_key = cli.get_element_key(project_dir, 'target.bst')
+ assert share.has_artifact('test', 'target.bst', element_key)
+
+ # Delete the artifact locally
+ cli.remove_artifact_from_cache(project_dir, 'target.bst')
+
+ # Assert that we are not cached locally anymore
+ assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
+
+ # Fake minimal context
+ context = Context()
+ context.load(config=user_config_file)
+ context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+ context.set_message_handler(message_handler)
+
+ # Load the project and CAS cache
+ project = Project(project_dir, context)
+ project.ensure_fully_loaded()
+ cas = context.artifactcache
+
+ # Assert that the element's artifact is **not** cached
+ element = project.load_elements(['target.bst'])[0]
+ element_key = cli.get_element_key(project_dir, 'target.bst')
+ assert not cas.contains(element, element_key)
+
+ queue = multiprocessing.Queue()
+ # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+ # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+ process = multiprocessing.Process(target=_test_pull,
+ args=(user_config_file, project_dir, artifact_dir,
+ 'target.bst', element_key, queue))
+
+ try:
+ # Keep SIGINT blocked in the child process
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ process.start()
+
+ error = queue.get()
+ process.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(process.pid)
+ raise
+
+ assert not error
+ assert cas.contains(element, element_key)
diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py
index eb7211ea8..6b6418309 100644
--- a/tests/testutils/__init__.py
+++ b/tests/testutils/__init__.py
@@ -30,3 +30,4 @@ from .element_generators import create_element_size, update_element_size
from .junction import generate_junction
from .runner_integration import wait_for_cache_granularity
from .python_repo import setup_pypi_repo
+from .timeoutserver import create_timeout_artifact_share
diff --git a/tests/testutils/timeoutserver.py b/tests/testutils/timeoutserver.py
new file mode 100644
index 000000000..3cc4aed18
--- /dev/null
+++ b/tests/testutils/timeoutserver.py
@@ -0,0 +1,236 @@
+import string
+import pytest
+import subprocess
+import os
+import shutil
+import signal
+from collections import namedtuple
+from concurrent import futures
+
+from contextlib import contextmanager
+from multiprocessing import Process, Queue
+import grpc
+import pytest_cov
+
+from buildstream import _yaml
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._artifactcache import casserver
+from buildstream._exceptions import CASError
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from buildstream._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+
+
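+# TimeoutCasCache()
+#
+# Placeholder CASCache subclass; tests can override methods here to
+# simulate slow or failing CAS operations on the server side.
+#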
+class TimeoutCasCache(CASCache):
+ pass
+
+
+# TimeoutArtifactShare()
+#
+# An artifact share served by a CAS server that can simulate
+# slow or timed-out responses, for use in timeout tests.
+#
+# Args:
+# directory (str): The base temp directory for the test
+# total_space (int): Mock total disk space on artifact server
+# free_space (int): Mock free disk space on artifact server
+#
+class TimeoutArtifactShare():
+
+ def __init__(self, directory, *, total_space=None, free_space=None):
+
+ # The working directory for the artifact share (in case it
+ # needs to do something outside of its backend's storage folder).
+ #
+ self.directory = os.path.abspath(directory)
+
+ # The directory the actual repo will be stored in.
+ #
+ # Unless this gets more complicated, just use this directly
+ # in tests as a remote artifact push/pull configuration
+ #
+ self.repodir = os.path.join(self.directory, 'repo')
+
+ os.makedirs(self.repodir)
+
+ self.cas = CASCache(self.repodir)
+
+ self.total_space = total_space
+ self.free_space = free_space
+
+ q = Queue()
+
+ self.process = Process(target=self.run, args=(q,))
+ self.process.start()
+
+ # Retrieve port from server subprocess
+ port = q.get(timeout=1)
+
+ self.repo = 'http://localhost:{}'.format(port)
+
+ # run():
+ #
+ # Run the artifact server.
+ #
+ def run(self, q):
+ pytest_cov.embed.cleanup_on_sigterm()
+
+ # Optionally mock statvfs
+ if self.total_space:
+ if self.free_space is None:
+ self.free_space = self.total_space
+ os.statvfs = self._mock_statvfs
+
+ server = create_timeout_server(self.repodir, enable_push=True)
+ port = server.add_insecure_port('localhost:0')
+
+ server.start()
+
+ # Send port to parent
+ q.put(port)
+
+ # Sleep until termination by signal
+ signal.pause()
+
+ # has_object():
+ #
+ # Checks whether the object is present in the share
+ #
+ # Args:
+ # digest (str): The object's digest
+ #
+ # Returns:
+ # (bool): True if the object exists in the share, otherwise false.
+ def has_object(self, digest):
+
+ assert isinstance(digest, remote_execution_pb2.Digest)
+
+ object_path = self.cas.objpath(digest)
+
+ return os.path.exists(object_path)
+
+ # has_artifact():
+ #
+ # Checks whether the artifact is present in the share
+ #
+ # Args:
+ # project_name (str): The project name
+ # element_name (str): The element name
+ # cache_key (str): The cache key
+ #
+ # Returns:
+ # (str): artifact digest if the artifact exists in the share, otherwise None.
+ def has_artifact(self, project_name, element_name, cache_key):
+
+ # NOTE: This should be kept in line with our
+ # artifact cache code, the below is the
+        #       same algorithm for creating an artifact reference
+ #
+
+ # Replace path separator and chop off the .bst suffix
+ element_name = os.path.splitext(element_name.replace(os.sep, '-'))[0]
+
+ valid_chars = string.digits + string.ascii_letters + '-._'
+ element_name = ''.join([
+ x if x in valid_chars else '_'
+ for x in element_name
+ ])
+ artifact_key = '{0}/{1}/{2}'.format(project_name, element_name, cache_key)
+
+ try:
+ tree = self.cas.resolve_ref(artifact_key)
+ return tree
+ except CASError:
+ return None
+
+ # close():
+ #
+ # Remove the artifact share.
+ #
+ def close(self):
+ self.process.terminate()
+ self.process.join()
+
+ shutil.rmtree(self.directory)
+
+ def _mock_statvfs(self, path):
+ repo_size = 0
+ for root, _, files in os.walk(self.repodir):
+ for filename in files:
+ repo_size += os.path.getsize(os.path.join(root, filename))
+
+ return statvfs_result(f_blocks=self.total_space,
+ f_bfree=self.free_space - repo_size,
+ f_bavail=self.free_space - repo_size,
+ f_bsize=1)
+
+
+# create_timeout_artifact_share()
+#
+# Create a TimeoutArtifactShare for use in a test case
+#
+@contextmanager
+def create_timeout_artifact_share(directory, *, total_space=None, free_space=None):
+ share = TimeoutArtifactShare(directory, total_space=total_space, free_space=free_space)
+ try:
+ yield share
+ finally:
+ share.close()
+
+
+statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
+
+
+# create_timeout_server():
+#
+# Create gRPC CAS artifact server as specified in the Remote Execution API.
+#
+# Args:
+# repo (str): Path to CAS repository
+# enable_push (bool): Whether to allow blob uploads and artifact updates
+#
+def create_timeout_server(repo, *, enable_push):
+ cas = TimeoutCasCache(os.path.abspath(repo))
+
+ # Use max_workers default from Python 3.5+
+ max_workers = (os.cpu_count() or 1) * 5
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+
+ bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+ _ByteStreamServicer(cas, enable_push=enable_push), server)
+
+ remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+ _CapabilitiesServicer(), server)
+
+ buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+
+ return server
+
+
+class _ByteStreamServicer(casserver._ByteStreamServicer):
+ pass
+
+
+class _ContentAddressableStorageServicer(casserver._ContentAddressableStorageServicer):
+
+ def __init__(self, cas, *, enable_push):
+ self.__read_count = 0
+ super().__init__(cas=cas, enable_push=enable_push)
+
+ def BatchReadBlobs(self, request, context):
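+        # Hook point for simulating a slow CAS server: uncommenting the
+        # counter and sleep below delays blob reads so that a client with a
+        # short deadline will exercise its timeout handling.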
+ # self.__read_count += 1
+ # import time
+ # time.sleep(5)
+ return super().BatchReadBlobs(request, context)
+
+
+class _CapabilitiesServicer(casserver._CapabilitiesServicer):
+ pass
+
+
+class _ReferenceStorageServicer(casserver._ReferenceStorageServicer):
+ pass