summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJim MacArthur <jim+gitlab@mode7.co.uk>2018-12-18 11:45:30 +0000
committerJim MacArthur <jim+gitlab@mode7.co.uk>2018-12-18 11:45:30 +0000
commit644d8b28505842eb713bf402b455f751b15b6022 (patch)
tree762c56da904f918f57c808d74d109cd317692987
parentb23bec551839e4d652a23a349fed96c885d7f7f5 (diff)
parent3dc209636b2ee09fb7e69a94af74f83ea6470524 (diff)
downloadbuildstream-644d8b28505842eb713bf402b455f751b15b6022.tar.gz
Merge branch 'raoul/627-RE-instance-config' into 'master'
Remote-execution instance configuration support. Closes #627. See merge request BuildStream/buildstream!952
-rw-r--r--buildstream/_artifactcache/cascache.py49
-rw-r--r--buildstream/sandbox/_sandboxremote.py18
-rw-r--r--doc/source/format_project.rst11
3 files changed, 53 insertions, 25 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 9ca757d4d..9ba748d3a 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -45,7 +45,7 @@ from .. import _yaml
_MAX_PAYLOAD_BYTES = 1024 * 1024
-class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert')):
+class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key client_cert instance_name')):
# _new_from_config_node
#
@@ -53,7 +53,7 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
#
@staticmethod
def _new_from_config_node(spec_node, basedir=None):
- _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert'])
+ _yaml.node_validate(spec_node, ['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance_name'])
url = _yaml.node_get(spec_node, str, 'url')
push = _yaml.node_get(spec_node, bool, 'push', default_value=False)
if not url:
@@ -61,6 +61,8 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: empty artifact cache URL".format(provenance))
+ instance_name = _yaml.node_get(spec_node, str, 'instance_name', default_value=None)
+
server_cert = _yaml.node_get(spec_node, str, 'server-cert', default_value=None)
if server_cert and basedir:
server_cert = os.path.join(basedir, server_cert)
@@ -83,10 +85,10 @@ class CASRemoteSpec(namedtuple('CASRemoteSpec', 'url push server_cert client_key
raise LoadError(LoadErrorReason.INVALID_DATA,
"{}: 'client-cert' was specified without 'client-key'".format(provenance))
- return CASRemoteSpec(url, push, server_cert, client_key, client_cert)
+ return CASRemoteSpec(url, push, server_cert, client_key, client_cert, instance_name)
-CASRemoteSpec.__new__.__defaults__ = (None, None, None)
+CASRemoteSpec.__new__.__defaults__ = (None, None, None, None)
class BlobNotFound(CASError):
@@ -248,7 +250,7 @@ class CASCache():
remote = CASRemote(remote_spec)
remote.init()
- request = buildstream_pb2.StatusRequest()
+ request = buildstream_pb2.StatusRequest(instance_name=remote_spec.instance_name)
response = remote.ref_storage.Status(request)
if remote_spec.push and not response.allow_updates:
@@ -284,7 +286,7 @@ class CASCache():
try:
remote.init()
- request = buildstream_pb2.GetReferenceRequest()
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
@@ -369,7 +371,7 @@ class CASCache():
# Check whether ref is already on the server in which case
# there is no need to push the ref
try:
- request = buildstream_pb2.GetReferenceRequest()
+ request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
request.key = ref
response = remote.ref_storage.GetReference(request)
@@ -384,7 +386,7 @@ class CASCache():
self._send_directory(remote, tree)
- request = buildstream_pb2.UpdateReferenceRequest()
+ request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
@@ -448,7 +450,7 @@ class CASCache():
def verify_digest_on_remote(self, remote, digest):
remote.init()
- request = remote_execution_pb2.FindMissingBlobsRequest()
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
request.blob_digests.extend([digest])
response = remote.cas.FindMissingBlobs(request)
@@ -908,7 +910,13 @@ class CASCache():
yield from self._required_blobs(dirnode.digest)
def _fetch_blob(self, remote, digest, stream):
- resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
+ resource_name_components = ['blobs', digest.hash, str(digest.size_bytes)]
+
+ if remote.spec.instance_name:
+ resource_name_components.insert(0, remote.spec.instance_name)
+
+ resource_name = '/'.join(resource_name_components)
+
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
@@ -1064,8 +1072,13 @@ class CASCache():
return dirdigest
def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
+ resource_name_components = ['uploads', str(u_uid), 'blobs',
+ digest.hash, str(digest.size_bytes)]
+
+ if remote.spec.instance_name:
+ resource_name_components.insert(0, remote.spec.instance_name)
+
+ resource_name = '/'.join(resource_name_components)
def request_stream(resname, instream):
offset = 0
@@ -1097,7 +1110,7 @@ class CASCache():
missing_blobs = dict()
# Limit size of FindMissingBlobs request
for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest()
+ request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
for required_digest in required_blobs_group:
d = request.blob_digests.add()
@@ -1193,7 +1206,7 @@ class CASRemote():
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
- request = remote_execution_pb2.GetCapabilitiesRequest()
+ request = remote_execution_pb2.GetCapabilitiesRequest(instance_name=self.spec.instance_name)
response = self.capabilities.GetCapabilities(request)
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
@@ -1206,7 +1219,7 @@ class CASRemote():
# Check whether the server supports BatchReadBlobs()
self.batch_read_supported = False
try:
- request = remote_execution_pb2.BatchReadBlobsRequest()
+ request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=self.spec.instance_name)
response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
except grpc.RpcError as e:
@@ -1216,7 +1229,7 @@ class CASRemote():
# Check whether the server supports BatchUpdateBlobs()
self.batch_update_supported = False
try:
- request = remote_execution_pb2.BatchUpdateBlobsRequest()
+ request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=self.spec.instance_name)
response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
@@ -1233,7 +1246,7 @@ class _CASBatchRead():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchReadBlobsRequest()
+ self._request = remote_execution_pb2.BatchReadBlobsRequest(instance_name=remote.spec.instance_name)
self._size = 0
self._sent = False
@@ -1280,7 +1293,7 @@ class _CASBatchUpdate():
def __init__(self, remote):
self._remote = remote
self._max_total_size_bytes = remote.max_batch_total_size_bytes
- self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+ self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name=remote.spec.instance_name)
self._size = 0
self._sent = False
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index 52e450fb5..503cf931a 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -61,15 +61,20 @@ class SandboxRemote(Sandbox):
self.storage_url = config.storage_service['url']
self.exec_url = config.exec_service['url']
+
if config.action_service:
self.action_url = config.action_service['url']
else:
self.action_url = None
+ self.server_instance = config.exec_service.get('instance', None)
+ self.storage_instance = config.storage_service.get('instance', None)
+
self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
server_cert=config.storage_service['server-cert'],
client_key=config.storage_service['client-key'],
- client_cert=config.storage_service['client-cert'])
+ client_cert=config.storage_service['client-cert'],
+ instance_name=self.storage_instance)
self.operation_name = None
def info(self, msg):
@@ -102,10 +107,10 @@ class SandboxRemote(Sandbox):
['execution-service', 'storage-service', 'url', 'action-cache-service'])
remote_exec_service_config = require_node(remote_config, 'execution-service')
remote_exec_storage_config = require_node(remote_config, 'storage-service')
- remote_exec_action_config = remote_config.get('action-cache-service')
+ remote_exec_action_config = remote_config.get('action-cache-service', {})
- _yaml.node_validate(remote_exec_service_config, ['url'])
- _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)
+ _yaml.node_validate(remote_exec_service_config, ['url', 'instance'])
+ _yaml.node_validate(remote_exec_storage_config, ['url', 'instance'] + tls_keys)
if remote_exec_action_config:
_yaml.node_validate(remote_exec_action_config, ['url'])
else:
@@ -132,7 +137,7 @@ class SandboxRemote(Sandbox):
spec = RemoteExecutionSpec(remote_config['execution-service'],
remote_config['storage-service'],
- remote_config['action-cache-service'])
+ remote_exec_action_config)
return spec
def run_remote_command(self, channel, action_digest):
@@ -142,7 +147,8 @@ class SandboxRemote(Sandbox):
# Try to create a communication channel to the BuildGrid server.
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
- request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
+ request = remote_execution_pb2.ExecuteRequest(instance_name=self.server_instance,
+ action_digest=action_digest,
skip_cache_lookup=False)
def __run_remote_command(stub, execute_request=None, running_operation=None):
diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst
index 4024cea19..bb66231cb 100644
--- a/doc/source/format_project.rst
+++ b/doc/source/format_project.rst
@@ -233,11 +233,13 @@ using the `remote-execution` option:
# A url defining a remote execution server
execution-service:
url: http://buildserver.example.com:50051
+  instance: development-emea-1
storage-service:
- - url: https://foo.com:11002/
+ url: https://foo.com:11002/
server-cert: server.crt
client-cert: client.crt
client-key: client.key
+  instance: development-emea-1
action-cache-service:
url: http://bar.action.com:50052
@@ -257,6 +259,13 @@ caching. Remote execution cannot work without push access to the
storage endpoint, so you must specify a client certificate and key,
and a server certificate.
+Instance name is optional. Instance names separate different shards on
+the same endpoint (url). You can supply a different instance name for
+`execution-service` and `storage-service`, if needed. The instance
+name should be given to you by the service provider of each
+service. Not all remote execution and storage services support
+instance names.
+
The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis.
.. _project_essentials_mirrors: