summaryrefslogtreecommitdiff
path: root/src/buildstream/sandbox/_sandboxremote.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r--src/buildstream/sandbox/_sandboxremote.py215
1 files changed, 117 insertions, 98 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index fa7cc9f90..d4ffd64a1 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -39,7 +39,7 @@ from .._cas import CASRemote
from .._remote import RemoteSpec
-class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
+class RemoteExecutionSpec(namedtuple("RemoteExecutionSpec", "exec_service storage_service action_service")):
pass
@@ -49,59 +49,63 @@ class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storag
# commands to a remote server and retrieves the results from it.
#
class SandboxRemote(SandboxREAPI):
-
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self._output_files_required = kwargs.get('output_files_required', True)
+ self._output_files_required = kwargs.get("output_files_required", True)
- config = kwargs['specs'] # This should be a RemoteExecutionSpec
+ config = kwargs["specs"] # This should be a RemoteExecutionSpec
if config is None:
return
# gRPC doesn't support fork without exec, which is used in the main process.
assert not utils._is_main_process()
- self.storage_url = config.storage_service['url']
- self.exec_url = config.exec_service['url']
+ self.storage_url = config.storage_service["url"]
+ self.exec_url = config.exec_service["url"]
exec_certs = {}
- for key in ['client-cert', 'client-key', 'server-cert']:
+ for key in ["client-cert", "client-key", "server-cert"]:
if key in config.exec_service:
- with open(config.exec_service[key], 'rb') as f:
+ with open(config.exec_service[key], "rb") as f:
exec_certs[key] = f.read()
self.exec_credentials = grpc.ssl_channel_credentials(
- root_certificates=exec_certs.get('server-cert'),
- private_key=exec_certs.get('client-key'),
- certificate_chain=exec_certs.get('client-cert'))
+ root_certificates=exec_certs.get("server-cert"),
+ private_key=exec_certs.get("client-key"),
+ certificate_chain=exec_certs.get("client-cert"),
+ )
action_certs = {}
- for key in ['client-cert', 'client-key', 'server-cert']:
+ for key in ["client-cert", "client-key", "server-cert"]:
if key in config.action_service:
- with open(config.action_service[key], 'rb') as f:
+ with open(config.action_service[key], "rb") as f:
action_certs[key] = f.read()
if config.action_service:
- self.action_url = config.action_service['url']
- self.action_instance = config.action_service.get('instance-name', None)
+ self.action_url = config.action_service["url"]
+ self.action_instance = config.action_service.get("instance-name", None)
self.action_credentials = grpc.ssl_channel_credentials(
- root_certificates=action_certs.get('server-cert'),
- private_key=action_certs.get('client-key'),
- certificate_chain=action_certs.get('client-cert'))
+ root_certificates=action_certs.get("server-cert"),
+ private_key=action_certs.get("client-key"),
+ certificate_chain=action_certs.get("client-cert"),
+ )
else:
self.action_url = None
self.action_instance = None
self.action_credentials = None
- self.exec_instance = config.exec_service.get('instance-name', None)
- self.storage_instance = config.storage_service.get('instance-name', None)
-
- self.storage_remote_spec = RemoteSpec(self.storage_url, push=True,
- server_cert=config.storage_service.get('server-cert'),
- client_key=config.storage_service.get('client-key'),
- client_cert=config.storage_service.get('client-cert'),
- instance_name=self.storage_instance)
+ self.exec_instance = config.exec_service.get("instance-name", None)
+ self.storage_instance = config.storage_service.get("instance-name", None)
+
+ self.storage_remote_spec = RemoteSpec(
+ self.storage_url,
+ push=True,
+ server_cert=config.storage_service.get("server-cert"),
+ client_key=config.storage_service.get("client-key"),
+ client_cert=config.storage_service.get("client-cert"),
+ instance_name=self.storage_instance,
+ )
self.operation_name = None
def info(self, msg):
@@ -109,47 +113,49 @@ class SandboxRemote(SandboxREAPI):
@staticmethod
def specs_from_config_node(config_node, basedir=None):
-
def require_node(config, keyname):
val = config.get_mapping(keyname, default=None)
if val is None:
provenance = remote_config.get_provenance()
- raise _yaml.LoadError("{}: '{}' was not present in the remote "
- "execution configuration (remote-execution). "
- .format(str(provenance), keyname),
- _yaml.LoadErrorReason.INVALID_DATA)
+ raise _yaml.LoadError(
+ "{}: '{}' was not present in the remote "
+ "execution configuration (remote-execution). ".format(str(provenance), keyname),
+ _yaml.LoadErrorReason.INVALID_DATA,
+ )
return val
- remote_config = config_node.get_mapping('remote-execution', default=None)
+ remote_config = config_node.get_mapping("remote-execution", default=None)
if remote_config is None:
return None
- service_keys = ['execution-service', 'storage-service', 'action-cache-service']
+ service_keys = ["execution-service", "storage-service", "action-cache-service"]
- remote_config.validate_keys(['url', *service_keys])
+ remote_config.validate_keys(["url", *service_keys])
- exec_config = require_node(remote_config, 'execution-service')
- storage_config = require_node(remote_config, 'storage-service')
- action_config = remote_config.get_mapping('action-cache-service', default={})
+ exec_config = require_node(remote_config, "execution-service")
+ storage_config = require_node(remote_config, "storage-service")
+ action_config = remote_config.get_mapping("action-cache-service", default={})
- tls_keys = ['client-key', 'client-cert', 'server-cert']
+ tls_keys = ["client-key", "client-cert", "server-cert"]
- exec_config.validate_keys(['url', 'instance-name', *tls_keys])
- storage_config.validate_keys(['url', 'instance-name', *tls_keys])
+ exec_config.validate_keys(["url", "instance-name", *tls_keys])
+ storage_config.validate_keys(["url", "instance-name", *tls_keys])
if action_config:
- action_config.validate_keys(['url', 'instance-name', *tls_keys])
+ action_config.validate_keys(["url", "instance-name", *tls_keys])
# Maintain some backwards compatibility with older configs, in which
# 'url' was the only valid key for remote-execution:
- if 'url' in remote_config:
- if 'execution-service' not in remote_config:
- exec_config = Node.from_dict({'url': remote_config['url']})
+ if "url" in remote_config:
+ if "execution-service" not in remote_config:
+ exec_config = Node.from_dict({"url": remote_config["url"]})
else:
- provenance = remote_config.get_node('url').get_provenance()
- raise _yaml.LoadError("{}: 'url' and 'execution-service' keys were found in the remote "
- "execution configuration (remote-execution). "
- "You can only specify one of these."
- .format(str(provenance)), _yaml.LoadErrorReason.INVALID_DATA)
+ provenance = remote_config.get_node("url").get_provenance()
+ raise _yaml.LoadError(
+ "{}: 'url' and 'execution-service' keys were found in the remote "
+ "execution configuration (remote-execution). "
+ "You can only specify one of these.".format(str(provenance)),
+ _yaml.LoadErrorReason.INVALID_DATA,
+ )
service_configs = [exec_config, storage_config, action_config]
@@ -161,12 +167,14 @@ class SandboxRemote(SandboxREAPI):
for config_key, config in zip(service_keys, service_configs):
# Either both or none of the TLS client key/cert pair must be specified:
- if ('client-key' in config) != ('client-cert' in config):
+ if ("client-key" in config) != ("client-cert" in config):
provenance = remote_config.get_node(config_key).get_provenance()
- raise _yaml.LoadError("{}: TLS client key/cert pair is incomplete. "
- "You must specify both 'client-key' and 'client-cert' "
- "for authenticated HTTPS connections."
- .format(str(provenance)), _yaml.LoadErrorReason.INVALID_DATA)
+ raise _yaml.LoadError(
+ "{}: TLS client key/cert pair is incomplete. "
+ "You must specify both 'client-key' and 'client-cert' "
+ "for authenticated HTTPS connections.".format(str(provenance)),
+ _yaml.LoadErrorReason.INVALID_DATA,
+ )
for tls_key in tls_keys:
if tls_key in config:
@@ -182,9 +190,9 @@ class SandboxRemote(SandboxREAPI):
# Try to create a communication channel to the BuildGrid server.
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
- request = remote_execution_pb2.ExecuteRequest(instance_name=self.exec_instance,
- action_digest=action_digest,
- skip_cache_lookup=False)
+ request = remote_execution_pb2.ExecuteRequest(
+ instance_name=self.exec_instance, action_digest=action_digest, skip_cache_lookup=False
+ )
def __run_remote_command(stub, execute_request=None, running_operation=None):
try:
@@ -206,26 +214,30 @@ class SandboxRemote(SandboxREAPI):
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNAVAILABLE:
- raise SandboxError("Failed contacting remote execution server at {}."
- .format(self.exec_url))
-
- if status_code in (grpc.StatusCode.INVALID_ARGUMENT,
- grpc.StatusCode.FAILED_PRECONDITION,
- grpc.StatusCode.RESOURCE_EXHAUSTED,
- grpc.StatusCode.INTERNAL,
- grpc.StatusCode.DEADLINE_EXCEEDED):
+ raise SandboxError("Failed contacting remote execution server at {}.".format(self.exec_url))
+
+ if status_code in (
+ grpc.StatusCode.INVALID_ARGUMENT,
+ grpc.StatusCode.FAILED_PRECONDITION,
+ grpc.StatusCode.RESOURCE_EXHAUSTED,
+ grpc.StatusCode.INTERNAL,
+ grpc.StatusCode.DEADLINE_EXCEEDED,
+ ):
raise SandboxError("{} ({}).".format(e.details(), status_code.name))
if running_operation and status_code == grpc.StatusCode.UNIMPLEMENTED:
- raise SandboxError("Failed trying to recover from connection loss: "
- "server does not support operation status polling recovery.")
+ raise SandboxError(
+ "Failed trying to recover from connection loss: "
+ "server does not support operation status polling recovery."
+ )
return last_operation
# Set up signal handler to trigger cancel_operation on SIGTERM
operation = None
- with self._get_context().messenger.timed_activity("Waiting for the remote build to complete"), \
- _signals.terminator(partial(self.cancel_operation, channel)):
+ with self._get_context().messenger.timed_activity(
+ "Waiting for the remote build to complete"
+ ), _signals.terminator(partial(self.cancel_operation, channel)):
operation = __run_remote_command(stub, execute_request=request)
if operation is None:
return None
@@ -242,18 +254,17 @@ class SandboxRemote(SandboxREAPI):
return
stub = operations_pb2_grpc.OperationsStub(channel)
- request = operations_pb2.CancelOperationRequest(
- name=str(self.operation_name))
+ request = operations_pb2.CancelOperationRequest(name=str(self.operation_name))
try:
stub.CancelOperation(request)
except grpc.RpcError as e:
- if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
- e.code() == grpc.StatusCode.INVALID_ARGUMENT):
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT:
pass
else:
- raise SandboxError("Failed trying to send CancelOperation request: "
- "{} ({})".format(e.details(), e.code().name))
+ raise SandboxError(
+ "Failed trying to send CancelOperation request: " "{} ({})".format(e.details(), e.code().name)
+ )
def _fetch_missing_blobs(self, vdir):
context = self._get_context()
@@ -282,8 +293,9 @@ class SandboxRemote(SandboxREAPI):
remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
if remote_missing_blobs:
- raise SandboxError("{} output files are missing on the CAS server"
- .format(len(remote_missing_blobs)))
+ raise SandboxError(
+ "{} output files are missing on the CAS server".format(len(remote_missing_blobs))
+ )
def _execute_action(self, action):
context = self._get_context()
@@ -301,8 +313,9 @@ class SandboxRemote(SandboxREAPI):
try:
casremote.init()
except grpc.RpcError as e:
- raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
- .format(self.storage_url, e)) from e
+ raise SandboxError(
+ "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_url, e)
+ ) from e
# Determine blobs missing on remote
try:
@@ -333,15 +346,19 @@ class SandboxRemote(SandboxREAPI):
# Next, try to create a communication channel to the BuildGrid server.
url = urlparse(self.exec_url)
if not url.port:
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
- "for example: http://buildservice:50051.")
- if url.scheme == 'http':
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
- elif url.scheme == 'https':
- channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials)
+ raise SandboxError(
+ "You must supply a protocol and port number in the execution-service url, "
+ "for example: http://buildservice:50051."
+ )
+ if url.scheme == "http":
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+ elif url.scheme == "https":
+ channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.exec_credentials)
else:
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
- "and '{}' was supplied.".format(url.scheme))
+ raise SandboxError(
+ "Remote execution currently only supports the 'http' protocol "
+ "and '{}' was supplied.".format(url.scheme)
+ )
# Now request to execute the action
with channel:
@@ -369,23 +386,25 @@ class SandboxRemote(SandboxREAPI):
return None
url = urlparse(self.action_url)
if not url.port:
- raise SandboxError("You must supply a protocol and port number in the action-cache-service url, "
- "for example: http://buildservice:50051.")
- if url.scheme == 'http':
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
- elif url.scheme == 'https':
- channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials)
+ raise SandboxError(
+ "You must supply a protocol and port number in the action-cache-service url, "
+ "for example: http://buildservice:50051."
+ )
+ if url.scheme == "http":
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
+ elif url.scheme == "https":
+ channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.action_credentials)
with channel:
- request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance,
- action_digest=action_digest)
+ request = remote_execution_pb2.GetActionResultRequest(
+ instance_name=self.action_instance, action_digest=action_digest
+ )
stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
try:
result = stub.GetActionResult(request)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
- raise SandboxError("Failed to query action cache: {} ({})"
- .format(e.code(), e.details()))
+ raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details()))
return None
else:
self.info("Action result found in action cache")
@@ -397,7 +416,7 @@ class SandboxRemote(SandboxREAPI):
# Failure of remote execution, usually due to an error in BuildStream
raise SandboxError("No response returned from server")
- assert not operation.HasField('error') and operation.HasField('response')
+ assert not operation.HasField("error") and operation.HasField("response")
execution_response = remote_execution_pb2.ExecuteResponse()
# The response is expected to be an ExecutionResponse message