diff options
author | Sander Striker <s.striker@striker.nl> | 2020-02-22 00:02:55 +0000 |
---|---|---|
committer | Sander Striker <s.striker@striker.nl> | 2020-02-22 00:02:55 +0000 |
commit | 02ff4bdcad1d20813f110531dcbc99c0b1af44ed (patch) | |
tree | d1f900ce6ec4f91330ffdde3ffe5ae5ce1affabc | |
parent | 83330a3578ddda650b62e3af5c570b34510a54a7 (diff) | |
download | buildstream-sstriker-remote-asset-wip.tar.gz |
WIP: Store artifact in asset cache [sstriker-remote-asset-wip]
-rw-r--r-- | src/buildstream/_artifact.py | 21 | ||||
-rw-r--r-- | src/buildstream/_assetcache.py | 156 | ||||
-rw-r--r-- | src/buildstream/_context.py | 10 | ||||
-rw-r--r-- | src/buildstream/_exceptions.py | 9 |
4 files changed, 196 insertions, 0 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py index ae1b395b3..5cb219c33 100644 --- a/src/buildstream/_artifact.py +++ b/src/buildstream/_artifact.py @@ -37,6 +37,8 @@ from . import utils from .types import Scope from .storage._casbaseddirectory import CasBasedDirectory +REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = \ + "urn:fdn:buildstream.build:20200223:artifact:{}" # An Artifact class to abstract artifact operations # from the Element class @@ -58,6 +60,7 @@ class Artifact: self._weak_cache_key = weak_key self._artifactdir = context.artifactdir self._cas = context.get_cascache() + self._assetcache = context.get_assetcache() self._tmpdir = context.tmpdir self._proto = None @@ -193,6 +196,24 @@ class Artifact: artifact.buildtree.CopyFrom(buildtreevdir._get_digest()) size += buildtreevdir.get_size() + # Store artifact in CAS + artifact_digest = self._cas.add_object(buffer=artifact.SerializeToString()) + + # Add artifact to AssetCache + keys = utils._deduplicate([artifact.strong_key, artifact.weak_key]) + uris = [REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(key) for key in keys] + referenced_directories = [] + if artifact.files: + referenced_directories.append(artifact.files) + if artifact.buildtree: + referenced_directories.append(artifact.buildtree) + referenced_blobs = artifact.logs + self._assetcache.push_blob( + artifact_digest, uris, + references_blobs=referenced_blobs, + references_directories=referenced_directories) + + # TODO: remove non-CAS artifact logic os.makedirs(os.path.dirname(os.path.join(self._artifactdir, element.get_artifact_name())), exist_ok=True) keys = utils._deduplicate([self._cache_key, self._weak_cache_key]) for key in keys: diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py new file mode 100644 index 000000000..d29c13e22 --- /dev/null +++ b/src/buildstream/_assetcache.py @@ -0,0 +1,156 @@ +# +# Copyright (C) 2020 Bloomberg Finance LP +# +# This program is free software; you can redistribute it 
and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+import os
+import grpc
+
+from ._remote import BaseRemote
+from ._cas.casremote import BlobNotFound
+from .storage._casbaseddirectory import CasBasedDirectory
+from ._basecache import BaseCache
+from ._exceptions import CASError, CASRemoteError, SourceCacheError, RemoteError, AssetCacheError
+from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, source_pb2, source_pb2_grpc
+from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
+
+
+class AssetRemote(BaseRemote):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.fetch_service = None
+        self.push_service = None
+
+    def close(self):
+        self.fetch_service = None
+        self.push_service = None
+        super().close()
+
+    def _configure_protocols(self):
+        # create the Remote Asset Fetch and Push stubs on this remote's channel
+        self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
+        self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)
+
+    # _check():
+    #
+    # Check if this remote provides everything required for the
+    # particular kind of remote. 
This is expected to be called as part
+    # of check()
+    #
+    # Raises:
+    #    RemoteError: If the upstream has a problem
+    # NOTE: currently a no-op; the capability check below is commented out
+    def _check(self):
+        pass
+        # capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+        # # check that the service supports sources
+        # try:
+        #     request = buildstream_pb2.GetCapabilitiesRequest()
+        #     if self.instance_name:
+        #         request.instance_name = self.instance_name
+        #     response = capabilities_service.GetCapabilities(request)
+        # except grpc.RpcError as e:
+        #     # Check if this remote has the artifact service
+        #     if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+        #         raise RemoteError(
+        #             "Configured remote does not have the BuildStream "
+        #             "capabilities service. Please check remote configuration."
+        #         )
+        #     raise RemoteError("Remote initialisation failed: {}".format(e.details()))
+
+        # if not response.source_capabilities:
+        #     raise RemoteError("Configured remote does not support source service")
+
+        # if self.spec.push and not response.source_capabilities.allow_updates:
+        #     raise RemoteError("Source server does not allow push")
+
+    # fetch_blob():
+    # Ask the Fetch service for a blob matching the given URIs
+    # returns the FetchBlob response message; raises AssetCacheError on a non-OK status
+    def fetch_blob(self, uris, *, qualifiers=None):
+
+        # TODO auto turn uris into a list if a string is passed
+
+        if not qualifiers:
+            qualifiers = []
+
+        request = remote_asset_pb2.FetchBlobRequest()
+        request.uris.extend(uris)
+        request.qualifiers.extend(qualifiers)
+
+        try:
+            response = self.fetch_service.FetchBlob(request)
+        except grpc.RpcError as e:
+            # if e.code() == grpc.StatusCode.NOT_FOUND:
+            #     return False
+            # if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+            #     raise CASCacheError("Unsupported buildbox-casd version: FetchTree unimplemented") from e
+            raise
+
+
+        # response.status is a google.rpc.Status message, so compare its integer
+        # .code with the numeric half of the grpc.StatusCode enum value
+        if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
+            raise NotImplementedError
+        # TODO handle other response codes
+        if response.status.code != grpc.StatusCode.OK.value[0]:
+            raise AssetCacheError("FetchBlob failed: {}".format(response.status.message))
+
+        return response  # or return digest, uri, qualifiers?
+
+    def fetch_directory(self, uris, *, qualifiers=None):
+        raise NotImplementedError
+
+    def push_blob(self, blob_digest, uris, *, qualifiers=None,
+                  references_blobs=None, references_directories=None):
+        raise NotImplementedError
+
+    def push_directory(self, directory_digest, uris, *, qualifiers=None,
+                       references_blobs=None, references_directories=None):
+        raise NotImplementedError
+
+
+# Class that keeps config of remotes and deals with caching of assets. 
+#
+# Args:
+#    context (Context): The Buildstream context
+#
+class AssetCache(BaseCache):
+
+    spec_name = "asset_cache_specs"
+    spec_error = AssetCacheError
+    config_node_name = "asset-caches"
+    index_remote_class = AssetRemote
+
+    def __init__(self, context):
+        super().__init__(context)
+
+        self._basedir = os.path.join(context.cachedir, "asset_protos")
+        os.makedirs(self._basedir, exist_ok=True)
+
+    def fetch_blob(self, uris, *, qualifiers=None):
+        raise NotImplementedError
+
+    def fetch_directory(self, uris, *, qualifiers=None):
+        raise NotImplementedError
+
+    def push_blob(self, blob_digest, uris, *, qualifiers=None,
+                  references_blobs=None, references_directories=None):
+        raise NotImplementedError
+
+    def push_directory(self, directory_digest, uris, *, qualifiers=None,
+                       references_blobs=None, references_directories=None):
+        raise NotImplementedError
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c3ea52f0e..61bd038b8 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -410,6 +410,10 @@ class Context:
         return self._artifactcache
 
     @property
+    def assetcache(self):
+        return self.get_assetcache()
+
+    @property
     def sourcecache(self):
         if not self._sourcecache:
             self._sourcecache = SourceCache(self)
@@ -538,6 +542,12 @@ class Context:
             )
         return self._cascache
 
+    def get_assetcache(self):
+        if self._assetcache is None:
+            self._assetcache = AssetCache(self)
+        # NOTE(review): this patch neither initialises _assetcache in __init__ nor imports AssetCache here
+        return self._assetcache
+
     # prepare_fork():
     #
     # Prepare this process for fork without exec. 
This is a safeguard against
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index e9599d225..f0b1007c9 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -180,6 +180,15 @@ class CacheError(BstError):
         super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason)
 
 
+# AssetCacheError
+#
+# Raised when errors are encountered in the asset caches
+# NOTE(review): reports ErrorDomain.SANDBOX like the class above; confirm the intended domain
+class AssetCacheError(BstError):
+    def __init__(self, message, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason)
+
+
 # SourceCacheError
 #
 # Raised when errors are encountered in the source caches |