diff options
Diffstat (limited to 'buildstream/_artifactcache')
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 49 |
1 file changed, 31 insertions(+), 18 deletions(-)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9ca757d4d..9ba748d3a 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -45,7 +45,7 @@ from .. import _yaml
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
+class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
 
     # _new_from_config_node
     #
@@ -53,7 +53,7 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
     #
     @staticmethod
     def _new_from_config_node(spec_node, basedir=None):
-        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
+        _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
         url = _yaml.node_get(spec_node, str, 'url')
         push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
         if not url:
@@ -61,6 +61,8 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
             raise LoadError(LoadErrorReason.INVALID_DATA,
                             "{}: empty artifact cache URL".format(provenance))
 
+        instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
+
         server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
         if server_cert and basedir:
             server_cert = os.path.join(basedir, server_cert)
@@ -83,10 +85,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
             raise LoadError(LoadErrorReason.INVALID_DATA,
                             "{}: 'client-cert' was specified without 'client-key'".format(provenance))
 
-        return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
+        return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
 
 
-CASRemoteSpec.__new__.__defaults__ = (None, None, None)
+CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
 
 
 class BlobNotFound(CASError):
@@ -248,7 +250,7 @@ class CASCache():
             remote = CASRemote(remote_spec)
             remote.init()
 
-            request = buildstream_pb2.StatusRequest()
+            request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
             response = remote.ref_storage.Status(request)
 
             if remote_spec.push and not response.allow_updates:
@@ -284,7 +286,7 @@ class CASCache():
                 try:
                     remote.init()
 
-                    request = buildstream_pb2.GetReferenceRequest()
+                    request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
                     request.key = ref
                     response = remote.ref_storage.GetReference(request)
 
@@ -369,7 +371,7 @@ class CASCache():
                 # Check whether ref is already on the server in which case
                 # there is no need to push the ref
                 try:
-                    request = buildstream_pb2.GetReferenceRequest()
+                    request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
                     request.key = ref
                     response = remote.ref_storage.GetReference(request)
 
@@ -384,7 +386,7 @@ class CASCache():
 
                 self._send_directory(remote, tree)
 
-                request = buildstream_pb2.UpdateReferenceRequest()
+                request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
                 request.keys.append(ref)
                 request.digest.hash = tree.hash
                 request.digest.size_bytes = tree.size_bytes
@@ -448,7 +450,7 @@ class CASCache():
     def verify_digest_on_remote(self, remote, digest):
         remote.init()
 
-        request = remote_execution_pb2.FindMissingBlobsRequest()
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
         request.blob_digests.extend([digest])
 
         response = remote.cas.FindMissingBlobs(request)
@@ -908,7 +910,13 @@ class CASCache():
             yield from self._required_blobs(dirnode.digest)
 
     def _fetch_blob(self, remote, digest, stream):
-        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
+        resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
+
+        if remote.spec.instance_name:
+            resource_name_components.insert(0, remote.spec.instance_name)
+
+        resource_name = '/'.join(resource_name_components)
+
         request = bytestream_pb2.ReadRequest()
         request.resource_name = resource_name
         request.read_offset = 0
@@ -1064,8 +1072,13 @@ class CASCache():
         return dirdigest
 
     def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
-        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
-                                  digest.hash, str(digest.size_bytes)])
+        resource_name_components = ['uploads', str(u_uid), 'blobs',
+                                    digest.hash, str(digest.size_bytes)]
+
+        if remote.spec.instance_name:
+            resource_name_components.insert(0, remote.spec.instance_name)
+
+        resource_name = '/'.join(resource_name_components)
 
         def request_stream(resname, instream):
             offset = 0
@@ -1097,7 +1110,7 @@ class CASCache():
         missing_blobs = dict()
         # Limit size of FindMissingBlobs request
         for required_blobs_group in _grouper(required_blobs, 512):
-            request = remote_execution_pb2.FindMissingBlobsRequest()
+            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
 
             for required_digest in required_blobs_group:
                 d = request.blob_digests.add()
@@ -1193,7 +1206,7 @@ class CASRemote():
 
             self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
             try:
-                request = remote_execution_pb2.GetCapabilitiesRequest()
+                request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
                 response = self.capabilities.GetCapabilities(request)
                 server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
                 if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
@@ -1206,7 +1219,7 @@ class CASRemote():
             # Check whether the server supports BatchReadBlobs()
             self.batch_read_supported = False
             try:
-                request = remote_execution_pb2.BatchReadBlobsRequest()
+                request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
                 response = self.cas.BatchReadBlobs(request)
                 self.batch_read_supported = True
             except grpc.RpcError as e:
@@ -1216,7 +1229,7 @@ class CASRemote():
             # Check whether the server supports BatchUpdateBlobs()
             self.batch_update_supported = False
             try:
-                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
                 response = self.cas.BatchUpdateBlobs(request)
                 self.batch_update_supported = True
             except grpc.RpcError as e:
@@ -1233,7 +1246,7 @@ class _CASBatchRead():
     def __init__(self, remote):
         self._remote = remote
         self._max_total_size_bytes = remote.max_batch_total_size_bytes
-        self._request = remote_execution_pb2.BatchReadBlobsRequest()
+        self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
         self._size = 0
         self._sent = False
@@ -1280,7 +1293,7 @@ class _CASBatchUpdate():
     def __init__(self, remote):
         self._remote = remote
         self._max_total_size_bytes = remote.max_batch_total_size_bytes
-        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
         self._size = 0
         self._sent = False