diff options
-rw-r--r-- | buildstream/_cas/casremote.py | 31 |
1 files changed, 28 insertions, 3 deletions
diff --git a/buildstream/_cas/casremote.py b/buildstream/_cas/casremote.py index 31b87ee77..df1dd799c 100644 --- a/buildstream/_cas/casremote.py +++ b/buildstream/_cas/casremote.py @@ -83,6 +83,7 @@ class CASRemote(): self.spec = spec self._initialized = False self.channel = None + self.instance_name = None self.bytestream = None self.cas = None self.ref_storage = None @@ -125,6 +126,8 @@ class CASRemote(): else: raise CASRemoteError("Unsupported URL: {}".format(self.spec.url)) + self.instance_name = self.spec.instance_name or None + self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) self.capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self.channel) @@ -133,6 +136,8 @@ class CASRemote(): self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES try: request = remote_execution_pb2.GetCapabilitiesRequest() + if self.instance_name: + request.instance_name = self.instance_name response = self.capabilities.GetCapabilities(request) server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes: @@ -146,6 +151,8 @@ class CASRemote(): self.batch_read_supported = False try: request = remote_execution_pb2.BatchReadBlobsRequest() + if self.instance_name: + request.instance_name = self.instance_name response = self.cas.BatchReadBlobs(request) self.batch_read_supported = True except grpc.RpcError as e: @@ -156,6 +163,8 @@ class CASRemote(): self.batch_update_supported = False try: request = remote_execution_pb2.BatchUpdateBlobsRequest() + if self.instance_name: + request.instance_name = self.instance_name response = self.cas.BatchUpdateBlobs(request) self.batch_update_supported = True except grpc.RpcError as e: @@ -224,6 +233,8 @@ class CASRemote(): self.init() request = remote_execution_pb2.FindMissingBlobsRequest() + if self.instance_name: + request.instance_name = self.instance_name request.blob_digests.extend([digest]) response = self.cas.FindMissingBlobs(request) @@ -258,7 +269,13 @@ class CASRemote(): # Local Private Methods # ################################################ def _fetch_blob(self, digest, stream): - resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) + if self.instance_name: + resource_name = '/'.join([self.instance_name, 'blobs', + digest.hash, str(digest.size_bytes)]) + else: + resource_name = '/'.join(['blobs', + digest.hash, str(digest.size_bytes)]) + request = bytestream_pb2.ReadRequest() request.resource_name = resource_name request.read_offset = 0 @@ -269,8 +286,12 @@ class CASRemote(): assert digest.size_bytes == os.fstat(stream.fileno()).st_size def _send_blob(self, digest, stream, u_uid=uuid.uuid4()): - resource_name = '/'.join(['uploads', str(u_uid), 'blobs', - digest.hash, str(digest.size_bytes)]) + if self.instance_name: + resource_name = '/'.join([self.instance_name, 'uploads', str(u_uid), 'blobs', + digest.hash, str(digest.size_bytes)]) + else: + resource_name = '/'.join(['uploads', str(u_uid), 'blobs', + digest.hash, str(digest.size_bytes)]) def request_stream(resname, instream): offset = 0 @@ -304,6 +325,8 @@ class _CASBatchRead(): self._remote = remote self._max_total_size_bytes = remote.max_batch_total_size_bytes self._request = remote_execution_pb2.BatchReadBlobsRequest() + if remote.instance_name: + self._request.instance_name = remote.instance_name self._size = 0 self._sent = False @@ -351,6 +374,8 @@ class _CASBatchUpdate(): self._remote = remote self._max_total_size_bytes = remote.max_batch_total_size_bytes self._request = remote_execution_pb2.BatchUpdateBlobsRequest() + if remote.instance_name: + self._request.instance_name = remote.instance_name self._size = 0 self._sent = False |