summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2019-10-15 17:42:07 +0100
committerTristan Maat <tristan.maat@codethink.co.uk>2019-11-12 11:19:00 +0000
commite8ddfe1d755f0093305eb3b98d630db9ffaeca81 (patch)
treedb01707568410ed13b73f3d437b8f72b282c73ae
parent7c6d27b5dcfd4434a759febeae9195995f44a304 (diff)
downloadbuildstream-e8ddfe1d755f0093305eb3b98d630db9ffaeca81.tar.gz
casserver.py: Move CASCache API to a smaller, local class
-rw-r--r--src/buildstream/_cas/casserver.py225
1 files changed, 198 insertions, 27 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index c0c62b033..4f0763922 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -19,6 +19,7 @@
from concurrent import futures
from enum import Enum
+from typing import Set
import contextlib
import logging
import os
@@ -41,15 +42,178 @@ from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
from .. import utils
-from .._exceptions import CASError, CASCacheError
-from .cascache import CASCache
# The default limit for gRPC messages is 4 MiB.
# Limit payload to 1 MiB to leave sufficient headroom for metadata.
_MAX_PAYLOAD_BYTES = 1024 * 1024
+# CASCache:
+#
+# A slimmed down version of `buildstream._cas.cascache.CASCache` -
+# exposes exactly the bits of interface we need to update objects on
+# access.
+#
+# Note: This class *is* somewhat specialized and doesn't exactly do
+# what `buildstream._cas.cascache.CASCache` does anymore.
+#
+# Ideally this should be supported by buildbox-casd in the future.
+#
+class CASCache:
+ def __init__(self, root: str):
+ self.root = root
+ self.casdir = os.path.join(root, "cas")
+ self.tmpdir = os.path.join(root, "tmp")
+
+ # ref_path():
+ #
+ # Get the path to a digest's file.
+ #
+ # Args:
+ # ref - The ref of the digest.
+ #
+ # Returns:
+ # str - The path to the digest's file.
+ #
+ def ref_path(self, ref: str) -> str:
+ return os.path.join(self.casdir, 'refs', 'heads', ref)
+
+ # object_path():
+ #
+ # Get the path to an object's file.
+ #
+ # Args:
+ # digest - The digest of the object.
+ #
+ # Returns:
+ # str - The path to the object's file.
+ #
+ def object_path(self, digest) -> str:
+ return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
+
+ # remove_ref():
+ #
+ # Remove a digest file.
+ #
+ # Args:
+ # ref - The ref of the digest.
+ #
+ # Raises:
+ # FileNotFoundError - If the ref doesn't exist.
+ def remove_ref(self, ref: str):
+ basedir = os.path.join(self.casdir, 'refs', 'heads')
+
+ os.unlink(self.ref_path(ref))
+
+ # Now remove any leading directories
+ components = list(os.path.split(ref))
+ while components:
+ components.pop()
+ refdir = os.path.join(basedir, *components)
+
+ # Break out once we reach the base
+ if refdir == basedir:
+ break
+
+ try:
+ os.rmdir(refdir)
+ except FileNotFoundError:
+ # The parent directory did not exist, but it's
+ # parent directory might still be ready to prune
+ pass
+ except OSError as e:
+ if e.errno == errno.ENOTEMPTY:
+ # The parent directory was not empty, so we
+ # cannot prune directories beyond this point
+ break
+ raise
+
+ # set_ref():
+ #
+ # Create or update a ref with a new digest.
+ #
+ # Args:
+ # ref - The ref of the digest.
+ # tree - The digest to write.
+ #
+ def set_ref(self, ref: str, tree):
+ ref_path = self.ref_path(ref)
+
+ os.makedirs(os.path.dirname(ref_path), exist_ok=True)
+ with utils.save_file_atomic(ref_path, 'wb', tempdir=self.tmpdir) as f:
+ f.write(tree.SerializeToString())
+
+ # resolve_ref():
+ #
+ # Read a digest given its ref.
+ #
+ # Args:
+ # ref - The ref of the digest.
+ #
+ # Returns:
+ # remote_execution-pb2.Digest - The digest.
+ #
+ # Raises:
+ # FileNotFoundError - If the ref doesn't exist.
+ def resolve_ref(self, ref: str):
+ digest = remote_execution_pb2.Digest()
+ with open(self.ref_path(ref), 'rb') as f:
+ digest.ParseFromString(f.read())
+ return digest
+
+ # resolve_digest():
+ #
+ # Read the directory corresponding to a digest.
+ #
+ # Args:
+ # digest - The digest corresponding to a directory.
+ #
+ # Returns:
+ # remote_execution_pb2.Directory - The directory.
+ #
+ # Raises:
+ # FileNotFoundError - If the digest object doesn't exist.
+ def resolve_digest(self, digest):
+ directory = remote_execution_pb2.Directory()
+ with open(self.object_path(digest), 'rb') as f:
+ directory.ParseFromString(f.read())
+ return directory
+
+ # update_tree_mtime():
+ #
+ # Update the mtimes of all files in a tree.
+ #
+ # Args:
+ # tree - The digest of the tree to update.
+ #
+ # Raises:
+ # FileNotFoundError - If any of the tree's objects don't exist.
+ def update_tree_mtime(self, tree):
+ visited = set() # type: Set[str]
+ os.utime(self.object_path(tree))
+
+ def update_directory_node(node):
+ try:
+ if node.hash in visited:
+ return
+ except AttributeError:
+ raise Exception(type(node))
+
+ os.utime(self.object_path(node))
+ visited.add(node.hash)
+
+ directory = self.resolve_digest(node)
+ for filenode in directory.files: # pylint: disable=no-member
+ os.utime(self.object_path(filenode.digest))
+ for dirnode in directory.directories: # pylint: disable=no-member
+ update_directory_node(dirnode.digest)
+
+ # directory = self.resolve_digest(tree)
+ # update_directory_node(directory)
+ update_directory_node(tree)
+
+
# LogLevel():
#
# Represents the buildbox-casd log level.
@@ -211,13 +375,13 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
handler.setLevel(LogLevel.get_logging_equivalent(log_level))
logger.addHandler(handler)
- cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota)
cas_runner.start_casd()
+ cas_cache = CASCache(os.path.abspath(repo))
try:
- artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
- sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
+ root = os.path.abspath(repo)
+ sourcedir = os.path.join(root, 'source_protos')
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
@@ -234,10 +398,10 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
_CapabilitiesServicer(), server)
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
- _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+ _ReferenceStorageServicer(cas, cas_cache, enable_push=enable_push), server)
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
- _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server)
+ _ArtifactServicer(cas, root, cas_cache, update_cas=not index_only), server)
source_pb2_grpc.add_SourceServiceServicer_to_server(
_SourceServicer(sourcedir), server)
@@ -564,9 +728,10 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
- def __init__(self, cas, *, enable_push):
+ def __init__(self, cas, cas_cache, *, enable_push):
super().__init__()
self.cas = cas
+ self.cas_cache = cas_cache
self.enable_push = enable_push
self.logger = logging.getLogger("casserver")
@@ -575,17 +740,17 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
response = buildstream_pb2.GetReferenceResponse()
try:
- tree = self.cas.resolve_ref(request.key, update_mtime=True)
+ tree = self.cas_cache.resolve_ref(request.key)
try:
- self.cas.update_tree_mtime(tree)
+ self.cas_cache.update_tree_mtime(tree)
except FileNotFoundError:
- self.cas.remove(request.key)
+ self.cas_cache.remove_ref(request.key)
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
response.digest.hash = tree.hash
response.digest.size_bytes = tree.size_bytes
- except CASError:
+ except FileNotFoundError:
context.set_code(grpc.StatusCode.NOT_FOUND)
return response
@@ -599,7 +764,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
return response
for key in request.keys:
- self.cas.set_ref(key, request.digest)
+ self.cas_cache.set_ref(key, request.digest)
return response
@@ -614,10 +779,11 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
- def __init__(self, cas, artifactdir, *, update_cas=True):
+ def __init__(self, cas, root, cas_cache, *, update_cas=True):
super().__init__()
self.cas = cas
- self.artifactdir = artifactdir
+ self.cas_cache = cas_cache
+ self.artifactdir = os.path.join(root, 'artifacts', 'refs')
self.update_cas = update_cas
os.makedirs(artifactdir, exist_ok=True)
self.logger = logging.getLogger("casserver")
@@ -651,20 +817,20 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
try:
if str(artifact.files):
- self.cas.update_tree_mtime(artifact.files)
+ self.cas_cache.update_tree_mtime(artifact.files)
if str(artifact.buildtree):
# buildtrees might not be there
try:
- self.cas.update_tree_mtime(artifact.buildtree)
+ self.cas_cache.update_tree_mtime(artifact.buildtree)
except FileNotFoundError:
pass
if str(artifact.public_data):
- os.utime(self.cas.objpath(artifact.public_data))
+ os.utime(self.cas_cache.object_path(artifact.public_data))
for log_file in artifact.logs:
- os.utime(self.cas.objpath(log_file.digest))
+ os.utime(self.cas_cache.object_path(log_file.digest))
except FileNotFoundError:
os.unlink(artifact_path)
@@ -708,20 +874,25 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
def _check_directory(self, name, digest, context):
try:
- directory = remote_execution_pb2.Directory()
- with open(self.cas.objpath(digest), 'rb') as f:
- directory.ParseFromString(f.read())
+ self.cas_cache.resolve_digest(digest)
except FileNotFoundError:
- self.logger.warning("Artifact %s specified but no files found (%s)", name, self.cas.objpath(digest))
- context.abort(grpc.StatusCode.FAILED_PRECONDITION,
- "Artifact {} specified but no files found".format(name))
+ self.logger.warning(
+ "Artifact %s specified but no files found (%s)",
+ name,
+ self.cas_cache.object_path(digest))
+ context.abort(
+ grpc.StatusCode.FAILED_PRECONDITION,
+ "Artifact {} specified but no files found".format(name))
except DecodeError:
- self.logger.warning("Artifact %s specified but directory not found (%s)", name, self.cas.objpath(digest))
+ self.logger.warning(
+ "Artifact %s specified but directory not found (%s)",
+ name,
+ self.cas_cache.object_path(digest))
context.abort(grpc.StatusCode.FAILED_PRECONDITION,
"Artifact {} specified but directory not found".format(name))
def _check_file(self, name, digest, context):
- if not os.path.exists(self.cas.objpath(digest)):
+ if not os.path.exists(self.cas_cache.object_path(digest)):
context.abort(grpc.StatusCode.FAILED_PRECONDITION,
"Artifact {} specified but not found".format(name))