diff options
Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 114 |
1 files changed, 26 insertions, 88 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 72b0f8f1a..308be2c3b 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -44,9 +44,7 @@ from .._cas import CASRemote from .._remote import RemoteSpec -class RemoteExecutionSpec( - namedtuple("RemoteExecutionSpec", "exec_service storage_service action_service") -): +class RemoteExecutionSpec(namedtuple("RemoteExecutionSpec", "exec_service storage_service action_service")): pass @@ -126,9 +124,7 @@ class SandboxRemote(Sandbox): provenance = remote_config.get_provenance() raise _yaml.LoadError( "{}: '{}' was not present in the remote " - "execution configuration (remote-execution). ".format( - str(provenance), keyname - ), + "execution configuration (remote-execution). ".format(str(provenance), keyname), _yaml.LoadErrorReason.INVALID_DATA, ) return val @@ -190,9 +186,7 @@ class SandboxRemote(Sandbox): config[tls_key] = resolve_path(config.get_str(tls_key)) # TODO: we should probably not be stripping node info and rather load files the safe way - return RemoteExecutionSpec( - *[conf.strip_node_info() for conf in service_configs] - ) + return RemoteExecutionSpec(*[conf.strip_node_info() for conf in service_configs]) def run_remote_command(self, channel, action_digest): # Sends an execution request to the remote execution server. @@ -202,9 +196,7 @@ class SandboxRemote(Sandbox): # Try to create a communication channel to the BuildGrid server. stub = remote_execution_pb2_grpc.ExecutionStub(channel) request = remote_execution_pb2.ExecuteRequest( - instance_name=self.exec_instance, - action_digest=action_digest, - skip_cache_lookup=False, + instance_name=self.exec_instance, action_digest=action_digest, skip_cache_lookup=False, ) def __run_remote_command(stub, execute_request=None, running_operation=None): @@ -213,9 +205,7 @@ class SandboxRemote(Sandbox): if execute_request is not None: operation_iterator = stub.Execute(execute_request) else: - request = remote_execution_pb2.WaitExecutionRequest( - name=running_operation.name - ) + request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name) operation_iterator = stub.WaitExecution(request) for operation in operation_iterator: @@ -229,11 +219,7 @@ class SandboxRemote(Sandbox): except grpc.RpcError as e: status_code = e.code() if status_code == grpc.StatusCode.UNAVAILABLE: - raise SandboxError( - "Failed contacting remote execution server at {}.".format( - self.exec_url - ) - ) + raise SandboxError("Failed contacting remote execution server at {}.".format(self.exec_url)) if status_code in ( grpc.StatusCode.INVALID_ARGUMENT, @@ -278,15 +264,11 @@ class SandboxRemote(Sandbox): try: stub.CancelOperation(request) except grpc.RpcError as e: - if ( - e.code() == grpc.StatusCode.UNIMPLEMENTED - or e.code() == grpc.StatusCode.INVALID_ARGUMENT - ): + if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT: pass else: raise SandboxError( - "Failed trying to send CancelOperation request: " - "{} ({})".format(e.details(), e.code().name) + "Failed trying to send CancelOperation request: " "{} ({})".format(e.details(), e.code().name) ) def process_job_output(self, output_directories, output_files, *, failure): @@ -304,9 +286,7 @@ class SandboxRemote(Sandbox): error_text = "No output directory was returned from the build server." raise SandboxError(error_text) if len(output_directories) > 1: - error_text = ( - "More than one output directory was returned from the build server: {}." - ) + error_text = "More than one output directory was returned from the build server: {}." raise SandboxError(error_text.format(output_directories)) tree_digest = output_directories[0].tree_digest @@ -352,20 +332,14 @@ class SandboxRemote(Sandbox): # however, artifact push remotes will need them. # Only fetch blobs that are missing on one or multiple # artifact servers. - blobs_to_fetch = artifactcache.find_missing_blobs( - project, local_missing_blobs - ) + blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs) with CASRemote(self.storage_remote_spec, cascache) as casremote: - remote_missing_blobs = cascache.fetch_blobs( - casremote, blobs_to_fetch - ) + remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) if remote_missing_blobs: raise SandboxError( - "{} output files are missing on the CAS server".format( - len(remote_missing_blobs) - ) + "{} output files are missing on the CAS server".format(len(remote_missing_blobs)) ) def _run(self, command, flags, *, cwd, env): @@ -391,9 +365,7 @@ class SandboxRemote(Sandbox): input_root_digest = upload_vdir._get_digest() command_proto = self._create_command(command, cwd, env) command_digest = utils._message_digest(command_proto.SerializeToString()) - action = remote_execution_pb2.Action( - command_digest=command_digest, input_root_digest=input_root_digest - ) + action = remote_execution_pb2.Action(command_digest=command_digest, input_root_digest=input_root_digest) action_digest = utils._message_digest(action.SerializeToString()) # check action cache download and download if there @@ -405,20 +377,14 @@ class SandboxRemote(Sandbox): casremote.init() except grpc.RpcError as e: raise SandboxError( - "Failed to contact remote execution CAS endpoint at {}: {}".format( - self.storage_url, e - ) + "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_url, e) ) from e # Determine blobs missing on remote try: - missing_blobs = cascache.remote_missing_blobs_for_directory( - casremote, input_root_digest - ) + missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) except grpc.RpcError as e: - raise SandboxError( - "Failed to determine missing blobs: {}".format(e) - ) from e + raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e # Check if any blobs are also missing locally (partial artifact) # and pull them from the artifact cache. @@ -427,17 +393,13 @@ class SandboxRemote(Sandbox): if local_missing_blobs: artifactcache.fetch_missing_blobs(project, local_missing_blobs) except (grpc.RpcError, BstError) as e: - raise SandboxError( - "Failed to pull missing blobs from artifact cache: {}".format(e) - ) from e + raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e # Now, push the missing blobs to the remote. try: cascache.send_blobs(casremote, missing_blobs) except grpc.RpcError as e: - raise SandboxError( - "Failed to push source directory to remote: {}".format(e) - ) from e + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e # Push command and action try: @@ -460,9 +422,7 @@ class SandboxRemote(Sandbox): if url.scheme == "http": channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) elif url.scheme == "https": - channel = grpc.secure_channel( - "{}:{}".format(url.hostname, url.port), self.exec_credentials - ) + channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.exec_credentials) else: raise SandboxError( "Remote execution currently only supports the 'http' protocol " @@ -476,9 +436,7 @@ class SandboxRemote(Sandbox): # Get output of build self.process_job_output( - action_result.output_directories, - action_result.output_files, - failure=action_result.exit_code != 0, + action_result.output_directories, action_result.output_files, failure=action_result.exit_code != 0, ) if stdout: @@ -511,9 +469,7 @@ class SandboxRemote(Sandbox): if url.scheme == "http": channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) elif url.scheme == "https": - channel = grpc.secure_channel( - "{}:{}".format(url.hostname, url.port), self.action_credentials - ) + channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.action_credentials) with channel: request = remote_execution_pb2.GetActionResultRequest( @@ -524,11 +480,7 @@ class SandboxRemote(Sandbox): result = stub.GetActionResult(request) except grpc.RpcError as e: if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError( - "Failed to query action cache: {} ({})".format( - e.code(), e.details() - ) - ) + raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details())) return None else: self.info("Action result found in action cache") @@ -537,8 +489,7 @@ class SandboxRemote(Sandbox): def _create_command(self, command, working_directory, environment): # Creates a command proto environment_variables = [ - remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v) - for (k, v) in environment.items() + remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v) for (k, v) in environment.items() ] # Request the whole directory tree as output @@ -604,16 +555,7 @@ class _SandboxRemoteBatch(_SandboxBatch): self.main_group.execute(self) first = self.first_command - if ( - first - and self.sandbox.run( - ["sh", "-c", "-e", self.script], - self.flags, - cwd=first.cwd, - env=first.env, - ) - != 0 - ): + if first and self.sandbox.run(["sh", "-c", "-e", self.script], self.flags, cwd=first.cwd, env=first.env,) != 0: raise SandboxCommandError("Command execution failed", collect=self.collect) def execute_group(self, group): @@ -650,11 +592,7 @@ class _SandboxRemoteBatch(_SandboxBatch): # Error handling label = command.label or cmdline quoted_label = shlex.quote("'{}'".format(label)) - self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format( - quoted_label - ) + self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(quoted_label) def execute_call(self, call): - raise SandboxError( - "SandboxRemote does not support callbacks in command batches" - ) + raise SandboxError("SandboxRemote does not support callbacks in command batches") |