author     Qinusty <jrsmith9822@gmail.com>   2018-09-07 16:48:02 +0000
committer  Qinusty <jrsmith9822@gmail.com>   2018-09-07 16:48:02 +0000
commit     f52169b2994e671754607275711421df5c3af830 (patch)
tree       a0213488a7ce2c65e5b4fb47075947e23177dfda
parent     7b32e1ec903d658dfa75c754b0dd45a3e9331638 (diff)
parent     9ef53163b95c1b21178016737626f99956b256d8 (diff)
download   buildstream-f52169b2994e671754607275711421df5c3af830.tar.gz
Merge branch 'jmac/remote_execution_client' into 'master'
Remote execution client
See merge request BuildStream/buildstream!626
25 files changed, 1249 insertions, 100 deletions
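To make the configuration flow of this merge easier to follow, here is a minimal, hypothetical Python sketch (not part of the patch): a project opts in through a `remote-execution`/`url` pair in project.conf, and an element only qualifies for the remote sandbox when its plugin supports virtual directories and the URL is a plain `http://<host>:<port>` endpoint. The helper `choose_sandbox` is invented for illustration; only the option keys, the HTTP-only restriction and `SandboxRemote` come from the changes below.

```python
# Hypothetical illustration of the sandbox selection added by this merge.
# Only the 'remote-execution'/'url' option and the plain-HTTP restriction
# are taken from the patch; choose_sandbox() itself is invented.
from urllib.parse import urlparse


def choose_sandbox(remote_execution_url, supports_virtual_directories):
    """Return 'remote' when a usable REAPI endpoint is configured."""
    if remote_execution_url and supports_virtual_directories:
        url = urlparse(remote_execution_url)
        # SandboxRemote only accepts plain HTTP URLs of the form
        # http://<host>:<port>
        if url.scheme == 'http' and url.hostname and url.port:
            return 'remote'
    return 'local'


# project.conf would carry:
#   remote-execution:
#     url: http://buildserver.example.com:50051
assert choose_sandbox('http://buildserver.example.com:50051', True) == 'remote'
assert choose_sandbox('', True) == 'local'
```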
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 9a9f7024f..ce2b874da 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -19,6 +19,7 @@ import hashlib import itertools +import io import multiprocessing import os import signal @@ -76,6 +77,7 @@ class CASCache(ArtifactCache): ################################################ # Implementation of abstract methods # ################################################ + def contains(self, element, key): refpath = self._refpath(self.get_artifact_fullname(element, key)) @@ -153,6 +155,7 @@ class CASCache(ArtifactCache): q = multiprocessing.Queue() for remote_spec in remote_specs: # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) try: @@ -259,6 +262,25 @@ class CASCache(ArtifactCache): return False + def pull_tree(self, project, digest): + """ Pull a single Tree rather than an artifact. + Does not update local refs. """ + + for remote in self._remotes[project]: + try: + remote.init() + + digest = self._fetch_tree(remote, digest) + + # no need to pull from additional remotes + return digest + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise + + return None + def link_key(self, element, oldkey, newkey): oldref = self.get_artifact_fullname(element, oldkey) newref = self.get_artifact_fullname(element, newkey) @@ -267,8 +289,46 @@ class CASCache(ArtifactCache): self.set_ref(newref, tree) + def _push_refs_to_remote(self, refs, remote): + skipped_remote = True + try: + for ref in refs: + tree = self.resolve_ref(ref) + + # Check whether ref is already on the server in which case + # there is no need to push the artifact + try: + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + response = remote.ref_storage.GetReference(request) + + if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: + # ref is already on the server with the same tree + continue + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + # Intentionally re-raise RpcError for outer except block. 
+ raise + + self._send_directory(remote, tree) + + request = buildstream_pb2.UpdateReferenceRequest() + request.keys.append(ref) + request.digest.hash = tree.hash + request.digest.size_bytes = tree.size_bytes + remote.ref_storage.UpdateReference(request) + + skipped_remote = False + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: + raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + + return not skipped_remote + def push(self, element, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] + + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] project = element._get_project() @@ -278,95 +338,77 @@ class CASCache(ArtifactCache): for remote in push_remotes: remote.init() - skipped_remote = True - element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url)) - try: - for ref in refs: - tree = self.resolve_ref(ref) - - # Check whether ref is already on the server in which case - # there is no need to push the artifact - try: - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - response = remote.ref_storage.GetReference(request) - - if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes: - # ref is already on the server with the same tree - continue - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - # Intentionally re-raise RpcError for outer except block. - raise - - missing_blobs = {} - required_blobs = self._required_blobs(tree) - - # Limit size of FindMissingBlobs request - for required_blobs_group in _grouper(required_blobs, 512): - request = remote_execution_pb2.FindMissingBlobsRequest() - - for required_digest in required_blobs_group: - d = request.blob_digests.add() - d.hash = required_digest.hash - d.size_bytes = required_digest.size_bytes - - response = remote.cas.FindMissingBlobs(request) - for digest in response.missing_blob_digests: - d = remote_execution_pb2.Digest() - d.hash = digest.hash - d.size_bytes = digest.size_bytes - missing_blobs[d.hash] = d - - # Upload any blobs missing on the server - skipped_remote = False - for digest in missing_blobs.values(): - uuid_ = uuid.uuid4() - resource_name = '/'.join(['uploads', str(uuid_), 'blobs', - digest.hash, str(digest.size_bytes)]) - - def request_stream(resname): - with open(self.objpath(digest), 'rb') as f: - assert os.fstat(f.fileno()).st_size == digest.size_bytes - offset = 0 - finished = False - remaining = digest.size_bytes - while not finished: - chunk_size = min(remaining, 64 * 1024) - remaining -= chunk_size - - request = bytestream_pb2.WriteRequest() - request.write_offset = offset - # max. 
64 kB chunks - request.data = f.read(chunk_size) - request.resource_name = resname - request.finish_write = remaining <= 0 - yield request - offset += chunk_size - finished = request.finish_write - response = remote.bytestream.Write(request_stream(resource_name)) - - request = buildstream_pb2.UpdateReferenceRequest() - request.keys.append(ref) - request.digest.hash = tree.hash - request.digest.size_bytes = tree.size_bytes - remote.ref_storage.UpdateReference(request) - - pushed = True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url)) - if skipped_remote: + if self._push_refs_to_remote(refs, remote): + pushed = True + else: self.context.message(Message( None, MessageType.SKIPPED, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) + + return pushed + + def push_directory(self, project, directory): + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + if directory.ref is None: + return None + + for remote in push_remotes: + remote.init() + + self._send_directory(remote, directory.ref) + + return directory.ref + + def push_message(self, project, message): + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + message_buffer = message.SerializeToString() + message_sha = hashlib.sha256(message_buffer) + message_digest = remote_execution_pb2.Digest() + message_digest.hash = message_sha.hexdigest() + message_digest.size_bytes = len(message_buffer) + + for remote in push_remotes: + remote.init() + + with io.BytesIO(message_buffer) as b: + self._send_blob(remote, message_digest, b) + + return message_digest + + def _verify_digest_on_remote(self, remote, digest): + # Check whether ref is already on the server in which case + # there is no need to push the artifact + request = remote_execution_pb2.FindMissingBlobsRequest() + request.blob_digests.extend([digest]) + + response = remote.cas.FindMissingBlobs(request) + if digest in response.missing_blob_digests: + return False + + return True + + def verify_digest_pushed(self, project, digest): + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + remote.init() + + if self._verify_digest_on_remote(remote, digest): + pushed = True + return pushed ################################################ @@ -599,6 +641,7 @@ class CASCache(ArtifactCache): ################################################ # Local Private Methods # ################################################ + def _checkout(self, dest, tree): os.makedirs(dest, exist_ok=True) @@ -761,16 +804,16 @@ class CASCache(ArtifactCache): # q.put(str(e)) - def _required_blobs(self, tree): + def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() - d.hash = tree.hash - d.size_bytes = tree.size_bytes + d.hash = directory_digest.hash + d.size_bytes = directory_digest.size_bytes yield d directory = remote_execution_pb2.Directory() - with open(self.objpath(tree), 'rb') as f: + with open(self.objpath(directory_digest), 'rb') as f: directory.ParseFromString(f.read()) for filenode in directory.files: @@ -782,16 +825,16 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: yield from self._required_blobs(dirnode.digest) - def _fetch_blob(self, remote, digest, out): 
+ def _fetch_blob(self, remote, digest, stream): resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)]) request = bytestream_pb2.ReadRequest() request.resource_name = resource_name request.read_offset = 0 for response in remote.bytestream.Read(request): - out.write(response.data) + stream.write(response.data) + stream.flush() - out.flush() - assert digest.size_bytes == os.fstat(out.fileno()).st_size + assert digest.size_bytes == os.fstat(stream.fileno()).st_size def _fetch_directory(self, remote, tree): objpath = self.objpath(tree) @@ -827,6 +870,92 @@ class CASCache(ArtifactCache): digest = self.add_object(path=out.name) assert digest.hash == tree.hash + def _fetch_tree(self, remote, digest): + # download but do not store the Tree object + with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out: + self._fetch_blob(remote, digest, out) + + tree = remote_execution_pb2.Tree() + + with open(out.name, 'rb') as f: + tree.ParseFromString(f.read()) + + tree.children.extend([tree.root]) + for directory in tree.children: + for filenode in directory.files: + fileobjpath = self.objpath(filenode.digest) + if os.path.exists(fileobjpath): + # already in local cache + continue + + with tempfile.NamedTemporaryFile(dir=self.tmpdir) as f: + self._fetch_blob(remote, filenode.digest, f) + + added_digest = self.add_object(path=f.name) + assert added_digest.hash == filenode.digest.hash + + # place directory blob only in final location when we've downloaded + # all referenced blobs to avoid dangling references in the repository + dirbuffer = directory.SerializeToString() + dirdigest = self.add_object(buffer=dirbuffer) + assert dirdigest.size_bytes == len(dirbuffer) + + return dirdigest + + def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()): + resource_name = '/'.join(['uploads', str(u_uid), 'blobs', + digest.hash, str(digest.size_bytes)]) + + def request_stream(resname, instream): + offset = 0 + finished = False + remaining = digest.size_bytes + while not finished: + chunk_size = min(remaining, 64 * 1024) + remaining -= chunk_size + + request = bytestream_pb2.WriteRequest() + request.write_offset = offset + # max. 64 kB chunks + request.data = instream.read(chunk_size) + request.resource_name = resname + request.finish_write = remaining <= 0 + + yield request + + offset += chunk_size + finished = request.finish_write + + response = remote.bytestream.Write(request_stream(resource_name, stream)) + + assert response.committed_size == digest.size_bytes + + def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): + required_blobs = self._required_blobs(digest) + + missing_blobs = dict() + # Limit size of FindMissingBlobs request + for required_blobs_group in _grouper(required_blobs, 512): + request = remote_execution_pb2.FindMissingBlobsRequest() + + for required_digest in required_blobs_group: + d = request.blob_digests.add() + d.hash = required_digest.hash + d.size_bytes = required_digest.size_bytes + + response = remote.cas.FindMissingBlobs(request) + for missing_digest in response.missing_blob_digests: + d = remote_execution_pb2.Digest() + d.hash = missing_digest.hash + d.size_bytes = missing_digest.size_bytes + missing_blobs[d.hash] = d + + # Upload any blobs missing on the server + for blob_digest in missing_blobs.values(): + with open(self.objpath(blob_digest), 'rb') as f: + assert os.fstat(f.fileno()).st_size == blob_digest.size_bytes + self._send_blob(remote, blob_digest, f, u_uid=u_uid) + # Represents a single remote CAS cache. 
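As a side note on the upload path factored out above: `_send_directory()` batches `FindMissingBlobs` requests in groups of 512 digests, and `_send_blob()` streams each missing blob as 64 kB ByteStream `WriteRequest` chunks. The following self-contained sketch shows only the chunking behaviour; plain dicts stand in for the generated `WriteRequest` messages so it runs without the gRPC stubs, and it is an illustration rather than BuildStream code.

```python
# Illustration of the 64 kB chunking used by _send_blob(): split a blob of
# known size into WriteRequest-shaped chunks, marking the last one with
# finish_write. Dicts stand in for bytestream_pb2.WriteRequest.
import io


def chunk_requests(stream, size_bytes, resource_name, chunk_size=64 * 1024):
    offset = 0
    remaining = size_bytes
    finished = False
    while not finished:
        this_chunk = min(remaining, chunk_size)
        remaining -= this_chunk
        finished = remaining <= 0
        yield {
            'resource_name': resource_name,
            'write_offset': offset,
            'data': stream.read(this_chunk),
            'finish_write': finished,
        }
        offset += this_chunk


blob = b'x' * (150 * 1024)  # 150 kB -> two full chunks plus a 22528-byte tail
requests = list(chunk_requests(io.BytesIO(blob), len(blob), 'uploads/example'))
assert [len(r['data']) for r in requests] == [65536, 65536, 22528]
assert requests[-1]['finish_write'] and not requests[0]['finish_write']
```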
# diff --git a/buildstream/_project.py b/buildstream/_project.py index 2f8ae69fb..f0ca3d71b 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -128,6 +128,7 @@ class Project(): self._shell_host_files = [] # A list of HostMount objects self.artifact_cache_specs = None + self.remote_execution_url = None self._sandbox = None self._splits = None @@ -471,7 +472,7 @@ class Project(): 'aliases', 'name', 'artifacts', 'options', 'fail-on-overlap', 'shell', 'fatal-warnings', - 'ref-storage', 'sandbox', 'mirrors' + 'ref-storage', 'sandbox', 'mirrors', 'remote-execution' ]) # @@ -482,6 +483,11 @@ class Project(): # Load artifacts pull/push configuration for this project self.artifact_cache_specs = ArtifactCache.specs_from_config_node(config, self.directory) + # Load remote-execution configuration for this project + remote_execution = _yaml.node_get(config, Mapping, 'remote-execution') + _yaml.node_validate(remote_execution, ['url']) + self.remote_execution_url = _yaml.node_get(remote_execution, str, 'url') + # Load sandbox environment variables self.base_environment = _yaml.node_get(config, Mapping, 'environment') self.base_env_nocache = _yaml.node_get(config, list, 'environment-nocache') diff --git a/buildstream/buildelement.py b/buildstream/buildelement.py index 180bb86ab..5447c13be 100644 --- a/buildstream/buildelement.py +++ b/buildstream/buildelement.py @@ -155,6 +155,9 @@ class BuildElement(Element): command_dir = build_root sandbox.set_work_directory(command_dir) + # Tell sandbox which directory is preserved in the finished artifact + sandbox.set_output_directory(install_root) + # Setup environment sandbox.set_environment(self.get_environment()) diff --git a/buildstream/data/projectconfig.yaml b/buildstream/data/projectconfig.yaml index c1ad2d147..1da67a530 100644 --- a/buildstream/data/projectconfig.yaml +++ b/buildstream/data/projectconfig.yaml @@ -204,3 +204,6 @@ shell: # Command to run when `bst shell` does not provide a command # command: [ 'sh', '-i' ] + +remote-execution: + url: ""
\ No newline at end of file diff --git a/buildstream/element.py b/buildstream/element.py index ae8e10144..dd7ccfed3 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -95,6 +95,7 @@ from . import _site from ._platform import Platform from .plugin import CoreWarnings from .sandbox._config import SandboxConfig +from .sandbox._sandboxremote import SandboxRemote from .storage.directory import Directory from .storage._filebaseddirectory import FileBasedDirectory @@ -250,6 +251,12 @@ class Element(Plugin): # Extract Sandbox config self.__sandbox_config = self.__extract_sandbox_config(meta) + # Extract remote execution URL + if not self.__is_junction: + self.__remote_execution_url = project.remote_execution_url + else: + self.__remote_execution_url = None + def __lt__(self, other): return self.name < other.name @@ -1570,6 +1577,8 @@ class Element(Plugin): finally: if collect is not None: try: + # Sandbox will probably have replaced its virtual directory, so get it again + sandbox_vroot = sandbox.get_virtual_directory() collectvdir = sandbox_vroot.descend(collect.lstrip(os.sep).split(os.sep)) except VirtualDirectoryError: # No collect directory existed @@ -2146,7 +2155,32 @@ class Element(Plugin): project = self._get_project() platform = Platform.get_platform() - if directory is not None and os.path.exists(directory): + if self.__remote_execution_url and self.BST_VIRTUAL_DIRECTORY: + if not self.__artifacts.has_push_remotes(element=self): + # Give an early warning if remote execution will not work + raise ElementError("Artifact {} is configured to use remote execution but has no push remotes. " + .format(self.name) + + "The remote artifact server(s) may not be correctly configured or contactable.") + + self.info("Using a remote sandbox for artifact {}".format(self.name)) + + sandbox = SandboxRemote(context, project, + directory, + stdout=stdout, + stderr=stderr, + config=config, + server_url=self.__remote_execution_url, + allow_real_directory=False) + yield sandbox + + elif directory is not None and os.path.exists(directory): + if self.__remote_execution_url: + self.warn("Artifact {} is configured to use remote execution but element plugin does not support it." + .format(self.name), detail="Element plugin '{kind}' does not support virtual directories." + .format(kind=self.get_kind()), warning_token="remote-failure") + + self.info("Falling back to local sandbox for artifact {}".format(self.name)) + sandbox = platform.create_sandbox(context, project, directory, stdout=stdout, diff --git a/buildstream/plugins/elements/autotools.py b/buildstream/plugins/elements/autotools.py index 14d04d9a3..cf5e85661 100644 --- a/buildstream/plugins/elements/autotools.py +++ b/buildstream/plugins/elements/autotools.py @@ -57,7 +57,8 @@ from buildstream import BuildElement # Element implementation for the 'autotools' kind. class AutotoolsElement(BuildElement): - pass + # Supports virtual directories (required for remote execution) + BST_VIRTUAL_DIRECTORY = True # Plugin entry point diff --git a/buildstream/plugins/elements/cmake.py b/buildstream/plugins/elements/cmake.py index 8126a80ac..2cb2601ae 100644 --- a/buildstream/plugins/elements/cmake.py +++ b/buildstream/plugins/elements/cmake.py @@ -56,7 +56,8 @@ from buildstream import BuildElement # Element implementation for the 'cmake' kind. 
class CMakeElement(BuildElement): - pass + # Supports virtual directories (required for remote execution) + BST_VIRTUAL_DIRECTORY = True # Plugin entry point diff --git a/buildstream/plugins/elements/make.py b/buildstream/plugins/elements/make.py index 1f37cb412..6c500f3f9 100644 --- a/buildstream/plugins/elements/make.py +++ b/buildstream/plugins/elements/make.py @@ -38,7 +38,8 @@ from buildstream import BuildElement # Element implementation for the 'make' kind. class MakeElement(BuildElement): - pass + # Supports virtual directories (required for remote execution) + BST_VIRTUAL_DIRECTORY = True # Plugin entry point diff --git a/buildstream/plugins/elements/meson.py b/buildstream/plugins/elements/meson.py index 228e90ad1..9e0edf19e 100644 --- a/buildstream/plugins/elements/meson.py +++ b/buildstream/plugins/elements/meson.py @@ -53,7 +53,8 @@ from buildstream import BuildElement # Element implementation for the 'meson' kind. class MesonElement(BuildElement): - pass + # Supports virtual directories (required for remote execution) + BST_VIRTUAL_DIRECTORY = True # Plugin entry point diff --git a/buildstream/plugins/elements/qmake.py b/buildstream/plugins/elements/qmake.py index 7896692a6..9f5bc4018 100644 --- a/buildstream/plugins/elements/qmake.py +++ b/buildstream/plugins/elements/qmake.py @@ -33,7 +33,8 @@ from buildstream import BuildElement # Element implementation for the 'qmake' kind. class QMakeElement(BuildElement): - pass + # Supports virtual directories (required for remote execution) + BST_VIRTUAL_DIRECTORY = True # Plugin entry point diff --git a/buildstream/sandbox/__init__.py b/buildstream/sandbox/__init__.py index 53e170fbd..2c76e9e8e 100644 --- a/buildstream/sandbox/__init__.py +++ b/buildstream/sandbox/__init__.py @@ -20,3 +20,4 @@ from .sandbox import Sandbox, SandboxFlags from ._sandboxchroot import SandboxChroot from ._sandboxbwrap import SandboxBwrap +from ._sandboxremote import SandboxRemote diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py new file mode 100644 index 000000000..296b20351 --- /dev/null +++ b/buildstream/sandbox/_sandboxremote.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2018 Bloomberg LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Jim MacArthur <jim.macarthur@codethink.co.uk> + +import os +from urllib.parse import urlparse + +import grpc + +from . import Sandbox +from ..storage._filebaseddirectory import FileBasedDirectory +from ..storage._casbaseddirectory import CasBasedDirectory +from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._artifactcache.cascache import CASCache + + +class SandboxError(Exception): + pass + + +# SandboxRemote() +# +# This isn't really a sandbox, it's a stub which sends all the sources and build +# commands to a remote server and retrieves the results from it. 
+# +class SandboxRemote(Sandbox): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.cascache = None + + url = urlparse(kwargs['server_url']) + if not url.scheme or not url.hostname or not url.port: + raise SandboxError("Configured remote URL '{}' does not match the expected layout. " + .format(kwargs['server_url']) + + "It should be of the form <protocol>://<domain name>:<port>.") + elif url.scheme != 'http': + raise SandboxError("Configured remote '{}' uses an unsupported protocol. " + "Only plain HTTP is currenlty supported (no HTTPS).") + + self.server_url = '{}:{}'.format(url.hostname, url.port) + + def _get_cascache(self): + if self.cascache is None: + self.cascache = CASCache(self._get_context()) + self.cascache.setup_remotes(use_config=True) + return self.cascache + + def run_remote_command(self, command, input_root_digest, working_directory, environment): + # Sends an execution request to the remote execution server. + # + # This function blocks until it gets a response from the server. + # + environment_variables = [remote_execution_pb2.Command. + EnvironmentVariable(name=k, value=v) + for (k, v) in environment.items()] + + # Create and send the Command object. + remote_command = remote_execution_pb2.Command(arguments=command, + working_directory=working_directory, + environment_variables=environment_variables, + output_files=[], + output_directories=[self._output_directory], + platform=None) + + cascache = self._get_cascache() + # Upload the Command message to the remote CAS server + command_digest = cascache.push_message(self._get_project(), remote_command) + if not command_digest or not cascache.verify_digest_pushed(self._get_project(), command_digest): + # Command push failed + return None + + # Create and send the action. + action = remote_execution_pb2.Action(command_digest=command_digest, + input_root_digest=input_root_digest, + timeout=None, + do_not_cache=False) + + # Upload the Action message to the remote CAS server + action_digest = cascache.push_message(self._get_project(), action) + if not action_digest or not cascache.verify_digest_pushed(self._get_project(), action_digest): + # Action push failed + return None + + # Next, try to create a communication channel to the BuildGrid server. + channel = grpc.insecure_channel(self.server_url) + stub = remote_execution_pb2_grpc.ExecutionStub(channel) + request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest, + skip_cache_lookup=False) + try: + operation_iterator = stub.Execute(request) + except grpc.RpcError: + return None + + operation = None + with self._get_context().timed_activity("Waiting for the remote build to complete"): + # It is advantageous to check operation_iterator.code() is grpc.StatusCode.OK here, + # which will check the server is actually contactable. However, calling it when the + # server is available seems to cause .code() to hang forever. + for operation in operation_iterator: + if operation.done: + break + + return operation + + def process_job_output(self, output_directories, output_files): + # Reads the remote execution server response to an execution request. + # + # output_directories is an array of OutputDirectory objects. + # output_files is an array of OutputFile objects. + # + # We only specify one output_directory, so it's an error + # for there to be any output files or more than one directory at the moment. 
+ # + if output_files: + raise SandboxError("Output files were returned when we didn't request any.") + elif not output_directories: + error_text = "No output directory was returned from the build server." + raise SandboxError(error_text) + elif len(output_directories) > 1: + error_text = "More than one output directory was returned from the build server: {}." + raise SandboxError(error_text.format(output_directories)) + + tree_digest = output_directories[0].tree_digest + if tree_digest is None or not tree_digest.hash: + raise SandboxError("Output directory structure had no digest attached.") + + cascache = self._get_cascache() + # Now do a pull to ensure we have the necessary parts. + dir_digest = cascache.pull_tree(self._get_project(), tree_digest) + if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes: + raise SandboxError("Output directory structure pulling from remote failed.") + + path_components = os.path.split(self._output_directory) + + # Now what we have is a digest for the output. Once we return, the calling process will + # attempt to descend into our directory and find that directory, so we need to overwrite + # that. + + if not path_components: + # The artifact wants the whole directory; we could just return the returned hash in its + # place, but we don't have a means to do that yet. + raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.") + + # At the moment, we will get the whole directory back in the first directory argument and we need + # to replace the sandbox's virtual directory with that. Creating a new virtual directory object + # from another hash will be interesting, though... + + new_dir = CasBasedDirectory(self._get_context(), ref=dir_digest) + self._set_virtual_directory(new_dir) + + def run(self, command, flags, *, cwd=None, env=None): + # Upload sources + upload_vdir = self.get_virtual_directory() + + if isinstance(upload_vdir, FileBasedDirectory): + # Make a new temporary directory to put source in + upload_vdir = CasBasedDirectory(self._get_context(), ref=None) + upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory()) + + upload_vdir.recalculate_hash() + + cascache = self._get_cascache() + # Now, push that key (without necessarily needing a ref) to the remote. 
+ vdir_digest = cascache.push_directory(self._get_project(), upload_vdir) + if not vdir_digest or not cascache.verify_digest_pushed(self._get_project(), vdir_digest): + raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.") + + # Set up environment and working directory + if cwd is None: + cwd = self._get_work_directory() + + if cwd is None: + cwd = '/' + + if env is None: + env = self._get_environment() + + # We want command args as a list of strings + if isinstance(command, str): + command = [command] + + # Now transmit the command to execute + operation = self.run_remote_command(command, upload_vdir.ref, cwd, env) + + if operation is None: + # Failure of remote execution, usually due to an error in BuildStream + # NB This error could be raised in __run_remote_command + raise SandboxError("No response returned from server") + + assert not operation.HasField('error') and operation.HasField('response') + + execution_response = remote_execution_pb2.ExecuteResponse() + # The response is expected to be an ExecutionResponse message + assert operation.response.Is(execution_response.DESCRIPTOR) + + operation.response.Unpack(execution_response) + + if execution_response.status.code != 0: + # A normal error during the build: the remote execution system + # has worked correctly but the command failed. + # execution_response.error also contains 'message' (str) and + # 'details' (iterator of Any) which we ignore at the moment. + return execution_response.status.code + + action_result = execution_response.result + + self.process_job_output(action_result.output_directories, action_result.output_files) + + return 0 diff --git a/buildstream/sandbox/sandbox.py b/buildstream/sandbox/sandbox.py index 87a2fb9c9..9d34f0195 100644 --- a/buildstream/sandbox/sandbox.py +++ b/buildstream/sandbox/sandbox.py @@ -99,9 +99,11 @@ class Sandbox(): self.__stdout = kwargs['stdout'] self.__stderr = kwargs['stderr'] - # Setup the directories. Root should be available to subclasses, hence - # being single-underscore. The others are private to this class. + # Setup the directories. Root and output_directory should be + # available to subclasses, hence being single-underscore. The + # others are private to this class. self._root = os.path.join(directory, 'root') + self._output_directory = None self.__directory = directory self.__scratch = os.path.join(self.__directory, 'scratch') for directory_ in [self._root, self.__scratch]: @@ -144,11 +146,17 @@ class Sandbox(): self._vdir = FileBasedDirectory(self._root) return self._vdir + def _set_virtual_directory(self, virtual_directory): + """ Sets virtual directory. Useful after remote execution + has rewritten the working directory. + """ + self._vdir = virtual_directory + def set_environment(self, environment): """Sets the environment variables for the sandbox Args: - directory (dict): The environment variables to use in the sandbox + environment (dict): The environment variables to use in the sandbox """ self.__env = environment @@ -160,6 +168,15 @@ class Sandbox(): """ self.__cwd = directory + def set_output_directory(self, directory): + """Sets the output directory - the directory which is preserved + as an artifact after assembly. 
+ + Args: + directory (str): An absolute path within the sandbox + """ + self._output_directory = directory + def mark_directory(self, directory, *, artifact=False): """Marks a sandbox directory and ensures it will exist diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py index 5ca100793..d580635c1 100644 --- a/buildstream/storage/_casbaseddirectory.py +++ b/buildstream/storage/_casbaseddirectory.py @@ -543,6 +543,15 @@ class CasBasedDirectory(Directory): filelist.append(k) return filelist + def recalculate_hash(self): + """ Recalcuates the hash for this directory and store the results in + the cache. If this directory has a parent, tell it to + recalculate (since changing this directory changes an entry in + the parent). Hashes for subdirectories also get recalculated. + """ + self._recalculate_recursing_up() + self._recalculate_recursing_down() + def _get_identifier(self): path = "" if self.parent: diff --git a/doc/source/format_project.rst b/doc/source/format_project.rst index b43e67005..31984145b 100644 --- a/doc/source/format_project.rst +++ b/doc/source/format_project.rst @@ -204,6 +204,24 @@ with an artifact share. You can also specify a list of caches here; earlier entries in the list will have higher priority than later ones. +Remote execution +~~~~~~~~~~~~~~~~ +BuildStream supports remote execution using the Google Remote Execution API +(REAPI). A description of how remote execution works is beyond the scope +of this document, but you can specify a remote server complying with the REAPI +using the `remote-execution` option: + +.. code:: yaml + + remote-execution: + + # A url defining a remote execution server + url: http://buildserver.example.com:50051 + +The url should contain a hostname and port separated by ':'. Only plain HTTP is +currently suported (no HTTPS). + +The Remote Execution API can be found via https://github.com/bazelbuild/remote-apis. .. _project_essentials_mirrors: diff --git a/tests/artifactcache/project/elements/compose-all.bst b/tests/artifactcache/project/elements/compose-all.bst new file mode 100644 index 000000000..ba47081b3 --- /dev/null +++ b/tests/artifactcache/project/elements/compose-all.bst @@ -0,0 +1,12 @@ +kind: compose + +depends: +- filename: import-bin.bst + type: build +- filename: import-dev.bst + type: build + +config: + # Dont try running the sandbox, we dont have a + # runtime to run anything in this context. 
+ integrate: False diff --git a/tests/artifactcache/project/elements/import-bin.bst b/tests/artifactcache/project/elements/import-bin.bst new file mode 100644 index 000000000..a847c0c23 --- /dev/null +++ b/tests/artifactcache/project/elements/import-bin.bst @@ -0,0 +1,4 @@ +kind: import +sources: +- kind: local + path: files/bin-files diff --git a/tests/artifactcache/project/elements/import-dev.bst b/tests/artifactcache/project/elements/import-dev.bst new file mode 100644 index 000000000..152a54667 --- /dev/null +++ b/tests/artifactcache/project/elements/import-dev.bst @@ -0,0 +1,4 @@ +kind: import +sources: +- kind: local + path: files/dev-files diff --git a/tests/artifactcache/project/elements/target.bst b/tests/artifactcache/project/elements/target.bst new file mode 100644 index 000000000..ba489f1e8 --- /dev/null +++ b/tests/artifactcache/project/elements/target.bst @@ -0,0 +1,9 @@ +kind: stack +description: | + + Main stack target for the bst build test + +depends: +- import-bin.bst +- import-dev.bst +- compose-all.bst diff --git a/tests/artifactcache/project/files/bin-files/usr/bin/hello b/tests/artifactcache/project/files/bin-files/usr/bin/hello new file mode 100755 index 000000000..f534a4083 --- /dev/null +++ b/tests/artifactcache/project/files/bin-files/usr/bin/hello @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "Hello !" diff --git a/tests/artifactcache/project/files/dev-files/usr/include/pony.h b/tests/artifactcache/project/files/dev-files/usr/include/pony.h new file mode 100644 index 000000000..40bd0c2e7 --- /dev/null +++ b/tests/artifactcache/project/files/dev-files/usr/include/pony.h @@ -0,0 +1,12 @@ +#ifndef __PONY_H__ +#define __PONY_H__ + +#define PONY_BEGIN "Once upon a time, there was a pony." +#define PONY_END "And they lived happily ever after, the end." + +#define MAKE_PONY(story) \ + PONY_BEGIN \ + story \ + PONY_END + +#endif /* __PONY_H__ */ diff --git a/tests/artifactcache/project/project.conf b/tests/artifactcache/project/project.conf new file mode 100644 index 000000000..854e38693 --- /dev/null +++ b/tests/artifactcache/project/project.conf @@ -0,0 +1,4 @@ +# Project config for frontend build test +name: test + +element-path: elements diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py new file mode 100644 index 000000000..6336e7ab1 --- /dev/null +++ b/tests/artifactcache/pull.py @@ -0,0 +1,320 @@ +import hashlib +import multiprocessing +import os +import signal + +import pytest + +from buildstream import _yaml, _signals, utils +from buildstream._artifactcache.cascache import CASCache +from buildstream._context import Context +from buildstream._project import Project +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 + +from tests.testutils import cli, create_artifact_share + + +# Project directory +DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "project", +) + + +# Handle messages from the pipeline +def message_handler(message, context): + pass + + +def tree_maker(cas, tree, directory): + if tree.root.ByteSize() == 0: + tree.root.CopyFrom(directory) + + for directory_node in directory.directories: + child_directory = tree.children.add() + + with open(cas.objpath(directory_node.digest), 'rb') as f: + child_directory.ParseFromString(f.read()) + + tree_maker(cas, tree, child_directory) + + +@pytest.mark.datafiles(DATA_DIR) +def test_pull(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # Set up an artifact cache. 
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + # Ensure CLI calls will use it + cli.configure(user_config) + + # First build the project with the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' + # Assert that we shared/pushed the cached artifact + element_key = cli.get_element_key(project_dir, 'target.bst') + assert share.has_artifact('test', 'target.bst', element_key) + + # Delete the artifact locally + cli.remove_artifact_from_cache(project_dir, 'target.bst') + + # Assert that we are not cached locally anymore + assert cli.get_element_state(project_dir, 'target.bst') != 'cached' + + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts') + context.set_message_handler(message_handler) + + # Load the project and CAS cache + project = Project(project_dir, context) + project.ensure_fully_loaded() + cas = CASCache(context) + + # Assert that the element's artifact is **not** cached + element = project.load_elements(['target.bst'], cas)[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert not cas.contains(element, element_key) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_pull, + args=(user_config_file, project_dir, artifact_dir, + 'target.bst', element_key, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert not error + assert cas.contains(element, element_key) + + +def _test_pull(user_config_file, project_dir, artifact_dir, + element_name, element_key, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Load the target element + element = project.load_elements([element_name], cas)[0] + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(element=element): + # Push the element's artifact + if not cas.pull(element, element_key): + queue.put("Pull operation failed") + else: + queue.put(None) + else: + queue.put("No remote configured for element {}".format(element_name)) + + +@pytest.mark.datafiles(DATA_DIR) +def test_pull_tree(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # Set up an artifact cache. 
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + # Ensure CLI calls will use it + cli.configure(user_config) + + # First build the project with the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' + # Assert that we shared/pushed the cached artifact + element_key = cli.get_element_key(project_dir, 'target.bst') + assert share.has_artifact('test', 'target.bst', element_key) + + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts') + context.set_message_handler(message_handler) + + # Load the project and CAS cache + project = Project(project_dir, context) + project.ensure_fully_loaded() + cas = CASCache(context) + + # Assert that the element's artifact is cached + element = project.load_elements(['target.bst'], cas)[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert cas.contains(element, element_key) + + # Retrieve the Directory object from the cached artifact + artifact_ref = cas.get_artifact_fullname(element, element_key) + artifact_digest = cas.resolve_ref(artifact_ref) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push_tree, + args=(user_config_file, project_dir, artifact_dir, + artifact_digest, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + tree_hash, tree_size = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert tree_hash and tree_size + + # Now delete the artifact locally + cli.remove_artifact_from_cache(project_dir, 'target.bst') + + # Assert that we are not cached locally anymore + assert cli.get_element_state(project_dir, 'target.bst') != 'cached' + + tree_digest = remote_execution_pb2.Digest(hash=tree_hash, + size_bytes=tree_size) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + process = multiprocessing.Process(target=_test_pull_tree, + args=(user_config_file, project_dir, artifact_dir, + tree_digest, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + directory_hash, directory_size = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert directory_hash and directory_size + + directory_digest = remote_execution_pb2.Digest(hash=directory_hash, + size_bytes=directory_size) + + # Ensure the entire Tree stucture has been pulled + assert os.path.exists(cas.objpath(directory_digest)) + + +def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue): + # Fake minimal 
context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(): + directory = remote_execution_pb2.Directory() + + with open(cas.objpath(artifact_digest), 'rb') as f: + directory.ParseFromString(f.read()) + + # Build the Tree object while we are still cached + tree = remote_execution_pb2.Tree() + tree_maker(cas, tree, directory) + + # Push the Tree as a regular message + tree_digest = cas.push_message(project, tree) + + queue.put((tree_digest.hash, tree_digest.size_bytes)) + else: + queue.put("No remote configured") + + +def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(): + # Pull the artifact using the Tree object + directory_digest = cas.pull_tree(project, artifact_digest) + queue.put((directory_digest.hash, directory_digest.size_bytes)) + else: + queue.put("No remote configured") diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py new file mode 100644 index 000000000..bdeb86862 --- /dev/null +++ b/tests/artifactcache/push.py @@ -0,0 +1,312 @@ +import multiprocessing +import os +import signal + +import pytest + +from pluginbase import PluginBase +from buildstream import _yaml, _signals, utils +from buildstream._artifactcache.cascache import CASCache +from buildstream._context import Context +from buildstream._project import Project +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 +from buildstream.storage._casbaseddirectory import CasBasedDirectory + +from tests.testutils import cli, create_artifact_share + + +# Project directory +DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "project", +) + + +# Handle messages from the pipeline +def message_handler(message, context): + pass + + +@pytest.mark.datafiles(DATA_DIR) +def test_push(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # First build the project without the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' + + # Set up an artifact cache. 
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Assert that the element's artifact is cached + element = project.load_elements(['target.bst'], cas)[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert cas.contains(element, element_key) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push, + args=(user_config_file, project_dir, artifact_dir, + 'target.bst', element_key, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + error = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert not error + assert share.has_artifact('test', 'target.bst', element_key) + + +def _test_push(user_config_file, project_dir, artifact_dir, + element_name, element_key, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Load the target element + element = project.load_elements([element_name], cas)[0] + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(element=element): + # Push the element's artifact + if not cas.push(element, [element_key]): + queue.put("Push operation failed") + else: + queue.put(None) + else: + queue.put("No remote configured for element {}".format(element_name)) + + +@pytest.mark.datafiles(DATA_DIR) +def test_push_directory(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # First build the project without the artifact cache configured + result = cli.run(project=project_dir, args=['build', 'target.bst']) + result.assert_success() + + # Assert that we are now cached locally + assert cli.get_element_state(project_dir, 'target.bst') == 'cached' + + # Set up an artifact cache. 
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts') + context.set_message_handler(message_handler) + + # Load the project and CAS cache + project = Project(project_dir, context) + project.ensure_fully_loaded() + cas = CASCache(context) + + # Assert that the element's artifact is cached + element = project.load_elements(['target.bst'], cas)[0] + element_key = cli.get_element_key(project_dir, 'target.bst') + assert cas.contains(element, element_key) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + assert cas.has_push_remotes(element=element) + + # Recreate the CasBasedDirectory object from the cached artifact + artifact_ref = cas.get_artifact_fullname(element, element_key) + artifact_digest = cas.resolve_ref(artifact_ref) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push_directory, + args=(user_config_file, project_dir, artifact_dir, + artifact_digest, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + directory_hash = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert directory_hash + assert artifact_digest.hash == directory_hash + assert share.has_object(artifact_digest) + + +def _test_push_directory(user_config_file, project_dir, artifact_dir, artifact_digest, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(): + # Create a CasBasedDirectory from local CAS cache content + directory = CasBasedDirectory(context, ref=artifact_digest) + + # Push the CasBasedDirectory object + directory_digest = cas.push_directory(project, directory) + + queue.put(directory_digest.hash) + else: + queue.put("No remote configured") + + +@pytest.mark.datafiles(DATA_DIR) +def test_push_message(cli, tmpdir, datafiles): + project_dir = str(datafiles) + + # Set up an artifact cache. 
+ with create_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share: + # Configure artifact share + artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts') + user_config_file = str(tmpdir.join('buildstream.conf')) + user_config = { + 'scheduler': { + 'pushers': 1 + }, + 'artifacts': { + 'url': share.repo, + 'push': True, + } + } + + # Write down the user configuration file + _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file) + + queue = multiprocessing.Queue() + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + process = multiprocessing.Process(target=_test_push_message, + args=(user_config_file, project_dir, artifact_dir, queue)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + process.start() + + message_hash, message_size = queue.get() + process.join() + except KeyboardInterrupt: + utils._kill_process_tree(process.pid) + raise + + assert message_hash and message_size + message_digest = remote_execution_pb2.Digest(hash=message_hash, + size_bytes=message_size) + assert share.has_object(message_digest) + + +def _test_push_message(user_config_file, project_dir, artifact_dir, queue): + # Fake minimal context + context = Context() + context.load(config=user_config_file) + context.artifactdir = artifact_dir + context.set_message_handler(message_handler) + + # Load the project manually + project = Project(project_dir, context) + project.ensure_fully_loaded() + + # Create a local CAS cache handle + cas = CASCache(context) + + # Manually setup the CAS remote + cas.setup_remotes(use_config=True) + cas.initialize_remotes() + + if cas.has_push_remotes(): + # Create an example message object + command = remote_execution_pb2.Command(arguments=['/usr/bin/gcc', '--help'], + working_directory='/buildstream-build', + output_directories=['/buildstream-install']) + + # Push the message object + command_digest = cas.push_message(project, command) + + queue.put((command_digest.hash, command_digest.size_bytes)) + else: + queue.put("No remote configured") diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index 05e87a499..e3f709b0a 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -15,6 +15,7 @@ from buildstream._artifactcache.cascache import CASCache from buildstream._artifactcache.casserver import create_server from buildstream._context import Context from buildstream._exceptions import ArtifactError +from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 # ArtifactShare() @@ -87,6 +88,23 @@ class ArtifactShare(): # Sleep until termination by signal signal.pause() + # has_object(): + # + # Checks whether the object is present in the share + # + # Args: + # digest (str): The object's digest + # + # Returns: + # (bool): True if the object exists in the share, otherwise false. + def has_object(self, digest): + + assert isinstance(digest, remote_execution_pb2.Digest) + + object_path = self.cas.objpath(digest) + + return os.path.exists(object_path) + # has_artifact(): # # Checks whether the artifact is present in the share |
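Finally, the Execute round-trip performed by `SandboxRemote.run()` earlier in this diff can be condensed as follows. This is a sketch, not the shipped implementation: it assumes the REAPI stubs bundled with BuildStream are importable, takes a caller-supplied `push_message` callable (for example `CASCache.push_message`) to upload protobuf messages to CAS, and omits the input-root upload, timeouts and error handling.

```python
# Condensed sketch of the remote execution request flow; see SandboxRemote
# above for the real implementation. push_message(msg) must upload a protobuf
# message to the remote CAS and return its Digest.
import grpc

from buildstream._protos.build.bazel.remote.execution.v2 import (
    remote_execution_pb2, remote_execution_pb2_grpc)


def execute_remotely(server_url, push_message, command, cwd, env,
                     output_directory, input_root_digest):
    env_vars = [remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v)
                for k, v in env.items()]

    # Describe the command and upload it to CAS.
    remote_command = remote_execution_pb2.Command(
        arguments=command,
        working_directory=cwd,
        environment_variables=env_vars,
        output_directories=[output_directory])
    command_digest = push_message(remote_command)

    # Wrap it in an Action referencing the staged input root and upload that too.
    action = remote_execution_pb2.Action(command_digest=command_digest,
                                         input_root_digest=input_root_digest,
                                         do_not_cache=False)
    action_digest = push_message(action)

    # Ask the execution service to run it and wait for the final Operation.
    stub = remote_execution_pb2_grpc.ExecutionStub(grpc.insecure_channel(server_url))
    request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
                                                  skip_cache_lookup=False)
    for operation in stub.Execute(request):
        if operation.done:
            response = remote_execution_pb2.ExecuteResponse()
            operation.response.Unpack(response)
            return response
    return None
```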