diff options
Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 215 |
1 files changed, 117 insertions, 98 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index fa7cc9f90..d4ffd64a1 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -39,7 +39,7 @@ from .._cas import CASRemote from .._remote import RemoteSpec -class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')): +class RemoteExecutionSpec(namedtuple("RemoteExecutionSpec", "exec_service storage_service action_service")): pass @@ -49,59 +49,63 @@ class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storag # commands to a remote server and retrieves the results from it. # class SandboxRemote(SandboxREAPI): - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self._output_files_required = kwargs.get('output_files_required', True) + self._output_files_required = kwargs.get("output_files_required", True) - config = kwargs['specs'] # This should be a RemoteExecutionSpec + config = kwargs["specs"] # This should be a RemoteExecutionSpec if config is None: return # gRPC doesn't support fork without exec, which is used in the main process. assert not utils._is_main_process() - self.storage_url = config.storage_service['url'] - self.exec_url = config.exec_service['url'] + self.storage_url = config.storage_service["url"] + self.exec_url = config.exec_service["url"] exec_certs = {} - for key in ['client-cert', 'client-key', 'server-cert']: + for key in ["client-cert", "client-key", "server-cert"]: if key in config.exec_service: - with open(config.exec_service[key], 'rb') as f: + with open(config.exec_service[key], "rb") as f: exec_certs[key] = f.read() self.exec_credentials = grpc.ssl_channel_credentials( - root_certificates=exec_certs.get('server-cert'), - private_key=exec_certs.get('client-key'), - certificate_chain=exec_certs.get('client-cert')) + root_certificates=exec_certs.get("server-cert"), + private_key=exec_certs.get("client-key"), + certificate_chain=exec_certs.get("client-cert"), + ) action_certs = {} - for key in ['client-cert', 'client-key', 'server-cert']: + for key in ["client-cert", "client-key", "server-cert"]: if key in config.action_service: - with open(config.action_service[key], 'rb') as f: + with open(config.action_service[key], "rb") as f: action_certs[key] = f.read() if config.action_service: - self.action_url = config.action_service['url'] - self.action_instance = config.action_service.get('instance-name', None) + self.action_url = config.action_service["url"] + self.action_instance = config.action_service.get("instance-name", None) self.action_credentials = grpc.ssl_channel_credentials( - root_certificates=action_certs.get('server-cert'), - private_key=action_certs.get('client-key'), - certificate_chain=action_certs.get('client-cert')) + root_certificates=action_certs.get("server-cert"), + private_key=action_certs.get("client-key"), + certificate_chain=action_certs.get("client-cert"), + ) else: self.action_url = None self.action_instance = None self.action_credentials = None - self.exec_instance = config.exec_service.get('instance-name', None) - self.storage_instance = config.storage_service.get('instance-name', None) - - self.storage_remote_spec = RemoteSpec(self.storage_url, push=True, - server_cert=config.storage_service.get('server-cert'), - client_key=config.storage_service.get('client-key'), - client_cert=config.storage_service.get('client-cert'), - instance_name=self.storage_instance) + self.exec_instance = config.exec_service.get("instance-name", None) + self.storage_instance = config.storage_service.get("instance-name", None) + + self.storage_remote_spec = RemoteSpec( + self.storage_url, + push=True, + server_cert=config.storage_service.get("server-cert"), + client_key=config.storage_service.get("client-key"), + client_cert=config.storage_service.get("client-cert"), + instance_name=self.storage_instance, + ) self.operation_name = None def info(self, msg): @@ -109,47 +113,49 @@ class SandboxRemote(SandboxREAPI): @staticmethod def specs_from_config_node(config_node, basedir=None): - def require_node(config, keyname): val = config.get_mapping(keyname, default=None) if val is None: provenance = remote_config.get_provenance() - raise _yaml.LoadError("{}: '{}' was not present in the remote " - "execution configuration (remote-execution). " - .format(str(provenance), keyname), - _yaml.LoadErrorReason.INVALID_DATA) + raise _yaml.LoadError( + "{}: '{}' was not present in the remote " + "execution configuration (remote-execution). ".format(str(provenance), keyname), + _yaml.LoadErrorReason.INVALID_DATA, + ) return val - remote_config = config_node.get_mapping('remote-execution', default=None) + remote_config = config_node.get_mapping("remote-execution", default=None) if remote_config is None: return None - service_keys = ['execution-service', 'storage-service', 'action-cache-service'] + service_keys = ["execution-service", "storage-service", "action-cache-service"] - remote_config.validate_keys(['url', *service_keys]) + remote_config.validate_keys(["url", *service_keys]) - exec_config = require_node(remote_config, 'execution-service') - storage_config = require_node(remote_config, 'storage-service') - action_config = remote_config.get_mapping('action-cache-service', default={}) + exec_config = require_node(remote_config, "execution-service") + storage_config = require_node(remote_config, "storage-service") + action_config = remote_config.get_mapping("action-cache-service", default={}) - tls_keys = ['client-key', 'client-cert', 'server-cert'] + tls_keys = ["client-key", "client-cert", "server-cert"] - exec_config.validate_keys(['url', 'instance-name', *tls_keys]) - storage_config.validate_keys(['url', 'instance-name', *tls_keys]) + exec_config.validate_keys(["url", "instance-name", *tls_keys]) + storage_config.validate_keys(["url", "instance-name", *tls_keys]) if action_config: - action_config.validate_keys(['url', 'instance-name', *tls_keys]) + action_config.validate_keys(["url", "instance-name", *tls_keys]) # Maintain some backwards compatibility with older configs, in which # 'url' was the only valid key for remote-execution: - if 'url' in remote_config: - if 'execution-service' not in remote_config: - exec_config = Node.from_dict({'url': remote_config['url']}) + if "url" in remote_config: + if "execution-service" not in remote_config: + exec_config = Node.from_dict({"url": remote_config["url"]}) else: - provenance = remote_config.get_node('url').get_provenance() - raise _yaml.LoadError("{}: 'url' and 'execution-service' keys were found in the remote " - "execution configuration (remote-execution). " - "You can only specify one of these." - .format(str(provenance)), _yaml.LoadErrorReason.INVALID_DATA) + provenance = remote_config.get_node("url").get_provenance() + raise _yaml.LoadError( + "{}: 'url' and 'execution-service' keys were found in the remote " + "execution configuration (remote-execution). " + "You can only specify one of these.".format(str(provenance)), + _yaml.LoadErrorReason.INVALID_DATA, + ) service_configs = [exec_config, storage_config, action_config] @@ -161,12 +167,14 @@ class SandboxRemote(SandboxREAPI): for config_key, config in zip(service_keys, service_configs): # Either both or none of the TLS client key/cert pair must be specified: - if ('client-key' in config) != ('client-cert' in config): + if ("client-key" in config) != ("client-cert" in config): provenance = remote_config.get_node(config_key).get_provenance() - raise _yaml.LoadError("{}: TLS client key/cert pair is incomplete. " - "You must specify both 'client-key' and 'client-cert' " - "for authenticated HTTPS connections." - .format(str(provenance)), _yaml.LoadErrorReason.INVALID_DATA) + raise _yaml.LoadError( + "{}: TLS client key/cert pair is incomplete. " + "You must specify both 'client-key' and 'client-cert' " + "for authenticated HTTPS connections.".format(str(provenance)), + _yaml.LoadErrorReason.INVALID_DATA, + ) for tls_key in tls_keys: if tls_key in config: @@ -182,9 +190,9 @@ class SandboxRemote(SandboxREAPI): # Try to create a communication channel to the BuildGrid server. stub = remote_execution_pb2_grpc.ExecutionStub(channel) - request = remote_execution_pb2.ExecuteRequest(instance_name=self.exec_instance, - action_digest=action_digest, - skip_cache_lookup=False) + request = remote_execution_pb2.ExecuteRequest( + instance_name=self.exec_instance, action_digest=action_digest, skip_cache_lookup=False + ) def __run_remote_command(stub, execute_request=None, running_operation=None): try: @@ -206,26 +214,30 @@ class SandboxRemote(SandboxREAPI): except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: - raise SandboxError("Failed contacting remote execution server at {}." - .format(self.exec_url)) - - if status_code in (grpc.StatusCode.INVALID_ARGUMENT, - grpc.StatusCode.FAILED_PRECONDITION, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.INTERNAL, - grpc.StatusCode.DEADLINE_EXCEEDED): + raise SandboxError("Failed contacting remote execution server at {}.".format(self.exec_url)) + + if status_code in ( + grpc.StatusCode.INVALID_ARGUMENT, + grpc.StatusCode.FAILED_PRECONDITION, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.DEADLINE_EXCEEDED, + ): raise SandboxError("{} ({}).".format(e.details(), status_code.name)) if running_operation and status_code == grpc.StatusCode.UNIMPLEMENTED: - raise SandboxError("Failed trying to recover from connection loss: " - "server does not support operation status polling recovery.") + raise SandboxError( + "Failed trying to recover from connection loss: " + "server does not support operation status polling recovery." + ) return last_operation # Set up signal handler to trigger cancel_operation on SIGTERM operation = None - with self._get_context().messenger.timed_activity("Waiting for the remote build to complete"), \ - _signals.terminator(partial(self.cancel_operation, channel)): + with self._get_context().messenger.timed_activity( + "Waiting for the remote build to complete" + ), _signals.terminator(partial(self.cancel_operation, channel)): operation = __run_remote_command(stub, execute_request=request) if operation is None: return None @@ -242,18 +254,17 @@ class SandboxRemote(SandboxREAPI): return stub = operations_pb2_grpc.OperationsStub(channel) - request = operations_pb2.CancelOperationRequest( - name=str(self.operation_name)) + request = operations_pb2.CancelOperationRequest(name=str(self.operation_name)) try: stub.CancelOperation(request) except grpc.RpcError as e: - if (e.code() == grpc.StatusCode.UNIMPLEMENTED or - e.code() == grpc.StatusCode.INVALID_ARGUMENT): + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: pass else: - raise SandboxError("Failed trying to send CancelOperation request: " - "{} ({})".format(e.details(), e.code().name)) + raise SandboxError( + "Failed trying to send CancelOperation request: " "{} ({})".format(e.details(), e.code().name) + ) def _fetch_missing_blobs(self, vdir): context = self._get_context() @@ -282,8 +293,9 @@ class SandboxRemote(SandboxREAPI): remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) if remote_missing_blobs: - raise SandboxError("{} output files are missing on the CAS server" - .format(len(remote_missing_blobs))) + raise SandboxError( + "{} output files are missing on the CAS server".format(len(remote_missing_blobs)) + ) def _execute_action(self, action): context = self._get_context() @@ -301,8 +313,9 @@ class SandboxRemote(SandboxREAPI): try: casremote.init() except grpc.RpcError as e: - raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" - .format(self.storage_url, e)) from e + raise SandboxError( + "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_url, e) + ) from e # Determine blobs missing on remote try: @@ -333,15 +346,19 @@ class SandboxRemote(SandboxREAPI): # Next, try to create a communication channel to the BuildGrid server. url = urlparse(self.exec_url) if not url.port: - raise SandboxError("You must supply a protocol and port number in the execution-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - elif url.scheme == 'https': - channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) + raise SandboxError( + "You must supply a protocol and port number in the execution-service url, " + "for example: http://buildservice:50051." + ) + if url.scheme == "http": + channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) + elif url.scheme == "https": + channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.exec_credentials) else: - raise SandboxError("Remote execution currently only supports the 'http' protocol " - "and '{}' was supplied.".format(url.scheme)) + raise SandboxError( + "Remote execution currently only supports the 'http' protocol " + "and '{}' was supplied.".format(url.scheme) + ) # Now request to execute the action with channel: @@ -369,23 +386,25 @@ class SandboxRemote(SandboxREAPI): return None url = urlparse(self.action_url) if not url.port: - raise SandboxError("You must supply a protocol and port number in the action-cache-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - elif url.scheme == 'https': - channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials) + raise SandboxError( + "You must supply a protocol and port number in the action-cache-service url, " + "for example: http://buildservice:50051." + ) + if url.scheme == "http": + channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) + elif url.scheme == "https": + channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.action_credentials) with channel: - request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, - action_digest=action_digest) + request = remote_execution_pb2.GetActionResultRequest( + instance_name=self.action_instance, action_digest=action_digest + ) stub = remote_execution_pb2_grpc.ActionCacheStub(channel) try: result = stub.GetActionResult(request) except grpc.RpcError as e: if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError("Failed to query action cache: {} ({})" - .format(e.code(), e.details())) + raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details())) return None else: self.info("Action result found in action cache") @@ -397,7 +416,7 @@ class SandboxRemote(SandboxREAPI): # Failure of remote execution, usually due to an error in BuildStream raise SandboxError("No response returned from server") - assert not operation.HasField('error') and operation.HasField('response') + assert not operation.HasField("error") and operation.HasField("response") execution_response = remote_execution_pb2.ExecuteResponse() # The response is expected to be an ExecutionResponse message |