summaryrefslogtreecommitdiff
path: root/src/buildstream/sandbox/_sandboxremote.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r--src/buildstream/sandbox/_sandboxremote.py114
1 files changed, 26 insertions, 88 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 72b0f8f1a..308be2c3b 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -44,9 +44,7 @@ from .._cas import CASRemote
from .._remote import RemoteSpec
-class RemoteExecutionSpec(
- namedtuple("RemoteExecutionSpec", "exec_service storage_service action_service")
-):
+class RemoteExecutionSpec(namedtuple("RemoteExecutionSpec", "exec_service storage_service action_service")):
pass
@@ -126,9 +124,7 @@ class SandboxRemote(Sandbox):
provenance = remote_config.get_provenance()
raise _yaml.LoadError(
"{}: '{}' was not present in the remote "
- "execution configuration (remote-execution). ".format(
- str(provenance), keyname
- ),
+ "execution configuration (remote-execution). ".format(str(provenance), keyname),
_yaml.LoadErrorReason.INVALID_DATA,
)
return val
@@ -190,9 +186,7 @@ class SandboxRemote(Sandbox):
config[tls_key] = resolve_path(config.get_str(tls_key))
# TODO: we should probably not be stripping node info and rather load files the safe way
- return RemoteExecutionSpec(
- *[conf.strip_node_info() for conf in service_configs]
- )
+ return RemoteExecutionSpec(*[conf.strip_node_info() for conf in service_configs])
def run_remote_command(self, channel, action_digest):
# Sends an execution request to the remote execution server.
@@ -202,9 +196,7 @@ class SandboxRemote(Sandbox):
# Try to create a communication channel to the BuildGrid server.
stub = remote_execution_pb2_grpc.ExecutionStub(channel)
request = remote_execution_pb2.ExecuteRequest(
- instance_name=self.exec_instance,
- action_digest=action_digest,
- skip_cache_lookup=False,
+ instance_name=self.exec_instance, action_digest=action_digest, skip_cache_lookup=False,
)
def __run_remote_command(stub, execute_request=None, running_operation=None):
@@ -213,9 +205,7 @@ class SandboxRemote(Sandbox):
if execute_request is not None:
operation_iterator = stub.Execute(execute_request)
else:
- request = remote_execution_pb2.WaitExecutionRequest(
- name=running_operation.name
- )
+ request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
operation_iterator = stub.WaitExecution(request)
for operation in operation_iterator:
@@ -229,11 +219,7 @@ class SandboxRemote(Sandbox):
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNAVAILABLE:
- raise SandboxError(
- "Failed contacting remote execution server at {}.".format(
- self.exec_url
- )
- )
+ raise SandboxError("Failed contacting remote execution server at {}.".format(self.exec_url))
if status_code in (
grpc.StatusCode.INVALID_ARGUMENT,
@@ -278,15 +264,11 @@ class SandboxRemote(Sandbox):
try:
stub.CancelOperation(request)
except grpc.RpcError as e:
- if (
- e.code() == grpc.StatusCode.UNIMPLEMENTED
- or e.code() == grpc.StatusCode.INVALID_ARGUMENT
- ):
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED or e.code() == grpc.StatusCode.INVALID_ARGUMENT:
pass
else:
raise SandboxError(
- "Failed trying to send CancelOperation request: "
- "{} ({})".format(e.details(), e.code().name)
+ "Failed trying to send CancelOperation request: " "{} ({})".format(e.details(), e.code().name)
)
def process_job_output(self, output_directories, output_files, *, failure):
@@ -304,9 +286,7 @@ class SandboxRemote(Sandbox):
error_text = "No output directory was returned from the build server."
raise SandboxError(error_text)
if len(output_directories) > 1:
- error_text = (
- "More than one output directory was returned from the build server: {}."
- )
+ error_text = "More than one output directory was returned from the build server: {}."
raise SandboxError(error_text.format(output_directories))
tree_digest = output_directories[0].tree_digest
@@ -352,20 +332,14 @@ class SandboxRemote(Sandbox):
# however, artifact push remotes will need them.
# Only fetch blobs that are missing on one or multiple
# artifact servers.
- blobs_to_fetch = artifactcache.find_missing_blobs(
- project, local_missing_blobs
- )
+ blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
with CASRemote(self.storage_remote_spec, cascache) as casremote:
- remote_missing_blobs = cascache.fetch_blobs(
- casremote, blobs_to_fetch
- )
+ remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
if remote_missing_blobs:
raise SandboxError(
- "{} output files are missing on the CAS server".format(
- len(remote_missing_blobs)
- )
+ "{} output files are missing on the CAS server".format(len(remote_missing_blobs))
)
def _run(self, command, flags, *, cwd, env):
@@ -391,9 +365,7 @@ class SandboxRemote(Sandbox):
input_root_digest = upload_vdir._get_digest()
command_proto = self._create_command(command, cwd, env)
command_digest = utils._message_digest(command_proto.SerializeToString())
- action = remote_execution_pb2.Action(
- command_digest=command_digest, input_root_digest=input_root_digest
- )
+ action = remote_execution_pb2.Action(command_digest=command_digest, input_root_digest=input_root_digest)
action_digest = utils._message_digest(action.SerializeToString())
# check action cache download and download if there
@@ -405,20 +377,14 @@ class SandboxRemote(Sandbox):
casremote.init()
except grpc.RpcError as e:
raise SandboxError(
- "Failed to contact remote execution CAS endpoint at {}: {}".format(
- self.storage_url, e
- )
+ "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_url, e)
) from e
# Determine blobs missing on remote
try:
- missing_blobs = cascache.remote_missing_blobs_for_directory(
- casremote, input_root_digest
- )
+ missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
except grpc.RpcError as e:
- raise SandboxError(
- "Failed to determine missing blobs: {}".format(e)
- ) from e
+ raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
# Check if any blobs are also missing locally (partial artifact)
# and pull them from the artifact cache.
@@ -427,17 +393,13 @@ class SandboxRemote(Sandbox):
if local_missing_blobs:
artifactcache.fetch_missing_blobs(project, local_missing_blobs)
except (grpc.RpcError, BstError) as e:
- raise SandboxError(
- "Failed to pull missing blobs from artifact cache: {}".format(e)
- ) from e
+ raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
# Now, push the missing blobs to the remote.
try:
cascache.send_blobs(casremote, missing_blobs)
except grpc.RpcError as e:
- raise SandboxError(
- "Failed to push source directory to remote: {}".format(e)
- ) from e
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
# Push command and action
try:
@@ -460,9 +422,7 @@ class SandboxRemote(Sandbox):
if url.scheme == "http":
channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
elif url.scheme == "https":
- channel = grpc.secure_channel(
- "{}:{}".format(url.hostname, url.port), self.exec_credentials
- )
+ channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.exec_credentials)
else:
raise SandboxError(
"Remote execution currently only supports the 'http' protocol "
@@ -476,9 +436,7 @@ class SandboxRemote(Sandbox):
# Get output of build
self.process_job_output(
- action_result.output_directories,
- action_result.output_files,
- failure=action_result.exit_code != 0,
+ action_result.output_directories, action_result.output_files, failure=action_result.exit_code != 0,
)
if stdout:
@@ -511,9 +469,7 @@ class SandboxRemote(Sandbox):
if url.scheme == "http":
channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
elif url.scheme == "https":
- channel = grpc.secure_channel(
- "{}:{}".format(url.hostname, url.port), self.action_credentials
- )
+ channel = grpc.secure_channel("{}:{}".format(url.hostname, url.port), self.action_credentials)
with channel:
request = remote_execution_pb2.GetActionResultRequest(
@@ -524,11 +480,7 @@ class SandboxRemote(Sandbox):
result = stub.GetActionResult(request)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
- raise SandboxError(
- "Failed to query action cache: {} ({})".format(
- e.code(), e.details()
- )
- )
+ raise SandboxError("Failed to query action cache: {} ({})".format(e.code(), e.details()))
return None
else:
self.info("Action result found in action cache")
@@ -537,8 +489,7 @@ class SandboxRemote(Sandbox):
def _create_command(self, command, working_directory, environment):
# Creates a command proto
environment_variables = [
- remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v)
- for (k, v) in environment.items()
+ remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v) for (k, v) in environment.items()
]
# Request the whole directory tree as output
@@ -604,16 +555,7 @@ class _SandboxRemoteBatch(_SandboxBatch):
self.main_group.execute(self)
first = self.first_command
- if (
- first
- and self.sandbox.run(
- ["sh", "-c", "-e", self.script],
- self.flags,
- cwd=first.cwd,
- env=first.env,
- )
- != 0
- ):
+ if first and self.sandbox.run(["sh", "-c", "-e", self.script], self.flags, cwd=first.cwd, env=first.env,) != 0:
raise SandboxCommandError("Command execution failed", collect=self.collect)
def execute_group(self, group):
@@ -650,11 +592,7 @@ class _SandboxRemoteBatch(_SandboxBatch):
# Error handling
label = command.label or cmdline
quoted_label = shlex.quote("'{}'".format(label))
- self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(
- quoted_label
- )
+ self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(quoted_label)
def execute_call(self, call):
- raise SandboxError(
- "SandboxRemote does not support callbacks in command batches"
- )
+ raise SandboxError("SandboxRemote does not support callbacks in command batches")