summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim MacArthur <jim.macarthur@codethink.co.uk>2018-12-10 13:18:27 +0000
committerJim MacArthur <jim.macarthur@codethink.co.uk>2018-12-18 11:13:56 +0000
commit89219f61ca11f6b5346832c27dcd8299907c965e (patch)
tree79213d034e05d439d4859e2e39502c518a2c6e58
parenta3bbec23b328f7e2b873021a4bc1c3636a209162 (diff)
downloadbuildstream-89219f61ca11f6b5346832c27dcd8299907c965e.tar.gz
_cascache.py: Add instance names to GRPC calls
Adds the 'instance_name' parameter, which may be None, to most GRPC calls in the CASCache object. ByteStream requests already have instance_name supplied in the resource name, so do not need the parameter. Closes #627.
-rw-r--r--buildstream/_artifactcache/cascache.py49
1 files changed, 31 insertions, 18 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9ca757d4d..9ba748d3a 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -45,7 +45,7 @@ from .. import _yaml
_MAX_PAYLOAD_BYTES = 1024 * 1024
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
+class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
# _new_from_config_node
#
@@ -53,7 +53,7 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
#
@staticmethod
def _new_from_config_node(spec_node, basedir=None):
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
+ _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
url = _yaml.node_get(spec_node, str, 'url')
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
if not url:
@@ -61,6 +61,8 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: empty artifact cache URL".format(provenance))
+ instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
+
server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
if server_cert and basedir:
server_cert = os.path.join(basedir, server_cert)
@@ -83,10 +85,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
+ return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
-CASRemoteSpec.__new__.__defaults__ = (None, None, None)
+CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
class BlobNotFound(CASError):
@@ -248,7 +250,7 @@ class CASCache():
remote = CASRemote(remote_spec)
remote.init()
- request = buildstream_pb2.StatusRequest()
+ request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
response = remote.ref_storage.Status(request)
if remote_spec.push and not response.allow_updates:
@@ -284,7 +286,7 @@ class CASCache():
try:
remote.init()
- request = buildstream_pb2.GetReferenceRequest()
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
@@ -369,7 +371,7 @@ class CASCache():
# Check whether ref is already on the server in which case
# there is no need to push the ref
try:
- request = buildstream_pb2.GetReferenceRequest()
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
@@ -384,7 +386,7 @@ class CASCache():
self._send_directory(remote, tree)
- request = buildstream_pb2.UpdateReferenceRequest()
+ request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
@@ -448,7 +450,7 @@ class CASCache():
def verify_digest_on_remote(self, remote, digest):
remote.init()
- request = remote_execution_pb2.FindMissingBlobsRequest()
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
request.blob_digests.extend([digest])
response = remote.cas.FindMissingBlobs(request)
@@ -908,7 +910,13 @@ class CASCache():
yield from self._required_blobs(dirnode.digest)
def _fetch_blob(self, remote, digest, stream):
- resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
+ resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
+
+ if remote.spec.instance_name:
+ resource_name_components.insert(0, remote.spec.instance_name)
+
+ resource_name = '/'.join(resource_name_components)
+
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
@@ -1064,8 +1072,13 @@ class CASCache():
return dirdigest
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
+ resource_name_components = ['uploads', str(u_uid), 'blobs',
+ digest.hash, str(digest.size_bytes)]
+
+ if remote.spec.instance_name:
+ resource_name_components.insert(0, remote.spec.instance_name)
+
+ resource_name = '/'.join(resource_name_components)
def request_stream(resname, instream):
offset = 0
@@ -1097,7 +1110,7 @@ class CASCache():
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest()
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
@@ -1193,7 +1206,7 @@ class CASRemote():
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
- request = remote_execution_pb2.GetCapabilitiesRequest()
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
response = self.capabilities.GetCapabilities(request)
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
@@ -1206,7 +1219,7 @@ class CASRemote():
# Check whether the server supports BatchReadBlobs()
self.batch_read_supported = False
try:
- request = remote_execution_pb2.BatchReadBlobsRequest()
+ request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
except grpc.RpcError as e:
@@ -1216,7 +1229,7 @@ class CASRemote():
# Check whether the server supports BatchUpdateBlobs()
self.batch_update_supported = False
try:
- request = remote_execution_pb2.BatchUpdateBlobsRequest()
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
@@ -1233,7 +1246,7 @@ class _CASBatchRead():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchReadBlobsRequest()
+ self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
self._size = 0
self._sent = False
@@ -1280,7 +1293,7 @@ class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
self._size = 0
self._sent = False