summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Striker <s.striker@striker.nl>2020-02-22 00:02:55 +0000
committerSander Striker <s.striker@striker.nl>2020-02-22 00:02:55 +0000
commit02ff4bdcad1d20813f110531dcbc99c0b1af44ed (patch)
treed1f900ce6ec4f91330ffdde3ffe5ae5ce1affabc
parent83330a3578ddda650b62e3af5c570b34510a54a7 (diff)
downloadbuildstream-sstriker-remote-asset-wip.tar.gz
WIP: Store artifact in asset cachesstriker-remote-asset-wip
-rw-r--r--src/buildstream/_artifact.py21
-rw-r--r--src/buildstream/_assetcache.py156
-rw-r--r--src/buildstream/_context.py10
-rw-r--r--src/buildstream/_exceptions.py9
4 files changed, 196 insertions, 0 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index ae1b395b3..5cb219c33 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -37,6 +37,8 @@ from . import utils
from .types import Scope
from .storage._casbaseddirectory import CasBasedDirectory
+REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = \
+ "urn:fdn:buildstream.build:20200223:artifact:{}"
# An Artifact class to abstract artifact operations
# from the Element class
@@ -58,6 +60,7 @@ class Artifact:
self._weak_cache_key = weak_key
self._artifactdir = context.artifactdir
self._cas = context.get_cascache()
+ self._assetcache = context.get_assetcache()
self._tmpdir = context.tmpdir
self._proto = None
@@ -193,6 +196,24 @@ class Artifact:
artifact.buildtree.CopyFrom(buildtreevdir._get_digest())
size += buildtreevdir.get_size()
+ # Store artifact in CAS
+ artifact_digest = self._cas.add_object(buffer=artifact.SerializeToString())
+
+ # Add artifact to AssetCache
+ keys = utils._deduplicate([artifact.strong_key, artifact.weak_key])
+ uris = [REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(key) for key in keys]
+ referenced_directories = []
+ if artifact.files:
+ referenced_directories.append(artifact.files)
+ if artifact.buildtree:
+ referenced_directories.append(artifact.buildtree)
+ referenced_blobs = artifact.logs
+ self._assetcache.push_blob(
+ artifact_digest, uris,
+ references_blobs=referenced_blobs,
+ references_directories=referenced_directories)
+
+ # TODO: remove non-CAS artifact logic
os.makedirs(os.path.dirname(os.path.join(self._artifactdir, element.get_artifact_name())), exist_ok=True)
keys = utils._deduplicate([self._cache_key, self._weak_cache_key])
for key in keys:
diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py
new file mode 100644
index 000000000..d29c13e22
--- /dev/null
+++ b/src/buildstream/_assetcache.py
@@ -0,0 +1,156 @@
+#
+# Copyright (C) 2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+import os
+import grpc
+
+from ._remote import BaseRemote
+from ._cas.casremote import BlobNotFound
+from .storage._casbaseddirectory import CasBasedDirectory
+from ._basecache import BaseCache
+from ._exceptions import CASError, CASRemoteError, SourceCacheError, RemoteError, AssetCacheError
+from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, source_pb2, source_pb2_grpc
+from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
+
+
+class AssetRemote(BaseRemote):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.fetch_service = None
+ self.push_service = None
+
+ def close(self):
+ self.fetch_service = None
+ self.push_service = None
+ super().close()
+
+ def _configure_protocols(self):
+ # set up source service
+ self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
+ self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)
+
+ # _check():
+ #
+ # Check if this remote provides everything required for the
+ # particular kind of remote. This is expected to be called as part
+ # of check()
+ #
+ # Raises:
+ # RemoteError: If the upstream has a problem
+ #
+ def _check(self):
+ pass
+ # capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+ # # check that the service supports sources
+ # try:
+ # request = buildstream_pb2.GetCapabilitiesRequest()
+ # if self.instance_name:
+ # request.instance_name = self.instance_name
+ # response = capabilities_service.GetCapabilities(request)
+ # except grpc.RpcError as e:
+ # # Check if this remote has the artifact service
+ # if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ # raise RemoteError(
+ # "Configured remote does not have the BuildStream "
+ # "capabilities service. Please check remote configuration."
+ # )
+ # raise RemoteError("Remote initialisation failed: {}".format(e.details()))
+
+ # if not response.source_capabilities:
+ # raise RemoteError("Configured remote does not support source service")
+
+ # if self.spec.push and not response.source_capabilities.allow_updates:
+ # raise RemoteError("Source server does not allow push")
+
+ # get_asset():
+ # ...
+ # returns Digest, uri, qualifiers
+ def fetch_blob(self, uris, *, qualifiers=None):
+
+ # TODO auto turn uris into a list if a string is passed
+
+ if not qualifers:
+ qualifiers = []
+
+ request = remote_asset_pb2,FetchBlob()
+ request.uris.extend(uris)
+ request.qualifiers.extend(qualifiers)
+
+ try:
+ response = self.fetch_service.FetchBlob(request)
+ except grpc.RpcError as e:
+ # if e.code() == grpc.StatusCode.NOT_FOUND:
+ # return False
+ # if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ # raise CASCacheError("Unsupported buildbox-casd version: FetchTree unimplemented") from e
+ raise
+
+
+ # TODO handle errors
+ # TODO handle response.status
+ if response.status == grpc.StatusCode.NOT_FOUND:
+ raise NotImplemented
+ # TODO handle other response codes
+ if response.status != grpc.StatusCode.OK:
+ raise AssetCacheError
+
+ return response # or return digest, uri, qualifiers?
+
+ def fetch_directory(self, uris, *, qualifiers=None):
+ raise NotImplemented
+
+ def push_blob(self, blob_digest, uris, *, qualifiers=None,
+ references_blobs=None, references_directories=None):
+ raise NotImplementedError
+
+ def push_directory(self, directory_digest, uris, *, qualifiers=None,
+ references_blobs=None, references_directories=None):
+ raise NotImplementedError
+
+
+# Class that keeps config of remotes and deals with caching of assets.
+#
+# Args:
+# context (Context): The Buildstream context
+#
+class AssetCache(BaseCache):
+
+ spec_name = "asset_cache_specs"
+ spec_error = AssetCacheError
+ config_node_name = "asset-caches"
+ index_remote_class = AssetRemote
+
+ def __init__(self, context):
+ super().__init__(context)
+
+ self._basedir = os.path.join(context.cachedir, "asset_protos")
+ os.makedirs(self._basedir, exist_ok=True)
+
+ def fetch_blob(self, uris, *, qualifiers=None):
+ raise NotImplementedError
+
+ def fetch_directory(self, uris, *, qualifiers=None):
+ raise NotImplementedError
+
+ def push_blob(self, blob_digest, uris, *, qualifiers=None,
+ references_blobs=None, references_directories=None):
+ raise NotImplementedError
+
+ def push_directory(self, directory_digest, uris, *, qualifiers=None,
+ references_blobs=None, references_directories=None):
+ raise NotImplementedError
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c3ea52f0e..61bd038b8 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -410,6 +410,10 @@ class Context:
return self._artifactcache
@property
+ def assetcache(self):
+ return self.get_assetcache()
+
+ @property
def sourcecache(self):
if not self._sourcecache:
self._sourcecache = SourceCache(self)
@@ -538,6 +542,12 @@ class Context:
)
return self._cascache
+ def get_assetcache(self):
+ if self._assetcache is None:
+ self._assetcache = AssetCache(self)
+
+ return self._cascache
+
# prepare_fork():
#
# Prepare this process for fork without exec. This is a safeguard against
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index e9599d225..f0b1007c9 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -180,6 +180,15 @@ class CacheError(BstError):
super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason)
+# AssetCacheError
+#
+# Raised when errors are encountered in the asset caches
+#
+class AssetCacheError(BstError):
+ def __init__(self, message, detail=None, reason=None):
+ super().__init__(message, detail=detail, domain=ErrorDomain.SANDBOX, reason=reason)
+
+
# SourceCacheError
#
# Raised when errors are encountered in the source caches