diff options
Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 146 |
1 files changed, 75 insertions, 71 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 4308d662b..e9e145d1f 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -281,10 +281,11 @@ class SandboxRemote(Sandbox): context = self._get_context() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec, cascache) - # Now do a pull to ensure we have the full directory structure. - dir_digest = cascache.pull_tree(casremote, tree_digest) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + # Now do a pull to ensure we have the full directory structure. + dir_digest = cascache.pull_tree(casremote, tree_digest) + if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes: raise SandboxError("Output directory structure pulling from remote failed.") @@ -300,7 +301,6 @@ class SandboxRemote(Sandbox): project = self._get_project() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec, cascache) # Fetch the file blobs if needed if self._output_files_required or artifactcache.has_push_remotes(): @@ -319,7 +319,9 @@ class SandboxRemote(Sandbox): # artifact servers. blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs) - remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + if remote_missing_blobs: raise SandboxError("{} output files are missing on the CAS server" .format(len(remote_missing_blobs))) @@ -351,65 +353,66 @@ class SandboxRemote(Sandbox): input_root_digest=input_root_digest) action_digest = utils._message_digest(action.SerializeToString()) - # Next, try to create a communication channel to the BuildGrid server. - url = urlparse(self.exec_url) - if not url.port: - raise SandboxError("You must supply a protocol and port number in the execution-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - elif url.scheme == 'https': - channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) - else: - raise SandboxError("Remote execution currently only supports the 'http' protocol " - "and '{}' was supplied.".format(url.scheme)) - # check action cache download and download if there action_result = self._check_action_cache(action_digest) if not action_result: - casremote = CASRemote(self.storage_remote_spec, cascache) - try: - casremote.init() - except grpc.RpcError as e: - raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" - .format(self.storage_url, e)) from e - - # Determine blobs missing on remote - try: - missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) - except grpc.RpcError as e: - raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - - # Check if any blobs are also missing locally (partial artifact) - # and pull them from the artifact cache. - try: - local_missing_blobs = cascache.local_missing_blobs(missing_blobs) - if local_missing_blobs: - artifactcache.fetch_missing_blobs(project, local_missing_blobs) - except (grpc.RpcError, BstError) as e: - raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e - - # Now, push the missing blobs to the remote. - try: - cascache.send_blobs(casremote, missing_blobs) - except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e - - # Push command and action - try: - casremote.push_message(command_proto) - except grpc.RpcError as e: - raise SandboxError("Failed to push command to remote: {}".format(e)) - - try: - casremote.push_message(action) - except grpc.RpcError as e: - raise SandboxError("Failed to push action to remote: {}".format(e)) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + try: + casremote.init() + except grpc.RpcError as e: + raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" + .format(self.storage_url, e)) from e + + # Determine blobs missing on remote + try: + missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) + except grpc.RpcError as e: + raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e + + # Check if any blobs are also missing locally (partial artifact) + # and pull them from the artifact cache. + try: + local_missing_blobs = cascache.local_missing_blobs(missing_blobs) + if local_missing_blobs: + artifactcache.fetch_missing_blobs(project, local_missing_blobs) + except (grpc.RpcError, BstError) as e: + raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e + + # Now, push the missing blobs to the remote. + try: + cascache.send_blobs(casremote, missing_blobs) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + + # Push command and action + try: + casremote.push_message(command_proto) + except grpc.RpcError as e: + raise SandboxError("Failed to push command to remote: {}".format(e)) + + try: + casremote.push_message(action) + except grpc.RpcError as e: + raise SandboxError("Failed to push action to remote: {}".format(e)) + + # Next, try to create a communication channel to the BuildGrid server. + url = urlparse(self.exec_url) + if not url.port: + raise SandboxError("You must supply a protocol and port number in the execution-service url, " + "for example: http://buildservice:50051.") + if url.scheme == 'http': + channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) + elif url.scheme == 'https': + channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) + else: + raise SandboxError("Remote execution currently only supports the 'http' protocol " + "and '{}' was supplied.".format(url.scheme)) # Now request to execute the action - operation = self.run_remote_command(channel, action_digest) - action_result = self._extract_action_result(operation) + with channel: + operation = self.run_remote_command(channel, action_digest) + action_result = self._extract_action_result(operation) # Get output of build self.process_job_output(action_result.output_directories, action_result.output_files, @@ -445,20 +448,21 @@ class SandboxRemote(Sandbox): elif url.scheme == 'https': channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials) - request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, - action_digest=action_digest) - stub = remote_execution_pb2_grpc.ActionCacheStub(channel) - try: - result = stub.GetActionResult(request) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError("Failed to query action cache: {} ({})" - .format(e.code(), e.details())) + with channel: + request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, + action_digest=action_digest) + stub = remote_execution_pb2_grpc.ActionCacheStub(channel) + try: + result = stub.GetActionResult(request) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})" + .format(e.code(), e.details())) + else: + return None else: - return None - else: - self.info("Action result found in action cache") - return result + self.info("Action result found in action cache") + return result def _create_command(self, command, working_directory, environment): # Creates a command proto |