summaryrefslogtreecommitdiff
path: root/src/buildstream/_assetcache.py
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2020-06-24 08:24:04 +0200
committerbst-marge-bot <marge-bot@buildstream.build>2020-08-13 09:24:43 +0000
commit5c1e35651110667c00066076f8060096946321de (patch)
treecd9b2d707b899559c7597961a1abf09e5c528243 /src/buildstream/_assetcache.py
parent7d85545e5d8fc61de37657e4baab321d3986c3c6 (diff)
downloadbuildstream-5c1e35651110667c00066076f8060096946321de.tar.gz
_assetcache.py: Add AssetRemote for Remote Asset API
Co-authored-by: Sander Striker <s.striker@striker.nl>
Diffstat (limited to 'src/buildstream/_assetcache.py')
-rw-r--r--src/buildstream/_assetcache.py150
1 files changed, 147 insertions, 3 deletions
diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py
index b4f0f945e..1484ffb0d 100644
--- a/src/buildstream/_assetcache.py
+++ b/src/buildstream/_assetcache.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 Bloomberg Finance LP
+# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -20,19 +20,163 @@ import os
from fnmatch import fnmatch
from itertools import chain
from typing import TYPE_CHECKING
+import grpc
from . import utils
from . import _yaml
from ._cas import CASRemote
from ._message import Message, MessageType
from ._exceptions import AssetCacheError, LoadError, RemoteError
-from ._remote import RemoteSpec, RemoteType
+from ._remote import BaseRemote, RemoteSpec, RemoteType
+from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
+from ._protos.google.rpc import code_pb2
if TYPE_CHECKING:
from typing import Optional, Type
from ._exceptions import BstError
- from ._remote import BaseRemote
+
+
+class AssetRemote(BaseRemote):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.fetch_service = None
+ self.push_service = None
+
+ def close(self):
+ self.fetch_service = None
+ self.push_service = None
+ super().close()
+
+ def _configure_protocols(self):
+ # set up remote asset stubs
+ self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
+ self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)
+
+ # _check():
+ #
+ # Check if this remote provides everything required for the
+ # particular kind of remote. This is expected to be called as part
+ # of check()
+ #
+ # Raises:
+ # RemoteError: If the upstream has a problem
+ #
+ def _check(self):
+ request = remote_asset_pb2.FetchBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ try:
+ self.fetch_service.FetchBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+ # Expected error as the request doesn't specify any URIs.
+ pass
+ elif e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise RemoteError(
+ "Configured remote does not implement the Remote Asset "
+ "Fetch service. Please check remote configuration."
+ )
+ else:
+ raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))
+
+ if self.spec.push:
+ request = remote_asset_pb2.PushBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ try:
+ self.push_service.PushBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+ # Expected error as the request doesn't specify any URIs.
+ pass
+ elif e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise RemoteError(
+ "Configured remote does not implement the Remote Asset "
+ "Push service. Please check remote configuration."
+ )
+ else:
+ raise RemoteError(
+ "Remote initialisation failed with status {}: {}".format(e.code().name, e.details())
+ )
+
+ # fetch_blob():
+ #
+ # Resolve URIs to a CAS blob digest.
+ #
+ # Args:
+ # uris (list of str): The URIs to resolve. Multiple URIs should represent
+ # the same content available at different locations.
+ # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
+ # content to fetch.
+ #
+ # Returns
+ # (FetchBlobResponse): The asset server response or None if the resource
+ # is not available.
+ #
+ # Raises:
+ # AssetCacheError: If the upstream has a problem
+ #
+ def fetch_blob(self, uris, *, qualifiers=None):
+ request = remote_asset_pb2.FetchBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ request.uris.extend(uris)
+ if qualifiers:
+ request.qualifiers.extend(qualifiers)
+
+ try:
+ response = self.fetch_service.FetchBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ return None
+
+ raise AssetCacheError("FetchBlob failed with status {}: {}".format(e.code().name, e.details())) from e
+
+ if response.status.code == code_pb2.NOT_FOUND:
+ return None
+
+ if response.status.code != code_pb2.OK:
+ raise AssetCacheError("FetchBlob failed with response status {}".format(response.status.code))
+
+ return response
+
+ # push_blob():
+ #
+ # Associate a CAS blob digest to URIs.
+ #
+ # Args:
+ # uris (list of str): The URIs to associate with the blob digest.
+ # blob_digest (Digest): The CAS blob to associate.
+ # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
+ # content that is being pushed.
+ # references_blobs (list of Digest): Referenced blobs that need to not expire
+ # before expiration of this association.
+ # references_directories (list of Digest): Referenced directories that need to not expire
+ # before expiration of this association.
+ #
+ # Raises:
+ # AssetCacheError: If the upstream has a problem
+ #
+ def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None):
+ request = remote_asset_pb2.PushBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ request.uris.extend(uris)
+ request.blob_digest.CopyFrom(blob_digest)
+ if qualifiers:
+ request.qualifiers.extend(qualifiers)
+ if references_blobs:
+ request.references_blobs.extend(references_blobs)
+ if references_directories:
+ request.references_directories.extend(references_directories)
+
+ try:
+ self.push_service.PushBlob(request)
+ except grpc.RpcError as e:
+ raise AssetCacheError("PushBlob failed with status {}: {}".format(e.code().name, e.details())) from e
# Base Asset Cache for Caches to derive from