From a0a22a83f48c097bfdb2061ed4f8b00e24632c68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 24 Jun 2020 09:09:29 +0200 Subject: casserver.py: Add proxy for Remote Asset API --- src/buildstream/_cas/casserver.py | 50 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 71d7d9071..d52e07291 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -1,5 +1,6 @@ # # Copyright (C) 2018 Codethink Limited +# Copyright (C) 2020 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -29,6 +30,7 @@ import grpc from google.protobuf.message import DecodeError import click +from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import ( remote_execution_pb2, remote_execution_pb2_grpc, @@ -132,6 +134,12 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(), server) + # Remote Asset API + remote_asset_pb2_grpc.add_FetchServicer_to_server(_FetchServicer(casd_channel), server) + if enable_push: + remote_asset_pb2_grpc.add_PushServicer_to_server(_PushServicer(casd_channel), server) + + # BuildStream protocols buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( _ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server ) @@ -295,6 +303,48 @@ class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): return response +class _FetchServicer(remote_asset_pb2_grpc.FetchServicer): + def __init__(self, casd): + super().__init__() + self.fetch = casd.get_asset_fetch() + self.logger = logging.getLogger("buildstream._cas.casserver") + + def FetchBlob(self, request, context): + self.logger.debug("FetchBlob '%s'", request.uris) + try: + return self.fetch.FetchBlob(request) + except grpc.RpcError as err: + context.abort(err.code(), err.details()) + + def FetchDirectory(self, request, context): + self.logger.debug("FetchDirectory '%s'", request.uris) + try: + return self.fetch.FetchDirectory(request) + except grpc.RpcError as err: + context.abort(err.code(), err.details()) + + +class _PushServicer(remote_asset_pb2_grpc.PushServicer): + def __init__(self, casd): + super().__init__() + self.push = casd.get_asset_push() + self.logger = logging.getLogger("buildstream._cas.casserver") + + def PushBlob(self, request, context): + self.logger.debug("PushBlob '%s'", request.uris) + try: + return self.push.PushBlob(request) + except grpc.RpcError as err: + context.abort(err.code(), err.details()) + + def PushDirectory(self, request, context): + self.logger.debug("PushDirectory '%s'", request.uris) + try: + return self.push.PushDirectory(request) + except grpc.RpcError as err: + context.abort(err.code(), err.details()) + + class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): def __init__(self, casd, cas_root, *, enable_push): super().__init__() -- cgit v1.2.1