diff options
author | Jürg Billeter <j@bitron.ch> | 2020-06-24 08:24:04 +0200 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-08-13 09:24:43 +0000 |
commit | 5c1e35651110667c00066076f8060096946321de (patch) | |
tree | cd9b2d707b899559c7597961a1abf09e5c528243 /src/buildstream/_assetcache.py | |
parent | 7d85545e5d8fc61de37657e4baab321d3986c3c6 (diff) | |
download | buildstream-5c1e35651110667c00066076f8060096946321de.tar.gz |
_assetcache.py: Add AssetRemote for Remote Asset API
Co-authored-by: Sander Striker <s.striker@striker.nl>
Diffstat (limited to 'src/buildstream/_assetcache.py')
-rw-r--r-- | src/buildstream/_assetcache.py | 150 |
1 files changed, 147 insertions, 3 deletions
diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py index b4f0f945e..1484ffb0d 100644 --- a/src/buildstream/_assetcache.py +++ b/src/buildstream/_assetcache.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 Bloomberg Finance LP +# Copyright (C) 2019-2020 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,19 +20,163 @@ import os from fnmatch import fnmatch from itertools import chain from typing import TYPE_CHECKING +import grpc from . import utils from . import _yaml from ._cas import CASRemote from ._message import Message, MessageType from ._exceptions import AssetCacheError, LoadError, RemoteError -from ._remote import RemoteSpec, RemoteType +from ._remote import BaseRemote, RemoteSpec, RemoteType +from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc +from ._protos.google.rpc import code_pb2 if TYPE_CHECKING: from typing import Optional, Type from ._exceptions import BstError - from ._remote import BaseRemote + + +class AssetRemote(BaseRemote): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fetch_service = None + self.push_service = None + + def close(self): + self.fetch_service = None + self.push_service = None + super().close() + + def _configure_protocols(self): + # set up remote asset stubs + self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel) + self.push_service = remote_asset_pb2_grpc.PushStub(self.channel) + + # _check(): + # + # Check if this remote provides everything required for the + # particular kind of remote. This is expected to be called as part + # of check() + # + # Raises: + # RemoteError: If the upstream has a problem + # + def _check(self): + request = remote_asset_pb2.FetchBlobRequest() + if self.instance_name: + request.instance_name = self.instance_name + + try: + self.fetch_service.FetchBlob(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # Expected error as the request doesn't specify any URIs. + pass + elif e.code() == grpc.StatusCode.UNIMPLEMENTED: + raise RemoteError( + "Configured remote does not implement the Remote Asset " + "Fetch service. Please check remote configuration." + ) + else: + raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details())) + + if self.spec.push: + request = remote_asset_pb2.PushBlobRequest() + if self.instance_name: + request.instance_name = self.instance_name + + try: + self.push_service.PushBlob(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.INVALID_ARGUMENT: + # Expected error as the request doesn't specify any URIs. + pass + elif e.code() == grpc.StatusCode.UNIMPLEMENTED: + raise RemoteError( + "Configured remote does not implement the Remote Asset " + "Push service. Please check remote configuration." + ) + else: + raise RemoteError( + "Remote initialisation failed with status {}: {}".format(e.code().name, e.details()) + ) + + # fetch_blob(): + # + # Resolve URIs to a CAS blob digest. + # + # Args: + # uris (list of str): The URIs to resolve. Multiple URIs should represent + # the same content available at different locations. + # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the + # content to fetch. + # + # Returns + # (FetchBlobResponse): The asset server response or None if the resource + # is not available. + # + # Raises: + # AssetCacheError: If the upstream has a problem + # + def fetch_blob(self, uris, *, qualifiers=None): + request = remote_asset_pb2.FetchBlobRequest() + if self.instance_name: + request.instance_name = self.instance_name + request.uris.extend(uris) + if qualifiers: + request.qualifiers.extend(qualifiers) + + try: + response = self.fetch_service.FetchBlob(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.NOT_FOUND: + return None + + raise AssetCacheError("FetchBlob failed with status {}: {}".format(e.code().name, e.details())) from e + + if response.status.code == code_pb2.NOT_FOUND: + return None + + if response.status.code != code_pb2.OK: + raise AssetCacheError("FetchBlob failed with response status {}".format(response.status.code)) + + return response + + # push_blob(): + # + # Associate a CAS blob digest to URIs. + # + # Args: + # uris (list of str): The URIs to associate with the blob digest. + # blob_digest (Digest): The CAS blob to associate. + # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the + # content that is being pushed. + # references_blobs (list of Digest): Referenced blobs that need to not expire + # before expiration of this association. + # references_directories (list of Digest): Referenced directories that need to not expire + # before expiration of this association. + # + # Raises: + # AssetCacheError: If the upstream has a problem + # + def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None): + request = remote_asset_pb2.PushBlobRequest() + if self.instance_name: + request.instance_name = self.instance_name + request.uris.extend(uris) + request.blob_digest.CopyFrom(blob_digest) + if qualifiers: + request.qualifiers.extend(qualifiers) + if references_blobs: + request.references_blobs.extend(references_blobs) + if references_directories: + request.references_directories.extend(references_directories) + + try: + self.push_service.PushBlob(request) + except grpc.RpcError as e: + raise AssetCacheError("PushBlob failed with status {}: {}".format(e.code().name, e.details())) from e # Base Asset Cache for Caches to derive from |