diff options
author | Jürg Billeter <j@bitron.ch> | 2020-10-20 09:34:29 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2020-12-04 16:29:06 +0100 |
commit | 425e8dcbc8934aa821ecd9c8334eee044e456a6f (patch) | |
tree | aca28d8f1542a4c3a0cc1cf94a295031642b913c | |
parent | 802f4f8a3a8f1bce7c888c8c6b7812bf654f401f (diff) | |
download | buildstream-juerg/remote-cache.tar.gz |
_sandboxremote.py: Make storage-service optional with remote cache (branch: juerg/remote-cache)
-rw-r--r-- | src/buildstream/_context.py | 2 | ||||
-rw-r--r-- | src/buildstream/_project.py | 7 | ||||
-rw-r--r-- | src/buildstream/sandbox/_sandboxremote.py | 137 |
3 files changed, 82 insertions, 64 deletions
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index d3262c0e4..ed58ccfa2 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -334,7 +334,7 @@ class Context: self.source_cache_specs = SourceCache.specs_from_config_node(defaults) # Load remote execution config - self.remote_execution_specs = SandboxRemote.specs_from_config_node(defaults) + self.remote_execution_specs = SandboxRemote.specs_from_config_node(defaults, remote_cache=bool(remote_cache)) # Load pull build trees configuration self.pull_buildtrees = cache.get_bool("pull-buildtrees") diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py index 6154c51c0..baadda051 100644 --- a/src/buildstream/_project.py +++ b/src/buildstream/_project.py @@ -887,8 +887,11 @@ class Project: self.source_cache_specs = SourceCache.specs_from_config_node(config, self.directory) # Load remote-execution configuration for this project - project_specs = SandboxRemote.specs_from_config_node(config, self.directory) - override_specs = SandboxRemote.specs_from_config_node(self._context.get_overrides(self.name), self.directory) + remote_cache = bool(self._context.remote_cache_spec) + project_specs = SandboxRemote.specs_from_config_node(config, self.directory, remote_cache=remote_cache) + override_specs = SandboxRemote.specs_from_config_node( + self._context.get_overrides(self.name), self.directory, remote_cache=remote_cache + ) if override_specs is not None: self.remote_execution_specs = override_specs diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 74e3e2330..9105bd529 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -52,11 +52,13 @@ class SandboxRemote(SandboxREAPI): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + context = self._get_context() + cascache = context.get_cascache() + config = kwargs["specs"] # This should be a 
RemoteExecutionSpec if config is None: return - self.storage_url = config.storage_service["url"] self.exec_url = config.exec_service["url"] exec_certs = {} @@ -71,6 +73,25 @@ class SandboxRemote(SandboxREAPI): certificate_chain=exec_certs.get("client-cert"), ) + # Storage service is optional if a remote cache is configured + if config.storage_service: + self.storage_url = config.storage_service["url"] + self.storage_instance = config.storage_service.get("instance-name", None) + self.storage_remote_spec = RemoteSpec( + self.storage_url, + push=True, + server_cert=config.storage_service.get("server-cert"), + client_key=config.storage_service.get("client-key"), + client_cert=config.storage_service.get("client-cert"), + instance_name=self.storage_instance, + ) + self.storage_remote = CASRemote(self.storage_remote_spec, cascache) + else: + self.storage_url = None + self.storage_instance = None + self.storage_remote_spec = None + self.storage_remote = cascache.get_default_remote() + action_certs = {} for key in ["client-cert", "client-key", "server-cert"]: if key in config.action_service: @@ -91,23 +112,14 @@ class SandboxRemote(SandboxREAPI): self.action_credentials = None self.exec_instance = config.exec_service.get("instance-name", None) - self.storage_instance = config.storage_service.get("instance-name", None) - - self.storage_remote_spec = RemoteSpec( - self.storage_url, - push=True, - server_cert=config.storage_service.get("server-cert"), - client_key=config.storage_service.get("client-key"), - client_cert=config.storage_service.get("client-cert"), - instance_name=self.storage_instance, - ) + self.operation_name = None def info(self, msg): self._get_context().messenger.message(Message(MessageType.INFO, msg, element_name=self._get_element_name())) @staticmethod - def specs_from_config_node(config_node, basedir=None): + def specs_from_config_node(config_node, basedir=None, *, remote_cache=False): def require_node(config, keyname): val = config.get_mapping(keyname, 
default=None) if val is None: @@ -128,7 +140,10 @@ class SandboxRemote(SandboxREAPI): remote_config.validate_keys(["url", *service_keys]) exec_config = require_node(remote_config, "execution-service") - storage_config = require_node(remote_config, "storage-service") + if remote_cache: + storage_config = remote_config.get_mapping("storage-service", default={}) + else: + storage_config = require_node(remote_config, "storage-service") action_config = remote_config.get_mapping("action-cache-service", default={}) tls_keys = ["client-key", "client-cert", "server-cert"] @@ -268,13 +283,13 @@ class SandboxRemote(SandboxREAPI): cascache = context.get_cascache() # Fetch the file blobs - dir_digest = vdir._get_digest() - required_blobs = cascache.required_blobs_for_directory(dir_digest) + if self.storage_url: + dir_digest = vdir._get_digest() + required_blobs = cascache.required_blobs_for_directory(dir_digest) - local_missing_blobs = cascache.missing_blobs(required_blobs) - if local_missing_blobs: - with CASRemote(self.storage_remote_spec, cascache) as casremote: - cascache.fetch_blobs(casremote, local_missing_blobs) + local_missing_blobs = cascache.missing_blobs(required_blobs) + if local_missing_blobs: + cascache.fetch_blobs(self.storage_remote, local_missing_blobs) def _execute_action(self, action, flags): stdout, stderr = self._get_output() @@ -286,46 +301,47 @@ class SandboxRemote(SandboxREAPI): action_digest = cascache.add_object(buffer=action.SerializeToString()) + casremote = self.storage_remote + # check action cache download and download if there action_result = self._check_action_cache(action_digest) if not action_result: - with CASRemote(self.storage_remote_spec, cascache) as casremote: + try: + casremote.init() + except grpc.RpcError as e: + raise SandboxError( + "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_url, e) + ) from e + + with self._get_context().messenger.timed_activity( + "Uploading input root", 
element_name=self._get_element_name() + ): + # Determine blobs missing on remote try: - casremote.init() + input_root_digest = action.input_root_digest + missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote)) except grpc.RpcError as e: - raise SandboxError( - "Failed to contact remote execution CAS endpoint at {}: {}".format(self.storage_url, e) - ) from e + raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - with self._get_context().messenger.timed_activity( - "Uploading input root", element_name=self._get_element_name() - ): - # Determine blobs missing on remote - try: - input_root_digest = action.input_root_digest - missing_blobs = list(cascache.missing_blobs_for_directory(input_root_digest, remote=casremote)) - except grpc.RpcError as e: - raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - - # Check if any blobs are also missing locally (partial artifact) - # and pull them from the artifact cache. - try: - local_missing_blobs = cascache.missing_blobs(missing_blobs) - if local_missing_blobs: - artifactcache.fetch_missing_blobs(project, local_missing_blobs) - except (grpc.RpcError, BstError) as e: - raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e - - # Add command and action messages to blob list to push - missing_blobs.append(action.command_digest) - missing_blobs.append(action_digest) - - # Now, push the missing blobs to the remote. - try: - cascache.send_blobs(casremote, missing_blobs) - except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + # Check if any blobs are also missing locally (partial artifact) + # and pull them from the artifact cache. 
+ try: + local_missing_blobs = cascache.missing_blobs(missing_blobs) + if local_missing_blobs: + artifactcache.fetch_missing_blobs(project, local_missing_blobs) + except (grpc.RpcError, BstError) as e: + raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e + + # Add command and action messages to blob list to push + missing_blobs.append(action.command_digest) + missing_blobs.append(action_digest) + + # Now, push the missing blobs to the remote. + try: + cascache.send_blobs(casremote, missing_blobs) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e # Next, try to create a communication channel to the BuildGrid server. url = urlparse(self.exec_url) @@ -350,17 +366,16 @@ class SandboxRemote(SandboxREAPI): action_result = self._extract_action_result(operation) # Fetch outputs - with CASRemote(self.storage_remote_spec, cascache) as casremote: - for output_directory in action_result.output_directories: - tree_digest = output_directory.tree_digest - if tree_digest is None or not tree_digest.hash: - raise SandboxError("Output directory structure had no digest attached.") + for output_directory in action_result.output_directories: + tree_digest = output_directory.tree_digest + if tree_digest is None or not tree_digest.hash: + raise SandboxError("Output directory structure had no digest attached.") - # Now do a pull to ensure we have the full directory structure. - cascache.pull_tree(casremote, tree_digest) + # Now do a pull to ensure we have the full directory structure. + cascache.pull_tree(casremote, tree_digest) - # Fetch stdout and stderr blobs - cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest]) + # Fetch stdout and stderr blobs + cascache.fetch_blobs(casremote, [action_result.stdout_digest, action_result.stderr_digest]) # Forward remote stdout and stderr if stdout: |