import os import shutil import signal import sys from collections import namedtuple from contextlib import ExitStack, contextmanager from concurrent import futures from multiprocessing import Process, Queue import grpc from buildstream._cas import CASCache from buildstream._cas.casserver import create_server from buildstream._exceptions import CASError from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from buildstream._protos.buildstream.v2 import artifact_pb2, source_pb2 class BaseArtifactShare: def __init__(self): q = Queue() self.process = Process(target=self.run, args=(q,)) self.process.start() # Retrieve port from server subprocess port = q.get() if port is None: raise Exception("Error occurred when starting artifact server.") self.repo = "http://localhost:{}".format(port) # run(): # # Run the artifact server. # def run(self, q): with ExitStack() as stack: try: # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, # properly executing cleanup code in `finally` clauses and context managers. # This is required to terminate buildbox-casd on SIGTERM. signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0)) try: from pytest_cov.embed import cleanup_on_sigterm except ImportError: pass else: cleanup_on_sigterm() server = stack.enter_context(self._create_server()) port = server.add_insecure_port("localhost:0") server.start() except Exception: q.put(None) raise # Send port to parent q.put(port) # Sleep until termination by signal signal.pause() # _create_server() # # Create the server that will be run in the process # def _create_server(self): raise NotImplementedError() # close(): # # Remove the artifact share. # def close(self): self.process.terminate() self.process.join() # DummyArtifactShare() # # A dummy artifact share without any capabilities # class DummyArtifactShare(BaseArtifactShare): @contextmanager def _create_server(self): max_workers = (os.cpu_count() or 1) * 5 server = grpc.server(futures.ThreadPoolExecutor(max_workers)) yield server # ArtifactShare() # # Abstract class providing scaffolding for # generating data to be used with various sources # # Args: # directory (str): The base temp directory for the test # cache_quota (int): Maximum amount of disk space to use # casd (bool): Allow write access via casd # enable_push (bool): Whether the share should allow pushes # class ArtifactShare(BaseArtifactShare): def __init__(self, directory, *, quota=None, casd=False, index_only=False): # The working directory for the artifact share (in case it # needs to do something outside of its backend's storage folder). # self.directory = os.path.abspath(directory) # The directory the actual repo will be stored in. # # Unless this gets more complicated, just use this directly # in tests as a remote artifact push/pull configuration # self.repodir = os.path.join(self.directory, "repo") os.makedirs(self.repodir) self.artifactdir = os.path.join(self.repodir, "artifacts", "refs") os.makedirs(self.artifactdir) self.sourcedir = os.path.join(self.repodir, "source_protos", "refs") os.makedirs(self.sourcedir) logdir = os.path.join(self.directory, "logs") if casd else None self.cas = CASCache(self.repodir, casd=casd, log_directory=logdir) self.quota = quota self.index_only = index_only super().__init__() def _create_server(self): return create_server(self.repodir, quota=self.quota, enable_push=True, index_only=self.index_only,) # has_object(): # # Checks whether the object is present in the share # # Args: # digest (str): The object's digest # # Returns: # (bool): True if the object exists in the share, otherwise false. def has_object(self, digest): assert isinstance(digest, remote_execution_pb2.Digest) object_path = self.cas.objpath(digest) return os.path.exists(object_path) def get_artifact_proto(self, artifact_name): artifact_proto = artifact_pb2.Artifact() artifact_path = os.path.join(self.artifactdir, artifact_name) try: with open(artifact_path, "rb") as f: artifact_proto.ParseFromString(f.read()) except FileNotFoundError: return None return artifact_proto def get_source_proto(self, source_name): source_proto = source_pb2.Source() source_path = os.path.join(self.sourcedir, source_name) try: with open(source_path, "rb") as f: source_proto.ParseFromString(f.read()) except FileNotFoundError: return None return source_proto def get_cas_files(self, artifact_proto): reachable = set() def reachable_dir(digest): self.cas._reachable_refs_dir(reachable, digest, update_mtime=False, check_exists=True) try: if str(artifact_proto.files): reachable_dir(artifact_proto.files) if str(artifact_proto.buildtree): reachable_dir(artifact_proto.buildtree) if str(artifact_proto.public_data): if not os.path.exists(self.cas.objpath(artifact_proto.public_data)): return None for log_file in artifact_proto.logs: if not os.path.exists(self.cas.objpath(log_file.digest)): return None return artifact_proto.files except CASError: return None except FileNotFoundError: return None # has_artifact(): # # Checks whether the artifact is present in the share # # Args: # artifact_name (str): The composed complete artifact name # # Returns: # (ArtifactProto): artifact digest if the artifact exists in the share, otherwise None. def get_artifact(self, artifact_name): artifact_proto = self.get_artifact_proto(artifact_name) if not artifact_proto: return None return self.get_cas_files(artifact_proto) # close(): # # Remove the artifact share. # def close(self): super().close() self.cas.release_resources() shutil.rmtree(self.directory) # create_artifact_share() # # Create an ArtifactShare for use in a test case # @contextmanager def create_artifact_share(directory, *, quota=None, casd=False): share = ArtifactShare(directory, quota=quota, casd=casd) try: yield share finally: share.close() @contextmanager def create_split_share(directory1, directory2, *, quota=None, casd=False): index = ArtifactShare(directory1, quota=quota, casd=casd, index_only=True) storage = ArtifactShare(directory2, quota=quota, casd=casd) try: yield index, storage finally: index.close() storage.close() # create_dummy_artifact_share() # # Create a dummy artifact share that doesn't have any capabilities # @contextmanager def create_dummy_artifact_share(): share = DummyArtifactShare() try: yield share finally: share.close() statvfs_result = namedtuple("statvfs_result", "f_blocks f_bfree f_bsize f_bavail") # Assert that a given artifact is in the share # def assert_shared(cli, share, project, element_name, *, project_name="test"): if not share.get_artifact(cli.get_artifact_name(project, project_name, element_name)): raise AssertionError( "Artifact share at {} does not contain the expected element {}".format(share.repo, element_name) ) # Assert that a given artifact is not in the share # def assert_not_shared(cli, share, project, element_name, *, project_name="test"): if share.get_artifact(cli.get_artifact_name(project, project_name, element_name)): raise AssertionError( "Artifact share at {} unexpectedly contains the element {}".format(share.repo, element_name) )