diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-10-15 17:42:07 +0100 |
---|---|---|
committer | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-11-12 11:19:00 +0000 |
commit | e8ddfe1d755f0093305eb3b98d630db9ffaeca81 (patch) | |
tree | db01707568410ed13b73f3d437b8f72b282c73ae | |
parent | 7c6d27b5dcfd4434a759febeae9195995f44a304 (diff) | |
download | buildstream-e8ddfe1d755f0093305eb3b98d630db9ffaeca81.tar.gz |
casserver.py: Move CASCache API to a smaller, local class
-rw-r--r-- | src/buildstream/_cas/casserver.py | 225 |
1 files changed, 198 insertions, 27 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index c0c62b033..4f0763922 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -19,6 +19,7 @@ from concurrent import futures from enum import Enum +from typing import Set import contextlib import logging import os @@ -41,15 +42,178 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc from .. import utils -from .._exceptions import CASError, CASCacheError -from .cascache import CASCache # The default limit for gRPC messages is 4 MiB. # Limit payload to 1 MiB to leave sufficient headroom for metadata. _MAX_PAYLOAD_BYTES = 1024 * 1024 +# CASCache: +# +# A slimmed down version of `buildstream._cas.cascache.CASCache` - +# exposes exactly the bits of interface we need to update objects on +# access. +# +# Note: This class *is* somewhat specialized and doesn't exactly do +# what `buildstream._cas.cascache.CASCache` does anymore. +# +# Ideally this should be supported by buildbox-casd in the future. +# +class CASCache: + def __init__(self, root: str): + self.root = root + self.casdir = os.path.join(root, "cas") + self.tmpdir = os.path.join(root, "tmp") + + # ref_path(): + # + # Get the path to a digest's file. + # + # Args: + # ref - The ref of the digest. + # + # Returns: + # str - The path to the digest's file. + # + def ref_path(self, ref: str) -> str: + return os.path.join(self.casdir, 'refs', 'heads', ref) + + # object_path(): + # + # Get the path to an object's file. + # + # Args: + # digest - The digest of the object. + # + # Returns: + # str - The path to the object's file. + # + def object_path(self, digest) -> str: + return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:]) + + # remove_ref(): + # + # Remove a digest file. + # + # Args: + # ref - The ref of the digest. + # + # Raises: + # FileNotFoundError - If the ref doesn't exist. 
+ def remove_ref(self, ref: str): + basedir = os.path.join(self.casdir, 'refs', 'heads') + + os.unlink(self.ref_path(ref)) + + # Now remove any leading directories + components = list(os.path.split(ref)) + while components: + components.pop() + refdir = os.path.join(basedir, *components) + + # Break out once we reach the base + if refdir == basedir: + break + + try: + os.rmdir(refdir) + except FileNotFoundError: + # The parent directory did not exist, but its + # parent directory might still be ready to prune + pass + except OSError as e: + if e.errno == errno.ENOTEMPTY: + # The parent directory was not empty, so we + # cannot prune directories beyond this point + break + raise + + # set_ref(): + # + # Create or update a ref with a new digest. + # + # Args: + # ref - The ref of the digest. + # tree - The digest to write. + # + def set_ref(self, ref: str, tree): + ref_path = self.ref_path(ref) + + os.makedirs(os.path.dirname(ref_path), exist_ok=True) + with utils.save_file_atomic(ref_path, 'wb', tempdir=self.tmpdir) as f: + f.write(tree.SerializeToString()) + + # resolve_ref(): + # + # Read a digest given its ref. + # + # Args: + # ref - The ref of the digest. + # + # Returns: + # remote_execution_pb2.Digest - The digest. + # + # Raises: + # FileNotFoundError - If the ref doesn't exist. + def resolve_ref(self, ref: str): + digest = remote_execution_pb2.Digest() + with open(self.ref_path(ref), 'rb') as f: + digest.ParseFromString(f.read()) + return digest + + # resolve_digest(): + # + # Read the directory corresponding to a digest. + # + # Args: + # digest - The digest corresponding to a directory. + # + # Returns: + # remote_execution_pb2.Directory - The directory. + # + # Raises: + # FileNotFoundError - If the digest object doesn't exist. 
+ def resolve_digest(self, digest): + directory = remote_execution_pb2.Directory() + with open(self.object_path(digest), 'rb') as f: + directory.ParseFromString(f.read()) + return directory + + # update_tree_mtime(): + # + # Update the mtimes of all files in a tree. + # + # Args: + # tree - The digest of the tree to update. + # + # Raises: + # FileNotFoundError - If any of the tree's objects don't exist. + def update_tree_mtime(self, tree): + visited = set() # type: Set[str] + os.utime(self.object_path(tree)) + + def update_directory_node(node): + try: + if node.hash in visited: + return + except AttributeError: + raise Exception(type(node)) + + os.utime(self.object_path(node)) + visited.add(node.hash) + + directory = self.resolve_digest(node) + for filenode in directory.files: # pylint: disable=no-member + os.utime(self.object_path(filenode.digest)) + for dirnode in directory.directories: # pylint: disable=no-member + update_directory_node(dirnode.digest) + + # directory = self.resolve_digest(tree) + # update_directory_node(directory) + update_directory_node(tree) + + # LogLevel(): # # Represents the buildbox-casd log level. 
@@ -211,13 +375,13 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA handler.setLevel(LogLevel.get_logging_equivalent(log_level)) logger.addHandler(handler) - cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota) cas_runner.start_casd() + cas_cache = CASCache(os.path.abspath(repo)) try: - artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') - sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') + root = os.path.abspath(repo) + sourcedir = os.path.join(root, 'source_protos') # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 @@ -234,10 +398,10 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA _CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(cas, enable_push=enable_push), server) + _ReferenceStorageServicer(cas, cas_cache, enable_push=enable_push), server) artifact_pb2_grpc.add_ArtifactServiceServicer_to_server( - _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server) + _ArtifactServicer(cas, root, cas_cache, update_cas=not index_only), server) source_pb2_grpc.add_SourceServiceServicer_to_server( _SourceServicer(sourcedir), server) @@ -564,9 +728,10 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): - def __init__(self, cas, *, enable_push): + def __init__(self, cas, cas_cache, *, enable_push): super().__init__() self.cas = cas + self.cas_cache = cas_cache self.enable_push = enable_push self.logger = logging.getLogger("casserver") @@ -575,17 +740,17 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): response = buildstream_pb2.GetReferenceResponse() try: - tree = self.cas.resolve_ref(request.key, update_mtime=True) + 
tree = self.cas_cache.resolve_ref(request.key) try: - self.cas.update_tree_mtime(tree) + self.cas_cache.update_tree_mtime(tree) except FileNotFoundError: - self.cas.remove(request.key) + self.cas_cache.remove_ref(request.key) context.set_code(grpc.StatusCode.NOT_FOUND) return response response.digest.hash = tree.hash response.digest.size_bytes = tree.size_bytes - except CASError: + except FileNotFoundError: context.set_code(grpc.StatusCode.NOT_FOUND) return response @@ -599,7 +764,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): return response for key in request.keys: - self.cas.set_ref(key, request.digest) + self.cas_cache.set_ref(key, request.digest) return response @@ -614,10 +779,11 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): - def __init__(self, cas, artifactdir, *, update_cas=True): + def __init__(self, cas, root, cas_cache, *, update_cas=True): super().__init__() self.cas = cas - self.artifactdir = artifactdir + self.cas_cache = cas_cache + self.artifactdir = os.path.join(root, 'artifacts', 'refs') self.update_cas = update_cas os.makedirs(artifactdir, exist_ok=True) self.logger = logging.getLogger("casserver") @@ -651,20 +817,20 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): try: if str(artifact.files): - self.cas.update_tree_mtime(artifact.files) + self.cas_cache.update_tree_mtime(artifact.files) if str(artifact.buildtree): # buildtrees might not be there try: - self.cas.update_tree_mtime(artifact.buildtree) + self.cas_cache.update_tree_mtime(artifact.buildtree) except FileNotFoundError: pass if str(artifact.public_data): - os.utime(self.cas.objpath(artifact.public_data)) + os.utime(self.cas_cache.object_path(artifact.public_data)) for log_file in artifact.logs: - os.utime(self.cas.objpath(log_file.digest)) + os.utime(self.cas_cache.object_path(log_file.digest)) except FileNotFoundError: 
os.unlink(artifact_path) @@ -708,20 +874,25 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): def _check_directory(self, name, digest, context): try: - directory = remote_execution_pb2.Directory() - with open(self.cas.objpath(digest), 'rb') as f: - directory.ParseFromString(f.read()) + self.cas_cache.resolve_digest(digest) except FileNotFoundError: - self.logger.warning("Artifact %s specified but no files found (%s)", name, self.cas.objpath(digest)) - context.abort(grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but no files found".format(name)) + self.logger.warning( + "Artifact %s specified but no files found (%s)", + name, + self.cas_cache.object_path(digest)) + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + "Artifact {} specified but no files found".format(name)) except DecodeError: - self.logger.warning("Artifact %s specified but directory not found (%s)", name, self.cas.objpath(digest)) + self.logger.warning( + "Artifact %s specified but directory not found (%s)", + name, + self.cas_cache.object_path(digest)) context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name)) def _check_file(self, name, digest, context): - if not os.path.exists(self.cas.objpath(digest)): + if not os.path.exists(self.cas_cache.object_path(digest)): context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but not found".format(name)) |