summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAbderrahim Kitouni <akitouni@gnome.org>2020-07-16 20:56:15 +0100
committerAbderrahim Kitouni <akitouni@gnome.org>2020-07-30 18:49:25 +0100
commit754007ed90d683b0dcb6cd042244ad41db79cf94 (patch)
tree4748c28354d5ab5500187f02218b4bd9b4cde41f
parentbd2aa20f3d1886174b1a9cd45ea4318e5cada73c (diff)
downloadbuildstream-abderrahim/remote-asset-bst1.tar.gz
cascache.py: allow using Remote Asset for storing refsabderrahim/remote-asset-bst1
-rw-r--r--buildstream/_artifactcache/cascache.py124
1 files changed, 95 insertions, 29 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 645f2dad7..27020a099 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -31,6 +31,7 @@ import grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
from .. import utils
@@ -44,6 +45,8 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024
# How often is a keepalive ping sent to the server to make sure the transport is still alive
_KEEPALIVE_TIME_MS = 60000
+REMOTE_ASSET_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:v1:{}"
+
class _Attempt():
@@ -215,16 +218,23 @@ class CASCache():
remote = CASRemote(remote_spec)
remote.init()
- request = buildstream_pb2.StatusRequest()
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.Status(request)
-
- if remote_spec.push and not response.allow_updates:
- q.put('CAS server does not allow push')
+ if remote.asset_fetch_supported:
+ if remote_spec.push and not remote.asset_push_supported:
+ q.put('Remote Asset server does not allow push')
+ else:
+ # No error
+ q.put(None)
else:
- # No error
- q.put(None)
+ request = buildstream_pb2.StatusRequest()
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.Status(request)
+
+ if remote_spec.push and not response.allow_updates:
+ q.put('CAS server does not allow push')
+ else:
+ # No error
+ q.put(None)
except grpc.RpcError as e:
# str(e) is too verbose for errors reported to the user
@@ -251,15 +261,24 @@ class CASCache():
try:
remote.init()
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.GetReference(request)
+ if remote.asset_fetch_supported:
+ request = remote_asset_pb2.FetchDirectoryRequest()
+ request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref))
+ for attempt in _retry():
+ with attempt:
+ response = remote.remote_asset_fetch.FetchDirectory(request)
+ digest = response.root_directory_digest
+ else:
+ request = buildstream_pb2.GetReferenceRequest()
+ request.key = ref
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
+ digest = response.digest
tree = remote_execution_pb2.Digest()
- tree.hash = response.digest.hash
- tree.size_bytes = response.digest.size_bytes
+ tree.hash = digest.hash
+ tree.size_bytes = digest.size_bytes
self._fetch_directory(remote, tree)
@@ -308,13 +327,23 @@ class CASCache():
# Check whether ref is already on the server in which case
# there is no need to push the ref
try:
- request = buildstream_pb2.GetReferenceRequest()
- request.key = ref
- for attempt in _retry():
- with attempt:
- response = remote.ref_storage.GetReference(request)
+ if remote.asset_fetch_supported:
+ request = remote_asset_pb2.FetchDirectoryRequest()
+ request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref))
+ for attempt in _retry():
+ with attempt:
+ response = remote.remote_asset_fetch.FetchDirectory(request)
+ digest = response.root_directory_digest
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
+ else:
+ request = buildstream_pb2.GetReferenceRequest()
+ request.key = ref
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
+ digest = response.digest
+
+ if digest.hash == tree.hash and digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
continue
@@ -325,13 +354,22 @@ class CASCache():
self._send_directory(remote, tree)
- request = buildstream_pb2.UpdateReferenceRequest()
- request.keys.append(ref)
- request.digest.hash = tree.hash
- request.digest.size_bytes = tree.size_bytes
- for attempt in _retry():
- with attempt:
- remote.ref_storage.UpdateReference(request)
+ if remote.asset_push_supported:
+ request = remote_asset_pb2.PushDirectoryRequest()
+ request.uris.append(REMOTE_ASSET_URN_TEMPLATE.format(ref))
+ request.root_directory_digest.hash = tree.hash
+ request.root_directory_digest.size_bytes = tree.size_bytes
+ for attempt in _retry():
+ with attempt:
+ remote.remote_asset_push.PushDirectory(request)
+ else:
+ request = buildstream_pb2.UpdateReferenceRequest()
+ request.keys.append(ref)
+ request.digest.hash = tree.hash
+ request.digest.size_bytes = tree.size_bytes
+ for attempt in _retry():
+ with attempt:
+ remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
@@ -1050,6 +1088,8 @@ class CASRemote():
self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel)
self.ref_storage = buildstream_pb2_grpc.ReferenceStorageStub(self.channel)
+ self.remote_asset_fetch = remote_asset_pb2_grpc.FetchStub(self.channel)
+ self.remote_asset_push = remote_asset_pb2_grpc.PushStub(self.channel)
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
@@ -1090,6 +1130,32 @@ class CASRemote():
e.code() != grpc.StatusCode.PERMISSION_DENIED):
raise
+ self.asset_fetch_supported = False
+ try:
+ request = remote_asset_pb2.FetchDirectoryRequest()
+ for attempt in _retry():
+ with attempt:
+ response = self.remote_asset_fetch.FetchDirectory(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+ # Expected error as the request doesn't specify any URIs.
+ self.asset_fetch_supported = True
+ elif e.code() != grpc.StatusCode.UNIMPLEMENTED:
+ raise
+
+ self.asset_push_supported = False
+ try:
+ request = remote_asset_pb2.PushDirectoryRequest()
+ for attempt in _retry():
+ with attempt:
+ response = self.remote_asset_push.PushDirectory(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+ # Expected error as the request doesn't specify any URIs.
+ self.asset_push_supported = True
+ elif e.code() != grpc.StatusCode.UNIMPLEMENTED:
+ raise
+
self._initialized = True