| author | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-06-12 17:28:51 +0100 |
|---|---|---|
| committer | Jim MacArthur <jim.macarthur@codethink.co.uk> | 2018-06-12 17:28:51 +0100 |
| commit | 747a26619f98a467be892ffce36e5b5e8a06c8a9 (patch) | |
| tree | 5f98c196b3a462808a9b79677cc6bee240913c32 /buildstream/_artifactcache/cascache.py | |
| parent | 23d84e229e3ba1fbf2a8d68b363c96cacb58ef44 (diff) | |
| parent | 8c82f22f2bdbcbb623d24d80d2e72070e03e2080 (diff) | |
| download | buildstream-jmac/googlecas_and_virtual_directories_2.tar.gz | |
Merge branch 'juerg/googlecas' into jmac/virtual_directories_temp (jmac/googlecas_and_virtual_directories_2)
Diffstat (limited to 'buildstream/_artifactcache/cascache.py')
| -rw-r--r-- | buildstream/_artifactcache/cascache.py | 709 |
1 file changed, 709 insertions, 0 deletions
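Before the diff itself, a rough sketch of the on-disk layout the new module manages may help: blobs are stored content-addressed under `objects/`, fanned out by the first two hex digits of their SHA-256 digest, and artifact refs are plain files under `refs/heads/` holding a serialized Digest message (see `objpath()`, `_refpath()` and `set_ref()` below). The helper functions and example paths here are illustrative only, not part of the patch:

```python
# Illustrative sketch (not part of the commit): the store layout mirrored
# from objpath() and _refpath() in the new cascache.py.
import hashlib
import os


def object_path(casdir, data):
    """Blobs live at objects/<first two hex chars>/<remaining digest>."""
    digest = hashlib.sha256(data).hexdigest()
    return os.path.join(casdir, 'objects', digest[:2], digest[2:])


def ref_path(casdir, ref):
    """Refs are plain files containing a serialized Digest message."""
    return os.path.join(casdir, 'refs', 'heads', ref)


# Example paths below are made up; casdir is <artifactdir>/cas in the patch.
casdir = '/home/user/.cache/buildstream/artifacts/cas'
print(object_path(casdir, b'hello'))
print(ref_path(casdir, 'project/element/cache-key'))
```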
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
new file mode 100644
index 000000000..880d93ba4
--- /dev/null
+++ b/buildstream/_artifactcache/cascache.py
@@ -0,0 +1,709 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+#        Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+import hashlib
+import itertools
+import multiprocessing
+import os
+import signal
+import stat
+import tempfile
+from urllib.parse import urlparse
+
+import grpc
+
+from google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
+from buildstream import buildstream_pb2, buildstream_pb2_grpc
+
+from .. import _signals, utils
+from .._exceptions import ArtifactError
+
+from . import ArtifactCache
+
+
+# A CASCache manages artifacts in a CAS repository as specified in the
+# Remote Execution API.
+#
+# Args:
+#     context (Context): The BuildStream context
+#     enable_push (bool): Whether pushing is allowed by the platform
+#
+# Pushing is explicitly disabled by the platform in some cases,
+# like when we are falling back to functioning without using
+# user namespaces.
+#
+class CASCache(ArtifactCache):
+
+    def __init__(self, context, *, enable_push=True):
+        super().__init__(context)
+
+        self.casdir = os.path.join(context.artifactdir, 'cas')
+        os.makedirs(os.path.join(self.casdir, 'tmp'), exist_ok=True)
+
+        self._enable_push = enable_push
+
+        # Per-project list of _CASRemote instances.
+        self._remotes = {}
+
+        self._has_fetch_remotes = False
+        self._has_push_remotes = False
+
+    ################################################
+    #     Implementation of abstract methods       #
+    ################################################
+    def contains(self, element, key):
+        refpath = self._refpath(self.get_artifact_fullname(element, key))
+
+        # This assumes that the repository doesn't have any dangling pointers
+        return os.path.exists(refpath)
+
+    def extract(self, element, key):
+        ref = self.get_artifact_fullname(element, key)
+
+        tree = self.resolve_ref(ref)
+
+        dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, tree.hash)
+        if os.path.isdir(dest):
+            # artifact has already been extracted
+            return dest
+
+        os.makedirs(self.extractdir, exist_ok=True)
+
+        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
+            checkoutdir = os.path.join(tmpdir, ref)
+            self._checkout(checkoutdir, tree)
+
+            os.makedirs(os.path.dirname(dest), exist_ok=True)
+            try:
+                os.rename(checkoutdir, dest)
+            except OSError as e:
+                # With rename it's possible to get either ENOTEMPTY or EEXIST
+                # in the case that the destination path is a not empty directory.
+                #
+                # If rename fails with these errors, another process beat
+                # us to it so just ignore.
+                if e.errno not in [os.errno.ENOTEMPTY, os.errno.EEXIST]:
+                    raise ArtifactError("Failed to extract artifact for ref '{}': {}"
+                                        .format(ref, e)) from e
+
+        return dest
+
+    def commit(self, element, content, keys):
+        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        tree = self._create_tree(content)
+
+        for ref in refs:
+            self.set_ref(ref, tree)
+
+    def can_diff(self):
+        return True
+
+    def diff(self, element, key_a, key_b, *, subdir=None):
+        ref_a = self.get_artifact_fullname(element, key_a)
+        ref_b = self.get_artifact_fullname(element, key_b)
+
+        tree_a = self.resolve_ref(ref_a)
+        tree_b = self.resolve_ref(ref_b)
+
+        if subdir:
+            tree_a = self._get_subdir(tree_a, subdir)
+            tree_b = self._get_subdir(tree_b, subdir)
+
+        added = []
+        removed = []
+        modified = []
+
+        self._diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
+
+        return modified, removed, added
+
+    def initialize_remotes(self, *, on_failure=None):
+        remote_specs = self.global_remote_specs
+
+        for project in self.project_remote_specs:
+            remote_specs += self.project_remote_specs[project]
+
+        remote_specs = list(utils._deduplicate(remote_specs))
+
+        remotes = {}
+        q = multiprocessing.Queue()
+        for remote_spec in remote_specs:
+            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
+
+            try:
+                # Keep SIGINT blocked in the child process
+                with _signals.blocked([signal.SIGINT], ignore=False):
+                    p.start()
+
+                error = q.get()
+                p.join()
+            except KeyboardInterrupt:
+                utils._kill_process_tree(p.pid)
+                raise
+
+            if error and on_failure:
+                on_failure(remote_spec.url, error)
+            elif error:
+                raise ArtifactError(error)
+            else:
+                self._has_fetch_remotes = True
+                if remote_spec.push:
+                    self._has_push_remotes = True
+
+                remotes[remote_spec.url] = _CASRemote(remote_spec)
+
+        for project in self.context.get_projects():
+            remote_specs = self.global_remote_specs
+            if project in self.project_remote_specs:
+                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+
+            project_remotes = []
+
+            for remote_spec in remote_specs:
+                # Errors are already handled in the loop above,
+                # skip unreachable remotes here.
+                if remote_spec.url not in remotes:
+                    continue
+
+                remote = remotes[remote_spec.url]
+                project_remotes.append(remote)
+
+            self._remotes[project] = project_remotes
+
+    def has_fetch_remotes(self, *, element=None):
+        if not self._has_fetch_remotes:
+            # No project has fetch remotes
+            return False
+        elif element is None:
+            # At least one (sub)project has fetch remotes
+            return True
+        else:
+            # Check whether the specified element's project has fetch remotes
+            remotes_for_project = self._remotes[element._get_project()]
+            return bool(remotes_for_project)
+
+    def has_push_remotes(self, *, element=None):
+        if not self._has_push_remotes or not self._enable_push:
+            # No project has push remotes
+            return False
+        elif element is None:
+            # At least one (sub)project has push remotes
+            return True
+        else:
+            # Check whether the specified element's project has push remotes
+            remotes_for_project = self._remotes[element._get_project()]
+            return any(remote.spec.push for remote in remotes_for_project)
+
+    def pull(self, element, key, *, progress=None):
+        ref = self.get_artifact_fullname(element, key)
+
+        project = element._get_project()
+
+        for remote in self._remotes[project]:
+            try:
+                remote.init()
+
+                request = buildstream_pb2.GetArtifactRequest()
+                request.key = ref
+                response = remote.artifact_cache.GetArtifact(request)
+
+                tree = remote_execution_pb2.Digest()
+                tree.hash = response.artifact.hash
+                tree.size_bytes = response.artifact.size_bytes
+
+                self._fetch_directory(remote, tree)
+
+                self.set_ref(ref, tree)
+
+                # no need to pull from additional remotes
+                return True
+
+            except grpc.RpcError as e:
+                if e.code() != grpc.StatusCode.NOT_FOUND:
+                    raise
+
+        return False
+
+    def link_key(self, element, oldkey, newkey):
+        oldref = self.get_artifact_fullname(element, oldkey)
+        newref = self.get_artifact_fullname(element, newkey)
+
+        tree = self.resolve_ref(oldref)
+
+        self.set_ref(newref, tree)
+
+    def push(self, element, keys):
+        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        project = element._get_project()
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        pushed = False
+
+        for remote in push_remotes:
+            remote.init()
+
+            for ref in refs:
+                tree = self.resolve_ref(ref)
+
+                # Check whether ref is already on the server in which case
+                # there is no need to push the artifact
+                try:
+                    request = buildstream_pb2.GetArtifactRequest()
+                    request.key = ref
+                    response = remote.artifact_cache.GetArtifact(request)
+
+                    if response.artifact.hash == tree.hash and response.artifact.size_bytes == tree.size_bytes:
+                        # ref is already on the server with the same tree
+                        continue
+
+                except grpc.RpcError as e:
+                    if e.code() != grpc.StatusCode.NOT_FOUND:
+                        raise
+
+                missing_blobs = {}
+                required_blobs = self._required_blobs(tree)
+
+                # Limit size of FindMissingBlobs request
+                for required_blobs_group in _grouper(required_blobs, 512):
+                    request = remote_execution_pb2.FindMissingBlobsRequest()
+
+                    for required_digest in required_blobs_group:
+                        d = request.blob_digests.add()
+                        d.hash = required_digest.hash
+                        d.size_bytes = required_digest.size_bytes
+
+                    response = remote.cas.FindMissingBlobs(request)
+                    for digest in response.missing_blob_digests:
+                        d = remote_execution_pb2.Digest()
+                        d.hash = digest.hash
+                        d.size_bytes = digest.size_bytes
+                        missing_blobs[d.hash] = d
+
+                # Upload any blobs missing on the server
+                for digest in missing_blobs.values():
+                    def request_stream():
+                        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+                        with open(self.objpath(digest), 'rb') as f:
+                            assert os.fstat(f.fileno()).st_size == digest.size_bytes
+                            offset = 0
+                            finished = False
+                            remaining = digest.size_bytes
+                            while not finished:
+                                chunk_size = min(remaining, 64 * 1024)
+                                remaining -= chunk_size
+
+                                request = bytestream_pb2.WriteRequest()
+                                request.write_offset = offset
+                                # max. 64 kB chunks
+                                request.data = f.read(chunk_size)
+                                request.resource_name = resource_name
+                                request.finish_write = remaining <= 0
+                                yield request
+                                offset += chunk_size
+                                finished = request.finish_write
+                    response = remote.bytestream.Write(request_stream())
+
+                request = buildstream_pb2.UpdateArtifactRequest()
+                request.keys.append(ref)
+                request.artifact.hash = tree.hash
+                request.artifact.size_bytes = tree.size_bytes
+                remote.artifact_cache.UpdateArtifact(request)
+
+                pushed = True
+
+        return pushed
+
+    ################################################
+    #              API Private Methods             #
+    ################################################
+
+    # objpath():
+    #
+    # Return the path of an object based on its digest.
+    #
+    # Args:
+    #     digest (Digest): The digest of the object
+    #
+    # Returns:
+    #     (str): The path of the object
+    #
+    def objpath(self, digest):
+        return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
+
+    # add_object():
+    #
+    # Hash and write object to CAS.
+    #
+    # Args:
+    #     digest (Digest): An optional Digest object to populate
+    #     path (str): Path to file to add
+    #     buffer (bytes): Byte buffer to add
+    #
+    # Returns:
+    #     (Digest): The digest of the added object
+    #
+    # Either `path` or `buffer` must be passed, but not both.
+    #
+    def add_object(self, *, digest=None, path=None, buffer=None):
+        # Exactly one of the two parameters has to be specified
+        assert (path is None) != (buffer is None)
+
+        if digest is None:
+            digest = remote_execution_pb2.Digest()
+
+        try:
+            h = hashlib.sha256()
+            # Always write out new file to avoid corruption if input file is modified
+            with tempfile.NamedTemporaryFile(dir=os.path.join(self.casdir, 'tmp')) as out:
+                if path:
+                    with open(path, 'rb') as f:
+                        for chunk in iter(lambda: f.read(4096), b""):
+                            h.update(chunk)
+                            out.write(chunk)
+                else:
+                    h.update(buffer)
+                    out.write(buffer)
+
+                out.flush()
+
+                digest.hash = h.hexdigest()
+                digest.size_bytes = os.fstat(out.fileno()).st_size
+
+                # Place file at final location
+                objpath = self.objpath(digest)
+                os.makedirs(os.path.dirname(objpath), exist_ok=True)
+                os.link(out.name, objpath)
+
+        except FileExistsError as e:
+            # We can ignore the failed link() if the object is already in the repo.
+            pass
+
+        except OSError as e:
+            raise ArtifactError("Failed to hash object: {}".format(e)) from e
+
+        return digest
+
+    # set_ref():
+    #
+    # Create or replace a ref.
+    #
+    # Args:
+    #     ref (str): The name of the ref
+    #
+    def set_ref(self, ref, tree):
+        refpath = self._refpath(ref)
+        os.makedirs(os.path.dirname(refpath), exist_ok=True)
+        with utils.save_file_atomic(refpath, 'wb') as f:
+            f.write(tree.SerializeToString())
+
+    # resolve_ref():
+    #
+    # Resolve a ref to a digest.
+    #
+    # Args:
+    #     ref (str): The name of the ref
+    #
+    # Returns:
+    #     (Digest): The digest stored in the ref
+    #
+    def resolve_ref(self, ref):
+        refpath = self._refpath(ref)
+
+        try:
+            with open(refpath, 'rb') as f:
+                digest = remote_execution_pb2.Digest()
+                digest.ParseFromString(f.read())
+                return digest
+
+        except FileNotFoundError as e:
+            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+
+    ################################################
+    #             Local Private Methods            #
+    ################################################
+    def _checkout(self, dest, tree):
+        os.makedirs(dest, exist_ok=True)
+
+        directory = remote_execution_pb2.Directory()
+
+        with open(self.objpath(tree), 'rb') as f:
+            directory.ParseFromString(f.read())
+
+        for filenode in directory.files:
+            # regular file, create hardlink
+            fullpath = os.path.join(dest, filenode.name)
+            os.link(self.objpath(filenode.digest), fullpath)
+
+            if filenode.is_executable:
+                os.chmod(fullpath, stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP |
+                         stat.S_IROTH | stat.S_IXOTH)
+
+        for dirnode in directory.directories:
+            fullpath = os.path.join(dest, dirnode.name)
+            self._checkout(fullpath, dirnode.digest)
+
+        for symlinknode in directory.symlinks:
+            # symlink
+            fullpath = os.path.join(dest, symlinknode.name)
+            os.symlink(symlinknode.target, fullpath)
+
+    def _refpath(self, ref):
+        return os.path.join(self.casdir, 'refs', 'heads', ref)
+
+    def _create_tree(self, path, *, digest=None):
+        directory = remote_execution_pb2.Directory()
+
+        for name in sorted(os.listdir(path)):
+            full_path = os.path.join(path, name)
+            mode = os.lstat(full_path).st_mode
+            if stat.S_ISDIR(mode):
+                dirnode = directory.directories.add()
+                dirnode.name = name
+                self._create_tree(full_path, digest=dirnode.digest)
+            elif stat.S_ISREG(mode):
+                filenode = directory.files.add()
+                filenode.name = name
+                self.add_object(path=full_path, digest=filenode.digest)
+                filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
+            elif stat.S_ISLNK(mode):
+                symlinknode = directory.symlinks.add()
+                symlinknode.name = name
+                symlinknode.target = os.readlink(full_path)
+            else:
+                raise ArtifactError("Unsupported file type for {}".format(full_path))
+
+        return self.add_object(digest=digest, buffer=directory.SerializeToString())
+
+    def _get_subdir(self, tree, subdir):
+        head, name = os.path.split(subdir)
+        if head:
+            tree = self._get_subdir(tree, head)
+
+        directory = remote_execution_pb2.Directory()
+
+        with open(self.objpath(tree), 'rb') as f:
+            directory.ParseFromString(f.read())
+
+        for dirnode in directory.directories:
+            if dirnode.name == name:
+                return dirnode.digest
+
+        raise ArtifactError("Subdirectory {} not found".format(name))
+
+    def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
+        dir_a = remote_execution_pb2.Directory()
+        dir_b = remote_execution_pb2.Directory()
+
+        if tree_a:
+            with open(self.objpath(tree_a), 'rb') as f:
+                dir_a.ParseFromString(f.read())
+        if tree_b:
+            with open(self.objpath(tree_b), 'rb') as f:
+                dir_b.ParseFromString(f.read())
+
+        a = 0
+        b = 0
+        while a < len(dir_a.files) or b < len(dir_b.files):
+            if b < len(dir_b.files) and (a >= len(dir_a.files) or
+                                         dir_a.files[a].name > dir_b.files[b].name):
+                added.append(os.path.join(path, dir_b.files[b].name))
+                b += 1
+            elif a < len(dir_a.files) and (b >= len(dir_b.files) or
+                                           dir_b.files[b].name > dir_a.files[a].name):
+                removed.append(os.path.join(path, dir_a.files[a].name))
+                a += 1
+            else:
+                # File exists in both directories
+                if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
+                    modified.append(os.path.join(path, dir_a.files[a].name))
+                a += 1
+                b += 1
+
+        a = 0
+        b = 0
+        while a < len(dir_a.directories) or b < len(dir_b.directories):
+            if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
+                                               dir_a.directories[a].name > dir_b.directories[b].name):
+                self._diff_trees(None, dir_b.directories[b].digest,
+                                 added=added, removed=removed, modified=modified,
+                                 path=os.path.join(path, dir_b.directories[b].name))
+                b += 1
+            elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
+                                                 dir_b.directories[b].name > dir_a.directories[a].name):
+                self._diff_trees(dir_a.directories[a].digest, None,
+                                 added=added, removed=removed, modified=modified,
+                                 path=os.path.join(path, dir_a.directories[a].name))
+                a += 1
+            else:
+                # Subdirectory exists in both directories
+                if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
+                    self._diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
+                                     added=added, removed=removed, modified=modified,
+                                     path=os.path.join(path, dir_a.directories[a].name))
+                a += 1
+                b += 1
+
+    def _initialize_remote(self, remote_spec, q):
+        try:
+            remote = _CASRemote(remote_spec)
+            remote.init()
+
+            request = buildstream_pb2.StatusRequest()
+            response = remote.artifact_cache.Status(request)
+
+            if remote_spec.push and not response.allow_updates:
+                q.put('Artifact server does not allow push')
+            else:
+                # No error
+                q.put(None)
+
+        except Exception as e:  # pylint: disable=broad-except
+            # Whatever happens, we need to return it to the calling process
+            #
+            q.put(str(e))
+
+    def _required_blobs(self, tree):
+        # parse directory, and recursively add blobs
+        d = remote_execution_pb2.Digest()
+        d.hash = tree.hash
+        d.size_bytes = tree.size_bytes
+        yield d
+
+        directory = remote_execution_pb2.Directory()
+
+        with open(self.objpath(tree), 'rb') as f:
+            directory.ParseFromString(f.read())
+
+        for filenode in directory.files:
+            d = remote_execution_pb2.Digest()
+            d.hash = filenode.digest.hash
+            d.size_bytes = filenode.digest.size_bytes
+            yield d
+
+        for dirnode in directory.directories:
+            yield from self._required_blobs(dirnode.digest)
+
+    def _fetch_blob(self, remote, digest, out):
+        resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+        request = bytestream_pb2.ReadRequest()
+        request.resource_name = resource_name
+        request.read_offset = 0
+        for response in remote.bytestream.Read(request):
+            out.write(response.data)
+
+        out.flush()
+        assert digest.size_bytes == os.fstat(out.fileno()).st_size
+
+    def _fetch_directory(self, remote, tree):
+        objpath = self.objpath(tree)
+        if os.path.exists(objpath):
+            # already in local cache
+            return
+
+        with tempfile.NamedTemporaryFile(dir=os.path.join(self.casdir, 'tmp')) as out:
+            self._fetch_blob(remote, tree, out)
+
+            directory = remote_execution_pb2.Directory()
+
+            with open(out.name, 'rb') as f:
+                directory.ParseFromString(f.read())
+
+            for filenode in directory.files:
+                fileobjpath = self.objpath(tree)
+                if os.path.exists(fileobjpath):
+                    # already in local cache
+                    continue
+
+                with tempfile.NamedTemporaryFile(dir=os.path.join(self.casdir, 'tmp')) as f:
+                    self._fetch_blob(remote, filenode.digest, f)
+
+                    digest = self.add_object(path=f.name)
+                    assert digest.hash == filenode.digest.hash
+
+            for dirnode in directory.directories:
+                self._fetch_directory(remote, dirnode.digest)
+
+            # place directory blob only in final location when we've downloaded
+            # all referenced blobs to avoid dangling references in the repository
+            digest = self.add_object(path=out.name)
+            assert digest.hash == tree.hash
+
+
+# Represents a single remote CAS cache.
+#
+class _CASRemote():
+    def __init__(self, spec):
+        self.spec = spec
+        self._initialized = False
+        self.channel = None
+        self.bytestream = None
+        self.cas = None
+        self.artifact_cache = None
+
+    def init(self):
+        if not self._initialized:
+            url = urlparse(self.spec.url)
+            if url.scheme == 'http':
+                port = url.port or 80
+                self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port))
+            elif url.scheme == 'https':
+                port = url.port or 443
+
+                if self.spec.server_cert:
+                    with open(self.spec.server_cert, 'rb') as f:
+                        server_cert_bytes = f.read()
+                else:
+                    server_cert_bytes = None
+
+                if self.spec.client_key:
+                    with open(self.spec.client_key, 'rb') as f:
+                        client_key_bytes = f.read()
+                else:
+                    client_key_bytes = None
+
+                if self.spec.client_cert:
+                    with open(self.spec.client_cert, 'rb') as f:
+                        client_cert_bytes = f.read()
+                else:
+                    client_cert_bytes = None
+
+                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
+                                                           private_key=client_key_bytes,
+                                                           certificate_chain=client_cert_bytes)
+                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
+            else:
+                raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
+
+            self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
+            self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
+            self.artifact_cache = buildstream_pb2_grpc.ArtifactCacheStub(self.channel)
+
+            self._initialized = True
+
+
+def _grouper(iterable, n):
+    # pylint: disable=stop-iteration-return
+    while True:
+        yield itertools.chain([next(iterable)], itertools.islice(iterable, n - 1))
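For readers unfamiliar with the Remote Execution API flow used in `push()`, the sketch below illustrates the two mechanisms the patch relies on: batching digests into FindMissingBlobs requests of at most 512 entries (via `_grouper()`), and streaming each missing blob as a sequence of 64 KiB ByteStream WriteRequests with `finish_write` set on the final chunk (via `request_stream()`). The `Chunk` type, the fake digests and the printed numbers are stand-ins for this example only; the real code uses the generated `remote_execution_pb2` and `bytestream_pb2` messages, and this grouper variant simply terminates cleanly instead of relying on a leaking StopIteration.

```python
# Illustrative sketch (not part of the commit): batching and chunked upload
# as done in CASCache.push().
import io
import itertools
from typing import Iterator, NamedTuple


def grouper(iterable: Iterator, n: int) -> Iterator[Iterator]:
    # Same batching idea as _grouper(): pull one element eagerly, then chain
    # it with the next n-1 items, until the source iterator is exhausted.
    while True:
        try:
            first = next(iterable)
        except StopIteration:
            return
        yield itertools.chain([first], itertools.islice(iterable, n - 1))


class Chunk(NamedTuple):
    write_offset: int
    data: bytes
    finish_write: bool


def chunked_writes(blob: bytes, chunk_size: int = 64 * 1024) -> Iterator[Chunk]:
    # Mirrors request_stream() in push(): fixed-size chunks with
    # finish_write set on the last one.
    f = io.BytesIO(blob)
    offset = 0
    remaining = len(blob)
    while True:
        size = min(remaining, chunk_size)
        remaining -= size
        yield Chunk(offset, f.read(size), remaining <= 0)
        offset += size
        if remaining <= 0:
            break


# Batch 1200 fake digests into FindMissingBlobs-sized groups of 512.
digests = iter('digest-{}'.format(i) for i in range(1200))
print([len(list(group)) for group in grouper(digests, 512)])   # [512, 512, 176]

# Stream a 150 KiB blob as 64 KiB chunks.
print([(c.write_offset, len(c.data), c.finish_write)
       for c in chunked_writes(b'x' * 150 * 1024)])
```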