Diffstat (limited to 'src/buildstream/sandbox/_sandboxremote.py')
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 577
1 file changed, 577 insertions, 0 deletions
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
new file mode 100644
index 000000000..2cb7e2538
--- /dev/null
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -0,0 +1,577 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 Bloomberg LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+#        Jim MacArthur <jim.macarthur@codethink.co.uk>
+
+import os
+import shlex
+from collections import namedtuple
+from urllib.parse import urlparse
+from functools import partial
+
+import grpc
+
+from .. import utils
+from .._message import Message, MessageType
+from .sandbox import Sandbox, SandboxCommandError, _SandboxBatch
+from ..storage.directory import VirtualDirectoryError
+from ..storage._casbaseddirectory import CasBasedDirectory
+from .. import _signals
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.google.rpc import code_pb2
+from .._exceptions import BstError, SandboxError
+from .. import _yaml
+from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
+from .._cas import CASRemote, CASRemoteSpec
+
+
+class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
+    pass
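Each field of this namedtuple holds a plain dict describing one service, as returned by specs_from_config_node() below. A hypothetical instance (all endpoint values invented for illustration) looks like:

    # Hypothetical endpoints, mirroring the shape specs_from_config_node() produces
    spec = RemoteExecutionSpec(
        exec_service={'url': 'http://execution.example.com:50051'},
        storage_service={'url': 'https://storage.example.com:11002',
                         'server-cert': '/certs/server.crt'},
        action_service={})  # empty when no action cache is configured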
+
+
+# SandboxRemote()
+#
+# This isn't really a sandbox; it's a stub which sends all the sources and build
+# commands to a remote server and retrieves the results from it.
+#
+class SandboxRemote(Sandbox):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._output_files_required = kwargs.get('output_files_required', True)
+
+        config = kwargs['specs']  # This should be a RemoteExecutionSpec
+        if config is None:
+            return
+
+        self.storage_url = config.storage_service['url']
+        self.exec_url = config.exec_service['url']
+
+        exec_certs = {}
+        for key in ['client-cert', 'client-key', 'server-cert']:
+            if key in config.exec_service:
+                with open(config.exec_service[key], 'rb') as f:
+                    exec_certs[key] = f.read()
+
+        self.exec_credentials = grpc.ssl_channel_credentials(
+            root_certificates=exec_certs.get('server-cert'),
+            private_key=exec_certs.get('client-key'),
+            certificate_chain=exec_certs.get('client-cert'))
+
+        action_certs = {}
+        for key in ['client-cert', 'client-key', 'server-cert']:
+            if key in config.action_service:
+                with open(config.action_service[key], 'rb') as f:
+                    action_certs[key] = f.read()
+
+        if config.action_service:
+            self.action_url = config.action_service['url']
+            self.action_instance = config.action_service.get('instance-name', None)
+            self.action_credentials = grpc.ssl_channel_credentials(
+                root_certificates=action_certs.get('server-cert'),
+                private_key=action_certs.get('client-key'),
+                certificate_chain=action_certs.get('client-cert'))
+        else:
+            self.action_url = None
+            self.action_instance = None
+            self.action_credentials = None
+
+        self.exec_instance = config.exec_service.get('instance-name', None)
+        self.storage_instance = config.storage_service.get('instance-name', None)
+
+        self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
+                                                 server_cert=config.storage_service.get('server-cert'),
+                                                 client_key=config.storage_service.get('client-key'),
+                                                 client_cert=config.storage_service.get('client-cert'),
+                                                 instance_name=self.storage_instance)
+        self.operation_name = None
+
+    def info(self, msg):
+        self._get_context().message(Message(None, MessageType.INFO, msg))
+
+    @staticmethod
+    def specs_from_config_node(config_node, basedir=None):
+
+        def require_node(config, keyname):
+            val = _yaml.node_get(config, dict, keyname, default_value=None)
+            if val is None:
+                provenance = _yaml.node_get_provenance(remote_config, key=keyname)
+                raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA,
+                                      "{}: '{}' was not present in the remote "
+                                      "execution configuration (remote-execution)."
+                                      .format(str(provenance), keyname))
+            return val
" + .format(str(provenance), keyname)) + return val + + remote_config = _yaml.node_get(config_node, dict, 'remote-execution', default_value=None) + if remote_config is None: + return None + + service_keys = ['execution-service', 'storage-service', 'action-cache-service'] + + _yaml.node_validate(remote_config, ['url', *service_keys]) + + exec_config = require_node(remote_config, 'execution-service') + storage_config = require_node(remote_config, 'storage-service') + action_config = _yaml.node_get(remote_config, dict, 'action-cache-service', default_value={}) + + tls_keys = ['client-key', 'client-cert', 'server-cert'] + + _yaml.node_validate(exec_config, ['url', 'instance-name', *tls_keys]) + _yaml.node_validate(storage_config, ['url', 'instance-name', *tls_keys]) + if action_config: + _yaml.node_validate(action_config, ['url', 'instance-name', *tls_keys]) + + # Maintain some backwards compatibility with older configs, in which + # 'url' was the only valid key for remote-execution: + if 'url' in remote_config: + if 'execution-service' not in remote_config: + exec_config = _yaml.new_node_from_dict({'url': remote_config['url']}) + else: + provenance = _yaml.node_get_provenance(remote_config, key='url') + raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA, + "{}: 'url' and 'execution-service' keys were found in the remote " + "execution configuration (remote-execution). " + "You can only specify one of these." + .format(str(provenance))) + + service_configs = [exec_config, storage_config, action_config] + + def resolve_path(path): + if basedir and path: + return os.path.join(basedir, path) + else: + return path + + for config_key, config in zip(service_keys, service_configs): + # Either both or none of the TLS client key/cert pair must be specified: + if ('client-key' in config) != ('client-cert' in config): + provenance = _yaml.node_get_provenance(remote_config, key=config_key) + raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA, + "{}: TLS client key/cert pair is incomplete. " + "You must specify both 'client-key' and 'client-cert' " + "for authenticated HTTPS connections." + .format(str(provenance))) + + for tls_key in tls_keys: + if tls_key in config: + _yaml.node_set(config, tls_key, resolve_path(_yaml.node_get(config, str, tls_key))) + + return RemoteExecutionSpec(*[_yaml.node_sanitize(conf) for conf in service_configs]) + + def run_remote_command(self, channel, action_digest): + # Sends an execution request to the remote execution server. + # + # This function blocks until it gets a response from the server. + + # Try to create a communication channel to the BuildGrid server. + stub = remote_execution_pb2_grpc.ExecutionStub(channel) + request = remote_execution_pb2.ExecuteRequest(instance_name=self.exec_instance, + action_digest=action_digest, + skip_cache_lookup=False) + + def __run_remote_command(stub, execute_request=None, running_operation=None): + try: + last_operation = None + if execute_request is not None: + operation_iterator = stub.Execute(execute_request) + else: + request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name) + operation_iterator = stub.WaitExecution(request) + + for operation in operation_iterator: + if not self.operation_name: + self.operation_name = operation.name + if operation.done: + return operation + else: + last_operation = operation + + except grpc.RpcError as e: + status_code = e.code() + if status_code == grpc.StatusCode.UNAVAILABLE: + raise SandboxError("Failed contacting remote execution server at {}." 
+
+    def run_remote_command(self, channel, action_digest):
+        # Sends an execution request to the remote execution server.
+        #
+        # This function blocks until it gets a response from the server.
+
+        # Create a stub for the execution service on the given channel.
+        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
+        request = remote_execution_pb2.ExecuteRequest(instance_name=self.exec_instance,
+                                                      action_digest=action_digest,
+                                                      skip_cache_lookup=False)
+
+        def __run_remote_command(stub, execute_request=None, running_operation=None):
+            try:
+                last_operation = None
+                if execute_request is not None:
+                    operation_iterator = stub.Execute(execute_request)
+                else:
+                    request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
+                    operation_iterator = stub.WaitExecution(request)
+
+                for operation in operation_iterator:
+                    if not self.operation_name:
+                        self.operation_name = operation.name
+                    if operation.done:
+                        return operation
+                    else:
+                        last_operation = operation
+
+            except grpc.RpcError as e:
+                status_code = e.code()
+                if status_code == grpc.StatusCode.UNAVAILABLE:
+                    raise SandboxError("Failed contacting remote execution server at {}."
+                                       .format(self.exec_url))
+
+                elif status_code in (grpc.StatusCode.INVALID_ARGUMENT,
+                                     grpc.StatusCode.FAILED_PRECONDITION,
+                                     grpc.StatusCode.RESOURCE_EXHAUSTED,
+                                     grpc.StatusCode.INTERNAL,
+                                     grpc.StatusCode.DEADLINE_EXCEEDED):
+                    raise SandboxError("{} ({}).".format(e.details(), status_code.name))
+
+                elif running_operation and status_code == grpc.StatusCode.UNIMPLEMENTED:
+                    raise SandboxError("Failed trying to recover from connection loss: "
+                                       "server does not support operation status polling recovery.")
+
+            return last_operation
+
+        # Set up a signal handler to trigger cancel_operation on SIGTERM
+        operation = None
+        with self._get_context().timed_activity("Waiting for the remote build to complete"), \
+                _signals.terminator(partial(self.cancel_operation, channel)):
+            operation = __run_remote_command(stub, execute_request=request)
+            if operation is None:
+                return None
+            elif operation.done:
+                return operation
+            while operation is not None and not operation.done:
+                operation = __run_remote_command(stub, running_operation=operation)
+
+        return operation
+
+    def cancel_operation(self, channel):
+        # If we don't have the operation name, we can't send a cancellation request.
+        if self.operation_name is None:
+            return
+
+        stub = operations_pb2_grpc.OperationsStub(channel)
+        request = operations_pb2.CancelOperationRequest(
+            name=str(self.operation_name))
+
+        try:
+            stub.CancelOperation(request)
+        except grpc.RpcError as e:
+            if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
+                    e.code() == grpc.StatusCode.INVALID_ARGUMENT):
+                pass
+            else:
+                raise SandboxError("Failed trying to send CancelOperation request: "
+                                   "{} ({})".format(e.details(), e.code().name))
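To make the reconnection behaviour easier to follow, here is a hedged, standalone sketch of the polling pattern __run_remote_command() implements (the helper names are illustrative, not part of the module):

    def wait_for_completion(stub, execute_request):
        # Execute() opens a stream of Operation messages; if the stream drops
        # before operation.done, WaitExecution() re-attaches by operation name.
        operation = drain_stream(stub.Execute(execute_request))
        while operation is not None and not operation.done:
            request = remote_execution_pb2.WaitExecutionRequest(name=operation.name)
            operation = drain_stream(stub.WaitExecution(request))
        return operation

    def drain_stream(operation_iterator):
        last = None
        for operation in operation_iterator:  # the stream may end early on connection loss
            if operation.done:
                return operation
            last = operation
        return last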
+
+    def process_job_output(self, output_directories, output_files, *, failure):
+        # Reads the remote execution server response to an execution request.
+        #
+        # output_directories is an array of OutputDirectory objects.
+        # output_files is an array of OutputFile objects.
+        #
+        # We only specify one output_directory, so it's an error
+        # for there to be any output files or more than one directory at the moment.
+        #
+        if output_files:
+            raise SandboxError("Output files were returned when we didn't request any.")
+        elif not output_directories:
+            error_text = "No output directory was returned from the build server."
+            raise SandboxError(error_text)
+        elif len(output_directories) > 1:
+            error_text = "More than one output directory was returned from the build server: {}."
+            raise SandboxError(error_text.format(output_directories))
+
+        tree_digest = output_directories[0].tree_digest
+        if tree_digest is None or not tree_digest.hash:
+            raise SandboxError("Output directory structure had no digest attached.")
+
+        context = self._get_context()
+        project = self._get_project()
+        cascache = context.get_cascache()
+        artifactcache = context.artifactcache
+        casremote = CASRemote(self.storage_remote_spec)
+
+        # Now do a pull to ensure we have the full directory structure.
+        dir_digest = cascache.pull_tree(casremote, tree_digest)
+        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
+            raise SandboxError("Pulling the output directory structure from the remote failed.")
+
+        # At the moment, we will get the whole directory back in the first directory argument and we need
+        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
+        # from another hash will be interesting, though...
+
+        new_dir = CasBasedDirectory(context.artifactcache.cas, digest=dir_digest)
+        self._set_virtual_directory(new_dir)
+
+        # Fetch the file blobs if needed
+        if self._output_files_required or artifactcache.has_push_remotes():
+            required_blobs = []
+            directories = []
+
+            directories.append(self._output_directory)
+            if self._build_directory and (self._build_directory_always or failure):
+                directories.append(self._build_directory)
+
+            for directory in directories:
+                try:
+                    vdir = new_dir.descend(*directory.strip(os.sep).split(os.sep))
+                    dir_digest = vdir._get_digest()
+                    required_blobs += cascache.required_blobs_for_directory(dir_digest)
+                except VirtualDirectoryError:
+                    # If the directory does not exist, there is no need to
+                    # download file blobs.
+                    pass
+
+            local_missing_blobs = cascache.local_missing_blobs(required_blobs)
+            if local_missing_blobs:
+                if self._output_files_required:
+                    # Fetch all blobs from the remote execution CAS server
+                    blobs_to_fetch = local_missing_blobs
+                else:
+                    # Output files are not required in the local cache,
+                    # however, artifact push remotes will need them.
+                    # Only fetch blobs that are missing on one or multiple
+                    # artifact servers.
+                    blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
+
+                remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
+                if remote_missing_blobs:
+                    raise SandboxError("{} output files are missing on the CAS server"
+                                       .format(len(remote_missing_blobs)))
+
+    def _run(self, command, flags, *, cwd, env):
+        stdout, stderr = self._get_output()
+
+        context = self._get_context()
+        project = self._get_project()
+        cascache = context.get_cascache()
+        artifactcache = context.artifactcache
+
+        # Set up the virtual directory to upload
+        upload_vdir = self.get_virtual_directory()
+
+        # Create directories for all marked directories. This emulates
+        # some of the behaviour of other sandboxes, which create these
+        # to use as mount points.
+        for mark in self._get_marked_directories():
+            directory = mark['directory']
+            # Create each marked directory
+            upload_vdir.descend(*directory.split(os.path.sep), create=True)
+
+        # Generate the action digest first
+        input_root_digest = upload_vdir._get_digest()
+        command_proto = self._create_command(command, cwd, env)
+        command_digest = utils._message_digest(command_proto.SerializeToString())
+        action = remote_execution_pb2.Action(command_digest=command_digest,
+                                             input_root_digest=input_root_digest)
+        action_digest = utils._message_digest(action.SerializeToString())
+
+        # Next, try to create a communication channel to the BuildGrid server.
+        url = urlparse(self.exec_url)
+        if not url.port:
+            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
+                               "for example: http://buildservice:50051.")
+        if url.scheme == 'http':
+            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+        elif url.scheme == 'https':
+            channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials)
+        else:
+            raise SandboxError("Remote execution currently only supports the 'http' protocol "
+                               "and '{}' was supplied.".format(url.scheme))
+
+        # Check the action cache and download the result if it's there
+        action_result = self._check_action_cache(action_digest)
+
+        if not action_result:
+            casremote = CASRemote(self.storage_remote_spec)
+            try:
+                casremote.init()
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}"
+                                   .format(self.storage_url, e)) from e
+
+            # Determine which blobs are missing on the remote
+            try:
+                missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest)
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e
+
+            # Check if any blobs are also missing locally (partial artifact)
+            # and pull them from the artifact cache.
+            try:
+                local_missing_blobs = cascache.local_missing_blobs(missing_blobs)
+                if local_missing_blobs:
+                    artifactcache.fetch_missing_blobs(project, local_missing_blobs)
+            except (grpc.RpcError, BstError) as e:
+                raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e
+
+            # Now, push the missing blobs to the remote.
+            try:
+                cascache.send_blobs(casremote, missing_blobs)
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
+
+            # Push the command and action protos
+            try:
+                casremote.push_message(command_proto)
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to push command to remote: {}".format(e)) from e
+
+            try:
+                casremote.push_message(action)
+            except grpc.RpcError as e:
+                raise SandboxError("Failed to push action to remote: {}".format(e)) from e
+
+        # Now request to execute the action
+        operation = self.run_remote_command(channel, action_digest)
+        action_result = self._extract_action_result(operation)
+
+        # Get the output of the build
+        self.process_job_output(action_result.output_directories, action_result.output_files,
+                                failure=action_result.exit_code != 0)
+
+        if stdout:
+            if action_result.stdout_raw:
+                stdout.write(str(action_result.stdout_raw, 'utf-8', errors='ignore'))
+        if stderr:
+            if action_result.stderr_raw:
+                stderr.write(str(action_result.stderr_raw, 'utf-8', errors='ignore'))
+
+        if action_result.exit_code != 0:
+            # A normal error during the build: the remote execution system
+            # has worked correctly but the command failed.
+            return action_result.exit_code
+
+        return 0
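The strict URL checks in _run() (and in _check_action_cache() below) exist because urlparse() only reports a port when an explicit scheme and netloc are present; a quick illustration:

    from urllib.parse import urlparse

    print(urlparse("http://buildservice:50051").port)  # 50051
    print(urlparse("buildservice:50051").port)         # None: "buildservice" parses as the scheme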
+
+    def _check_action_cache(self, action_digest):
+        # Checks the action cache to see if this artifact has already been built
+        #
+        # Returns the cached action result, or None if it was not found;
+        # raises SandboxError if any other gRPC error occurs.
+        if not self.action_url:
+            return None
+        url = urlparse(self.action_url)
+        if not url.port:
+            raise SandboxError("You must supply a protocol and port number in the action-cache-service url, "
+                               "for example: http://buildservice:50051.")
+        if url.scheme == 'http':
+            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
+        elif url.scheme == 'https':
+            channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials)
+
+        request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance,
+                                                              action_digest=action_digest)
+        stub = remote_execution_pb2_grpc.ActionCacheStub(channel)
+        try:
+            result = stub.GetActionResult(request)
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise SandboxError("Failed to query action cache: {} ({})"
+                                   .format(e.code(), e.details()))
+            else:
+                return None
+        else:
+            self.info("Action result found in action cache")
+            return result
+
+    def _create_command(self, command, working_directory, environment):
+        # Creates a command proto
+        environment_variables = [remote_execution_pb2.Command.
+                                 EnvironmentVariable(name=k, value=v)
+                                 for (k, v) in environment.items()]
+
+        # Request the whole directory tree as output
+        output_directory = os.path.relpath(os.path.sep, start=working_directory)
+
+        return remote_execution_pb2.Command(arguments=command,
+                                            working_directory=working_directory,
+                                            environment_variables=environment_variables,
+                                            output_files=[],
+                                            output_directories=[output_directory],
+                                            platform=None)
+
+    @staticmethod
+    def _extract_action_result(operation):
+        if operation is None:
+            # Failure of remote execution, usually due to an error in BuildStream
+            raise SandboxError("No response returned from server")
+
+        assert not operation.HasField('error') and operation.HasField('response')
+
+        execution_response = remote_execution_pb2.ExecuteResponse()
+        # The response is expected to be an ExecuteResponse message
+        assert operation.response.Is(execution_response.DESCRIPTOR)
+
+        operation.response.Unpack(execution_response)
+
+        if execution_response.status.code != code_pb2.OK:
+            # An unexpected error during execution: the remote execution
+            # system failed at processing the execution request.
+            if execution_response.status.message:
+                raise SandboxError(execution_response.status.message)
+            else:
+                raise SandboxError("Remote server failed at executing the build request.")
+
+        return execution_response.result
+
+    def _create_batch(self, main_group, flags, *, collect=None):
+        return _SandboxRemoteBatch(self, main_group, flags, collect=collect)
+
+    def _use_cas_based_directory(self):
+        # Always use CasBasedDirectory for remote execution
+        return True
+
+
+# _SandboxRemoteBatch()
+#
+# Command batching by shell script generation.
+#
+class _SandboxRemoteBatch(_SandboxBatch):
+
+    def __init__(self, sandbox, main_group, flags, *, collect=None):
+        super().__init__(sandbox, main_group, flags, collect=collect)
+
+        self.script = None
+        self.first_command = None
+        self.cwd = None
+        self.env = None
+
+    def execute(self):
+        self.script = ""
+
+        self.main_group.execute(self)
+
+        first = self.first_command
+        if first and self.sandbox.run(['sh', '-c', '-e', self.script], self.flags, cwd=first.cwd, env=first.env) != 0:
+            raise SandboxCommandError("Command execution failed", collect=self.collect)
+
+    def execute_group(self, group):
+        group.execute_children(self)
+
+    def execute_command(self, command):
+        if self.first_command is None:
+            # First command in the batch. The initial working directory and
+            # environment of the script already match the command configuration.
+            self.first_command = command
+        else:
+            # Change the working directory for this command
+            if command.cwd != self.cwd:
+                self.script += "mkdir -p {}\n".format(command.cwd)
+                self.script += "cd {}\n".format(command.cwd)
+
+            # Update the environment for this command
+            for key in self.env.keys():
+                if key not in command.env:
+                    self.script += "unset {}\n".format(key)
+            for key, value in command.env.items():
+                if key not in self.env or self.env[key] != value:
+                    self.script += "export {}={}\n".format(key, shlex.quote(value))
+
+        # Keep track of the current working directory and environment
+        self.cwd = command.cwd
+        self.env = command.env
+
+        # Actual command execution
+        cmdline = ' '.join(shlex.quote(cmd) for cmd in command.command)
+        self.script += "(set -ex; {})".format(cmdline)
+
+        # Error handling
+        label = command.label or cmdline
+        quoted_label = shlex.quote("'{}'".format(label))
+        self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(quoted_label)
+
+    def execute_call(self, call):
+        raise SandboxError("SandboxRemote does not support callbacks in command batches")
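To make the batching strategy concrete, here is a hedged, self-contained sketch of the kind of script execute_command() accumulates for a two-command batch (paths, environment and labels are hypothetical):

    import shlex

    commands = [
        {"cwd": "/build", "env": {"PREFIX": "/usr"},
         "argv": ["./configure", "--prefix=/usr"], "label": "configure"},
        {"cwd": "/build", "env": {"PREFIX": "/usr", "V": "1"},
         "argv": ["make", "install"], "label": "make install"},
    ]

    script = ""
    cwd = env = None
    for command in commands:
        if cwd is not None:
            # Later commands: emit cd/unset/export lines only for what changed
            # since the previous command.
            if command["cwd"] != cwd:
                script += "mkdir -p {0}\ncd {0}\n".format(command["cwd"])
            for key in env:
                if key not in command["env"]:
                    script += "unset {}\n".format(key)
            for key, value in command["env"].items():
                if env.get(key) != value:
                    script += "export {}={}\n".format(key, shlex.quote(value))
        cwd, env = command["cwd"], command["env"]

        cmdline = " ".join(shlex.quote(c) for c in command["argv"])
        quoted_label = shlex.quote("'{}'".format(command["label"]))
        script += "(set -ex; {})".format(cmdline)
        script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(quoted_label)

    print(script)  # this is what ends up being passed to: sh -c -e <script>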