summaryrefslogtreecommitdiff
path: root/src/buildstream/sandbox/_sandboxremote.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r--src/buildstream/sandbox/_sandboxremote.py146
1 files changed, 75 insertions, 71 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 4308d662b..e9e145d1f 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -281,10 +281,11 @@ class SandboxRemote(Sandbox):
context = self._get_context()
cascache = context.get_cascache()
artifactcache = context.artifactcache
- casremote = CASRemote(self.storage_remote_spec, cascache)
- # Now do a pull to ensure we have the full directory structure.
- dir_digest = cascache.pull_tree(casremote, tree_digest)
+ with CASRemote(self.storage_remote_spec, cascache) as casremote:
+ # Now do a pull to ensure we have the full directory structure.
+ dir_digest = cascache.pull_tree(casremote, tree_digest)
+
if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
raise SandboxError("Output directory structure pulling from remote failed.")
@@ -300,7 +301,6 @@ class SandboxRemote(Sandbox):
project = self._get_project()
cascache = context.get_cascache()
artifactcache = context.artifactcache
- casremote = CASRemote(self.storage_remote_spec, cascache)
# Fetch the file blobs if needed
if self._output_files_required or artifactcache.has_push_remotes():
@@ -319,7 +319,9 @@ class SandboxRemote(Sandbox):
# artifact servers.
blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
- remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
+ with CASRemote(self.storage_remote_spec, cascache) as casremote:
+ remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
+
if remote_missing_blobs:
raise SandboxError("{} output files are missing on the CAS server"
.format(len(remote_missing_blobs)))
@@ -351,65 +353,66 @@ class SandboxRemote(Sandbox):
input_root_digest=input_root_digest)
action_digest = utils._message_digest(action.SerializeToString())
- # Next, try to create a communication channel to the BuildGrid server.
- url = urlparse(self.exec_url)
- if not url.port:
- raise SandboxError("You must supply a protocol and port number in the execution-service url, "
- "for example: http://buildservice:50051.")
- if url.scheme == 'http':
- channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
- elif url.scheme == 'https':
- channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials)
- else:
- raise SandboxError("Remote execution currently only supports the 'http' protocol "
- "and '{}' was supplied.".format(url.scheme))
-
# check action cache download and download if there
action_result = self._check_action_cache(action_digest)
if not action_result:
- casremote = CASRemote(self.storage_remote_spec, cascache)
- try:
- casremote.init()
- except grpc.RpcError as e:
- raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
- .format(self.storage_url, e)) from e
-
- # Determine blobs missing on remote
- try:
- missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
- except grpc.RpcError as e:
- raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
-
- # Check if any blobs are also missing locally (partial artifact)
- # and pull them from the artifact cache.
- try:
- local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
- if local_missing_blobs:
- artifactcache.fetch_missing_blobs(project, local_missing_blobs)
- except (grpc.RpcError, BstError) as e:
- raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
-
- # Now, push the missing blobs to the remote.
- try:
- cascache.send_blobs(casremote, missing_blobs)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
-
- # Push command and action
- try:
- casremote.push_message(command_proto)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push command to remote: {}".format(e))
-
- try:
- casremote.push_message(action)
- except grpc.RpcError as e:
- raise SandboxError("Failed to push action to remote: {}".format(e))
+ with CASRemote(self.storage_remote_spec, cascache) as casremote:
+ try:
+ casremote.init()
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
+ .format(self.storage_url, e)) from e
+
+ # Determine blobs missing on remote
+ try:
+ missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
+
+ # Check if any blobs are also missing locally (partial artifact)
+ # and pull them from the artifact cache.
+ try:
+ local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+ if local_missing_blobs:
+ artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+ except (grpc.RpcError, BstError) as e:
+ raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+
+ # Now, push the missing blobs to the remote.
+ try:
+ cascache.send_blobs(casremote, missing_blobs)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+
+ # Push command and action
+ try:
+ casremote.push_message(command_proto)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push command to remote: {}".format(e))
+
+ try:
+ casremote.push_message(action)
+ except grpc.RpcError as e:
+ raise SandboxError("Failed to push action to remote: {}".format(e))
+
+ # Next, try to create a communication channel to the BuildGrid server.
+ url = urlparse(self.exec_url)
+ if not url.port:
+ raise SandboxError("You must supply a protocol and port number in the execution-service url, "
+ "for example: http://buildservice:50051.")
+ if url.scheme == 'http':
+ channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+ elif url.scheme == 'https':
+ channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials)
+ else:
+ raise SandboxError("Remote execution currently only supports the 'http' protocol "
+ "and '{}' was supplied.".format(url.scheme))
# Now request to execute the action
- operation = self.run_remote_command(channel, action_digest)
- action_result = self._extract_action_result(operation)
+ with channel:
+ operation = self.run_remote_command(channel, action_digest)
+ action_result = self._extract_action_result(operation)
# Get output of build
self.process_job_output(action_result.output_directories, action_result.output_files,
@@ -445,20 +448,21 @@ class SandboxRemote(Sandbox):
elif url.scheme == 'https':
channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials)
- request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance,
- action_digest=action_digest)
- stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
- try:
- result = stub.GetActionResult(request)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise SandboxError("Failed to query action cache: {} ({})"
- .format(e.code(), e.details()))
+ with channel:
+ request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance,
+ action_digest=action_digest)
+ stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
+ try:
+ result = stub.GetActionResult(request)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SandboxError("Failed to query action cache: {} ({})"
+ .format(e.code(), e.details()))
+ else:
+ return None
else:
- return None
- else:
- self.info("Action result found in action cache")
- return result
+ self.info("Action result found in action cache")
+ return result
def _create_command(self, command, working_directory, environment):
# Creates a command proto