summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2020-07-01 16:40:13 +0200
committerbst-marge-bot <marge-bot@buildstream.build>2020-08-13 09:24:43 +0000
commit6e412d525ba7b9812e66244b6ad2176ab9a5dc32 (patch)
treeae5228e1fc60c94be8a901ea56b07ae857978dbc
parent0ef5d12f53edbb12bee188a060bfe0ec6e1fcb23 (diff)
downloadbuildstream-6e412d525ba7b9812e66244b6ad2176ab9a5dc32.tar.gz
casserver.py: Drop BuildStream Artifact and Source services
Replaced by Remote Asset API Fetch and Push services.
-rw-r--r--src/buildstream/_cas/casserver.py218
-rw-r--r--tests/artifactcache/artifactservice.py107
2 files changed, 0 insertions, 325 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d52e07291..013fb07dd 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -27,7 +27,6 @@ import signal
import sys
import grpc
-from google.protobuf.message import DecodeError
import click
from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc
@@ -36,14 +35,9 @@ from .._protos.build.bazel.remote.execution.v2 import (
remote_execution_pb2_grpc,
)
from .._protos.google.bytestream import bytestream_pb2_grpc
-from .._protos.build.buildgrid import local_cas_pb2
from .._protos.buildstream.v2 import (
buildstream_pb2,
buildstream_pb2_grpc,
- artifact_pb2,
- artifact_pb2_grpc,
- source_pb2,
- source_pb2_grpc,
)
# Note: We'd ideally like to avoid imports from the core codebase as
@@ -117,7 +111,6 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le
try:
root = os.path.abspath(repo)
- sourcedir = os.path.join(root, "source_protos")
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
@@ -144,19 +137,6 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le
_ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server
)
- artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
- _ArtifactServicer(casd_channel, root, update_cas=not index_only), server
- )
-
- source_pb2_grpc.add_SourceServiceServicer_to_server(_SourceServicer(sourcedir), server)
-
- # Create up reference storage and artifact capabilities
- artifact_capabilities = buildstream_pb2.ArtifactCapabilities(allow_updates=enable_push)
- source_capabilities = buildstream_pb2.SourceCapabilities(allow_updates=enable_push)
- buildstream_pb2_grpc.add_CapabilitiesServicer_to_server(
- _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), server
- )
-
yield server
finally:
@@ -443,201 +423,3 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
response.allow_updates = self.enable_push
return response
-
-
-class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
- def __init__(self, casd, root, *, update_cas=True):
- super().__init__()
- self.cas = casd.get_cas()
- self.local_cas = casd.get_local_cas()
- self.root = root
- self.artifactdir = os.path.join(root, "artifacts", "refs")
- self.update_cas = update_cas
- self.logger = logging.getLogger("buildstream._cas.casserver")
-
- # object_path():
- #
- # Get the path to an object's file.
- #
- # Args:
- # digest - The digest of the object.
- #
- # Returns:
- # str - The path to the object's file.
- #
- def object_path(self, digest) -> str:
- return os.path.join(self.root, "cas", "objects", digest.hash[:2], digest.hash[2:])
-
- # resolve_digest():
- #
- # Read the directory corresponding to a digest.
- #
- # Args:
- # digest - The digest corresponding to a directory.
- #
- # Returns:
- # remote_execution_pb2.Directory - The directory.
- #
- # Raises:
- # FileNotFoundError - If the digest object doesn't exist.
- def resolve_digest(self, digest):
- directory = remote_execution_pb2.Directory()
- with open(self.object_path(digest), "rb") as f:
- directory.ParseFromString(f.read())
- return directory
-
- def GetArtifact(self, request, context):
- self.logger.info("'%s'", request.cache_key)
- artifact_path = os.path.join(self.artifactdir, request.cache_key)
- if not os.path.exists(artifact_path):
- context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
-
- artifact = artifact_pb2.Artifact()
- with open(artifact_path, "rb") as f:
- artifact.ParseFromString(f.read())
-
- os.utime(artifact_path)
-
- # Artifact-only servers will not have blobs on their system,
- # so we can't reasonably update their mtimes. Instead, we exit
- # early, and let the CAS server deal with its blobs.
- #
- # FIXME: We could try to run FindMissingBlobs on the other
- # server. This is tricky to do from here, of course,
- # because we don't know who the other server is, but
- # the client could be smart about it - but this might
- # make things slower.
- #
- # It needs some more thought...
- if not self.update_cas:
- return artifact
-
- # Now update mtimes of files present.
- try:
-
- if str(artifact.files):
- request = local_cas_pb2.FetchTreeRequest()
- request.root_digest.CopyFrom(artifact.files)
- request.fetch_file_blobs = True
- self.local_cas.FetchTree(request)
-
- if str(artifact.buildtree):
- try:
- request = local_cas_pb2.FetchTreeRequest()
- request.root_digest.CopyFrom(artifact.buildtree)
- request.fetch_file_blobs = True
- self.local_cas.FetchTree(request)
- except grpc.RpcError as err:
- # buildtrees might not be there
- if err.code() != grpc.StatusCode.NOT_FOUND:
- raise
-
- if str(artifact.public_data):
- request = remote_execution_pb2.FindMissingBlobsRequest()
- d = request.blob_digests.add()
- d.CopyFrom(artifact.public_data)
- self.cas.FindMissingBlobs(request)
-
- request = remote_execution_pb2.FindMissingBlobsRequest()
- for log_file in artifact.logs:
- d = request.blob_digests.add()
- d.CopyFrom(log_file.digest)
- self.cas.FindMissingBlobs(request)
-
- except grpc.RpcError as err:
- if err.code() == grpc.StatusCode.NOT_FOUND:
- os.unlink(artifact_path)
- context.abort(grpc.StatusCode.NOT_FOUND, "Artifact files incomplete")
- else:
- context.abort(grpc.StatusCode.NOT_FOUND, "Artifact files not valid")
-
- return artifact
-
- def UpdateArtifact(self, request, context):
- self.logger.info("'%s' -> '%s'", request.artifact, request.cache_key)
- artifact = request.artifact
-
- if self.update_cas:
- # Check that the files specified are in the CAS
- if str(artifact.files):
- self._check_directory("files", artifact.files, context)
-
- # Unset protocol buffers don't evaluated to False but do return empty
- # strings, hence str()
- if str(artifact.public_data):
- self._check_file("public data", artifact.public_data, context)
-
- for log_file in artifact.logs:
- self._check_file("log digest", log_file.digest, context)
-
- # Add the artifact proto to the cas
- artifact_path = os.path.join(self.artifactdir, request.cache_key)
- os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
- with save_file_atomic(artifact_path, mode="wb") as f:
- f.write(artifact.SerializeToString())
-
- return artifact
-
- def _check_directory(self, name, digest, context):
- try:
- self.resolve_digest(digest)
- except FileNotFoundError:
- self.logger.warning("Artifact %s specified but no files found", name)
- context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but no files found".format(name))
- except DecodeError:
- self.logger.warning("Artifact %s specified but directory not found", name)
- context.abort(
- grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name)
- )
-
- def _check_file(self, name, digest, context):
- if not os.path.exists(self.object_path(digest)):
- context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but not found".format(name))
-
-
-class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer):
- def __init__(self, artifact_capabilities, source_capabilities):
- self.artifact_capabilities = artifact_capabilities
- self.source_capabilities = source_capabilities
-
- def GetCapabilities(self, request, context):
- response = buildstream_pb2.ServerCapabilities()
- response.artifact_capabilities.CopyFrom(self.artifact_capabilities)
- response.source_capabilities.CopyFrom(self.source_capabilities)
- return response
-
-
-class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
- def __init__(self, sourcedir):
- self.sourcedir = sourcedir
- self.logger = logging.getLogger("buildstream._cas.casserver")
-
- def GetSource(self, request, context):
- self.logger.info("'%s'", request.cache_key)
- try:
- source_proto = self._get_source(request.cache_key)
- except FileNotFoundError:
- context.abort(grpc.StatusCode.NOT_FOUND, "Source not found")
- except DecodeError:
- context.abort(grpc.StatusCode.NOT_FOUND, "Sources gives invalid directory")
-
- return source_proto
-
- def UpdateSource(self, request, context):
- self.logger.info("'%s' -> '%s'", request.source, request.cache_key)
- self._set_source(request.cache_key, request.source)
- return request.source
-
- def _get_source(self, cache_key):
- path = os.path.join(self.sourcedir, cache_key)
- source_proto = source_pb2.Source()
- with open(path, "r+b") as f:
- source_proto.ParseFromString(f.read())
- os.utime(path)
- return source_proto
-
- def _set_source(self, cache_key, source_proto):
- path = os.path.join(self.sourcedir, cache_key)
- os.makedirs(os.path.dirname(path), exist_ok=True)
- with save_file_atomic(path, "w+b") as f:
- f.write(source_proto.SerializeToString())
diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py
deleted file mode 100644
index c640665a3..000000000
--- a/tests/artifactcache/artifactservice.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Copyright (C) 2019 Bloomberg Finance LP
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-# Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
-#
-import os
-from urllib.parse import urlparse
-
-import grpc
-import pytest
-
-from buildstream._protos.buildstream.v2.artifact_pb2 import Artifact, GetArtifactRequest, UpdateArtifactRequest
-from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactServiceStub
-from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as re_pb2
-from buildstream import utils
-
-from tests.testutils.artifactshare import create_artifact_share
-
-
-def test_artifact_get_not_found(tmpdir):
- sharedir = os.path.join(str(tmpdir), "share")
- with create_artifact_share(sharedir) as share:
- # set up artifact service stub
- url = urlparse(share.repo)
- with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel:
- artifact_stub = ArtifactServiceStub(channel)
-
- # Run GetArtifact and check it throws a not found error
- request = GetArtifactRequest()
- request.cache_key = "@artifact/something/not_there"
- try:
- artifact_stub.GetArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.NOT_FOUND
- assert e.details() == "Artifact proto not found"
- else:
- assert False
-
-
-# Successfully getting the artifact
-@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
-def test_update_artifact(tmpdir, files):
- sharedir = os.path.join(str(tmpdir), "share")
- with create_artifact_share(sharedir, casd=True) as share:
- # put files object
- if files == "present":
- directory = re_pb2.Directory()
- digest = share.cas.add_object(buffer=directory.SerializeToString())
- elif files == "invalid":
- digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
- elif files == "absent":
- digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))
-
- url = urlparse(share.repo)
-
- with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel:
- artifact_stub = ArtifactServiceStub(channel)
-
- # initialise an artifact
- artifact = Artifact()
- artifact.version = 0
- artifact.build_success = True
- artifact.strong_key = "abcdefghijklmnop"
- artifact.files.hash = "hashymchashash"
- artifact.files.size_bytes = 10
-
- artifact.files.CopyFrom(digest)
-
- # Put it in the artifact share with an UpdateArtifactRequest
- request = UpdateArtifactRequest()
- request.artifact.CopyFrom(artifact)
- request.cache_key = "a-cache-key"
-
- # should return the same artifact back
- if files == "present":
- response = artifact_stub.UpdateArtifact(request)
- assert response == artifact
- else:
- try:
- artifact_stub.UpdateArtifact(request)
- except grpc.RpcError as e:
- assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
- if files == "absent":
- assert e.details() == "Artifact files specified but no files found"
- elif files == "invalid":
- assert e.details() == "Artifact files specified but directory not found"
- return
-
- # If we uploaded the artifact check GetArtifact
- request = GetArtifactRequest()
- request.cache_key = "a-cache-key"
-
- response = artifact_stub.GetArtifact(request)
- assert response == artifact