diff options
Diffstat (limited to 'buildstream/_artifactcache')
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 346 | ||||
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 537 | ||||
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 20 |
3 files changed, 519 insertions, 384 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 8ea6c9dc2..06a2b84e0 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -17,17 +17,22 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> +import multiprocessing import os +import signal import string from collections import namedtuple from collections.abc import Mapping from ..types import _KeyStrength -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason from .._message import Message, MessageType +from .. import _signals from .. import utils from .. import _yaml +from .cascache import CASCache, CASRemote + CACHE_SIZE_FILE = "cache_size" @@ -93,7 +98,8 @@ class ArtifactCache(): def __init__(self, context): self.context = context self.extractdir = os.path.join(context.artifactdir, 'extract') - self.tmpdir = os.path.join(context.artifactdir, 'tmp') + + self.cas = CASCache(context.artifactdir) self.global_remote_specs = [] self.project_remote_specs = {} @@ -104,12 +110,15 @@ class ArtifactCache(): self._cache_lower_threshold = None # The target cache size for a cleanup self._remotes_setup = False # Check to prevent double-setup of remotes + # Per-project list of _CASRemote instances. + self._remotes = {} + + self._has_fetch_remotes = False + self._has_push_remotes = False + os.makedirs(self.extractdir, exist_ok=True) - os.makedirs(self.tmpdir, exist_ok=True) - ################################################ - # Methods implemented on the abstract class # - ################################################ + self._calculate_cache_quota() # get_artifact_fullname() # @@ -240,8 +249,10 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_mtime(element, key) - except ArtifactError: + ref = self.get_artifact_fullname(element, key) + + self.cas.update_mtime(ref) + except CASError: pass # clean(): @@ -252,7 +263,7 @@ class ArtifactCache(): # (int): The size of the cache after having cleaned up # def clean(self): - artifacts = self.list_artifacts() # pylint: disable=assignment-from-no-return + artifacts = self.list_artifacts() # Build a set of the cache keys which are required # based on the required elements at cleanup time @@ -294,7 +305,7 @@ class ArtifactCache(): if key not in required_artifacts: # Remove the actual artifact, if it's not required. - size = self.remove(to_remove) # pylint: disable=assignment-from-no-return + size = self.remove(to_remove) # Remove the size from the removed size self.set_cache_size(self._cache_size - size) @@ -311,7 +322,7 @@ class ArtifactCache(): # (int): The size of the artifact cache. # def compute_cache_size(self): - self._cache_size = self.calculate_cache_size() # pylint: disable=assignment-from-no-return + self._cache_size = self.cas.calculate_cache_size() return self._cache_size @@ -380,28 +391,12 @@ class ArtifactCache(): def has_quota_exceeded(self): return self.get_cache_size() > self._cache_quota - ################################################ - # Abstract methods for subclasses to implement # - ################################################ - # preflight(): # # Preflight check. # def preflight(self): - pass - - # update_mtime() - # - # Update the mtime of an artifact. - # - # Args: - # element (Element): The Element to update - # key (str): The key of the artifact. - # - def update_mtime(self, element, key): - raise ImplError("Cache '{kind}' does not implement update_mtime()" - .format(kind=type(self).__name__)) + self.cas.preflight() # initialize_remotes(): # @@ -411,7 +406,59 @@ class ArtifactCache(): # on_failure (callable): Called if we fail to contact one of the caches. # def initialize_remotes(self, *, on_failure=None): - pass + remote_specs = self.global_remote_specs + + for project in self.project_remote_specs: + remote_specs += self.project_remote_specs[project] + + remote_specs = list(utils._deduplicate(remote_specs)) + + remotes = {} + q = multiprocessing.Queue() + for remote_spec in remote_specs: + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + error = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + if error and on_failure: + on_failure(remote_spec.url, error) + elif error: + raise ArtifactError(error) + else: + self._has_fetch_remotes = True + if remote_spec.push: + self._has_push_remotes = True + + remotes[remote_spec.url] = CASRemote(remote_spec) + + for project in self.context.get_projects(): + remote_specs = self.global_remote_specs + if project in self.project_remote_specs: + remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + + project_remotes = [] + + for remote_spec in remote_specs: + # Errors are already handled in the loop above, + # skip unreachable remotes here. + if remote_spec.url not in remotes: + continue + + remote = remotes[remote_spec.url] + project_remotes.append(remote) + + self._remotes[project] = project_remotes # contains(): # @@ -425,8 +472,9 @@ class ArtifactCache(): # Returns: True if the artifact is in the cache, False otherwise # def contains(self, element, key): - raise ImplError("Cache '{kind}' does not implement contains()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + return self.cas.contains(ref) # list_artifacts(): # @@ -437,8 +485,7 @@ class ArtifactCache(): # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): - raise ImplError("Cache '{kind}' does not implement list_artifacts()" - .format(kind=type(self).__name__)) + return self.cas.list_refs() # remove(): # @@ -450,9 +497,31 @@ class ArtifactCache(): # generated by # `ArtifactCache.get_artifact_fullname`) # - def remove(self, artifact_name): - raise ImplError("Cache '{kind}' does not implement remove()" - .format(kind=type(self).__name__)) + # Returns: + # (int|None) The amount of space pruned from the repository in + # Bytes, or None if defer_prune is True + # + def remove(self, ref): + + # Remove extract if not used by other ref + tree = self.cas.resolve_ref(ref) + ref_name, ref_hash = os.path.split(ref) + extract = os.path.join(self.extractdir, ref_name, tree.hash) + keys_file = os.path.join(extract, 'meta', 'keys.yaml') + if os.path.exists(keys_file): + keys_meta = _yaml.load(keys_file) + keys = [keys_meta['strong'], keys_meta['weak']] + remove_extract = True + for other_hash in keys: + if other_hash == ref_hash: + continue + remove_extract = False + break + + if remove_extract: + utils._force_rmtree(extract) + + return self.cas.remove(ref) # extract(): # @@ -472,8 +541,11 @@ class ArtifactCache(): # Returns: path to extracted artifact # def extract(self, element, key): - raise ImplError("Cache '{kind}' does not implement extract()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + path = os.path.join(self.extractdir, element._get_project().name, element.normal_name) + + return self.cas.extract(ref, path) # commit(): # @@ -485,8 +557,9 @@ class ArtifactCache(): # keys (list): The cache keys to use # def commit(self, element, content, keys): - raise ImplError("Cache '{kind}' does not implement commit()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in keys] + + self.cas.commit(refs, content) # diff(): # @@ -500,8 +573,10 @@ class ArtifactCache(): # subdir (str): A subdirectory to limit the comparison to # def diff(self, element, key_a, key_b, *, subdir=None): - raise ImplError("Cache '{kind}' does not implement diff()" - .format(kind=type(self).__name__)) + ref_a = self.get_artifact_fullname(element, key_a) + ref_b = self.get_artifact_fullname(element, key_b) + + return self.cas.diff(ref_a, ref_b, subdir=subdir) # has_fetch_remotes(): # @@ -513,7 +588,16 @@ class ArtifactCache(): # Returns: True if any remote repositories are configured, False otherwise # def has_fetch_remotes(self, *, element=None): - return False + if not self._has_fetch_remotes: + # No project has fetch remotes + return False + elif element is None: + # At least one (sub)project has fetch remotes + return True + else: + # Check whether the specified element's project has fetch remotes + remotes_for_project = self._remotes[element._get_project()] + return bool(remotes_for_project) # has_push_remotes(): # @@ -525,7 +609,16 @@ class ArtifactCache(): # Returns: True if any remote repository is configured, False otherwise # def has_push_remotes(self, *, element=None): - return False + if not self._has_push_remotes: + # No project has push remotes + return False + elif element is None: + # At least one (sub)project has push remotes + return True + else: + # Check whether the specified element's project has push remotes + remotes_for_project = self._remotes[element._get_project()] + return any(remote.spec.push for remote in remotes_for_project) # push(): # @@ -542,8 +635,28 @@ class ArtifactCache(): # (ArtifactError): if there was an error # def push(self, element, keys): - raise ImplError("Cache '{kind}' does not implement push()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] + + project = element._get_project() + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + remote.init() + display_key = element._get_brief_display_key() + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) + + if self.cas.push(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + return pushed # pull(): # @@ -558,8 +671,130 @@ class ArtifactCache(): # (bool): True if pull was successful, False if artifact was not available # def pull(self, element, key, *, progress=None): - raise ImplError("Cache '{kind}' does not implement pull()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + project = element._get_project() + + for remote in self._remotes[project]: + try: + display_key = element._get_brief_display_key() + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) + + if self.cas.pull(ref, remote, progress=progress): + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) + # no need to pull from additional remotes + return True + else: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + except CASError as e: + raise ArtifactError("Failed to pull artifact {}: {}".format( + element._get_brief_display_key(), e)) from e + + return False + + # pull_tree(): + # + # Pull a single Tree rather than an artifact. + # Does not update local refs. + # + # Args: + # project (Project): The current project + # digest (Digest): The digest of the tree + # + def pull_tree(self, project, digest): + for remote in self._remotes[project]: + digest = self.cas.pull_tree(remote, digest) + + if digest: + # no need to pull from additional remotes + return digest + + return None + + # push_directory(): + # + # Push the given virtual directory to all remotes. + # + # Args: + # project (Project): The current project + # directory (Directory): A virtual directory object to push. + # + # Raises: + # (ArtifactError): if there was an error + # + def push_directory(self, project, directory): + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + if not push_remotes: + raise ArtifactError("push_directory was called, but no remote artifact " + + "servers are configured as push remotes.") + + if directory.ref is None: + return + + for remote in push_remotes: + self.cas.push_directory(remote, directory) + + # push_message(): + # + # Push the given protobuf message to all remotes. + # + # Args: + # project (Project): The current project + # message (Message): A protobuf message to push. + # + # Raises: + # (ArtifactError): if there was an error + # + def push_message(self, project, message): + + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + if not push_remotes: + raise ArtifactError("push_message was called, but no remote artifact " + + "servers are configured as push remotes.") + + for remote in push_remotes: + message_digest = self.cas.push_message(remote, message) + + return message_digest + + # verify_digest_pushed(): + # + # Check whether the object is already on the server in which case + # there is no need to upload it. + # + # Args: + # project (Project): The current project + # digest (Digest): The object digest. + # + def verify_digest_pushed(self, project, digest): + + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + if not push_remotes: + raise ArtifactError("verify_digest_pushed was called, but no remote artifact " + + "servers are configured as push remotes.") + + pushed = False + + for remote in push_remotes: + if self.cas.verify_digest_on_remote(remote, digest): + pushed = True + + return pushed # link_key(): # @@ -571,19 +806,10 @@ class ArtifactCache(): # newkey (str): A new cache key for the artifact # def link_key(self, element, oldkey, newkey): - raise ImplError("Cache '{kind}' does not implement link_key()" - .format(kind=type(self).__name__)) + oldref = self.get_artifact_fullname(element, oldkey) + newref = self.get_artifact_fullname(element, newkey) - # calculate_cache_size() - # - # Return the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. - # - def calculate_cache_size(self): - raise ImplError("Cache '{kind}' does not implement calculate_cache_size()" - .format(kind=type(self).__name__)) + self.cas.link_ref(oldref, newref) ################################################ # Local Private Methods # diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index b99f4d7b8..b6e26ec8b 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -20,9 +20,7 @@ import hashlib import itertools import io -import multiprocessing import os -import signal import stat import tempfile import uuid @@ -31,17 +29,13 @@ from urllib.parse import urlparse import grpc -from .. import _yaml - from .._protos.google.rpc import code_pb2 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .. import _signals, utils -from .._exceptions import ArtifactError - -from . import ArtifactCache +from .. import utils +from .._exceptions import CASError # The default limit for gRPC messages is 4 MiB. @@ -49,62 +43,68 @@ from . import ArtifactCache _MAX_PAYLOAD_BYTES = 1024 * 1024 -# A CASCache manages artifacts in a CAS repository as specified in the -# Remote Execution API. +# A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: -# context (Context): The BuildStream context -# -# Pushing is explicitly disabled by the platform in some cases, -# like when we are falling back to functioning without using -# user namespaces. +# path (str): The root directory for the CAS repository # -class CASCache(ArtifactCache): +class CASCache(): - def __init__(self, context): - super().__init__(context) - - self.casdir = os.path.join(context.artifactdir, 'cas') + def __init__(self, path): + self.casdir = os.path.join(path, 'cas') + self.tmpdir = os.path.join(path, 'tmp') os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) + os.makedirs(self.tmpdir, exist_ok=True) - self._calculate_cache_quota() - - # Per-project list of _CASRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - ################################################ - # Implementation of abstract methods # - ################################################ - + # preflight(): + # + # Preflight check. + # def preflight(self): headdir = os.path.join(self.casdir, 'refs', 'heads') objdir = os.path.join(self.casdir, 'objects') if not (os.path.isdir(headdir) and os.path.isdir(objdir)): - raise ArtifactError("CAS repository check failed for '{}'" - .format(self.casdir)) + raise CASError("CAS repository check failed for '{}'".format(self.casdir)) - def contains(self, element, key): - refpath = self._refpath(self.get_artifact_fullname(element, key)) + # contains(): + # + # Check whether the specified ref is already available in the local CAS cache. + # + # Args: + # ref (str): The ref to check + # + # Returns: True if the ref is in the cache, False otherwise + # + def contains(self, ref): + refpath = self._refpath(ref) # This assumes that the repository doesn't have any dangling pointers return os.path.exists(refpath) - def extract(self, element, key): - ref = self.get_artifact_fullname(element, key) - + # extract(): + # + # Extract cached directory for the specified ref if it hasn't + # already been extracted. + # + # Args: + # ref (str): The ref whose directory to extract + # path (str): The destination path + # + # Raises: + # CASError: In cases there was an OSError, or if the ref did not exist. + # + # Returns: path to extracted directory + # + def extract(self, ref, path): tree = self.resolve_ref(ref, update_mtime=True) - dest = os.path.join(self.extractdir, element._get_project().name, - element.normal_name, tree.hash) + dest = os.path.join(path, tree.hash) if os.path.isdir(dest): - # artifact has already been extracted + # directory has already been extracted return dest - with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir: + with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir: checkoutdir = os.path.join(tmpdir, ref) self._checkout(checkoutdir, tree) @@ -118,23 +118,35 @@ class CASCache(ArtifactCache): # If rename fails with these errors, another process beat # us to it so just ignore. if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]: - raise ArtifactError("Failed to extract artifact for ref '{}': {}" - .format(ref, e)) from e + raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e return dest - def commit(self, element, content, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] - - tree = self._commit_directory(content) + # commit(): + # + # Commit directory to cache. + # + # Args: + # refs (list): The refs to set + # path (str): The directory to import + # + def commit(self, refs, path): + tree = self._commit_directory(path) for ref in refs: self.set_ref(ref, tree) - def diff(self, element, key_a, key_b, *, subdir=None): - ref_a = self.get_artifact_fullname(element, key_a) - ref_b = self.get_artifact_fullname(element, key_b) - + # diff(): + # + # Return a list of files that have been added or modified between + # the refs described by ref_a and ref_b. + # + # Args: + # ref_a (str): The first ref + # ref_b (str): The second ref + # subdir (str): A subdirectory to limit the comparison to + # + def diff(self, ref_a, ref_b, *, subdir=None): tree_a = self.resolve_ref(ref_a) tree_b = self.resolve_ref(ref_b) @@ -150,158 +162,122 @@ class CASCache(ArtifactCache): return modified, removed, added - def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs - - for project in self.project_remote_specs: - remote_specs += self.project_remote_specs[project] - - remote_specs = list(utils._deduplicate(remote_specs)) - - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) + def initialize_remote(self, remote_spec, q): + try: + remote = CASRemote(remote_spec) + remote.init() - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() - - error = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise + request = buildstream_pb2.StatusRequest() + response = remote.ref_storage.Status(request) - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) + if remote_spec.push and not response.allow_updates: + q.put('CAS server does not allow push') else: - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True + # No error + q.put(None) - remotes[remote_spec.url] = _CASRemote(remote_spec) + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + q.put(e.details()) - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + q.put(str(e)) - project_remotes = [] + # pull(): + # + # Pull a ref from a remote repository. + # + # Args: + # ref (str): The ref to pull + # remote (CASRemote): The remote repository to pull from + # progress (callable): The progress callback, if any + # + # Returns: + # (bool): True if pull was successful, False if ref was not available + # + def pull(self, ref, remote, *, progress=None): + try: + remote.init() - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. - if remote_spec.url not in remotes: - continue + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + response = remote.ref_storage.GetReference(request) - remote = remotes[remote_spec.url] - project_remotes.append(remote) + tree = remote_execution_pb2.Digest() + tree.hash = response.digest.hash + tree.size_bytes = response.digest.size_bytes - self._remotes[project] = project_remotes + self._fetch_directory(remote, tree) - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has fetch remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) + self.set_ref(ref, tree) - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - - def pull(self, element, key, *, progress=None): - ref = self.get_artifact_fullname(element, key) - - project = element._get_project() - - for remote in self._remotes[project]: - try: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - response = remote.ref_storage.GetReference(request) - - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes - - self._fetch_directory(remote, tree) - - self.set_ref(ref, tree) - - element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) - # no need to pull from additional remotes - return True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Failed to pull artifact {}: {}".format( - element._get_brief_display_key(), e)) from e - else: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, element._get_brief_display_key() - )) - - return False - - def pull_tree(self, project, digest): - """ Pull a single Tree rather than an artifact. - Does not update local refs. """ + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e + else: + return False - for remote in self._remotes[project]: - try: - remote.init() + # pull_tree(): + # + # Pull a single Tree rather than a ref. + # Does not update local refs. + # + # Args: + # remote (CASRemote): The remote to pull from + # digest (Digest): The digest of the tree + # + def pull_tree(self, remote, digest): + try: + remote.init() - digest = self._fetch_tree(remote, digest) + digest = self._fetch_tree(remote, digest) - # no need to pull from additional remotes - return digest + return digest - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise return None - def link_key(self, element, oldkey, newkey): - oldref = self.get_artifact_fullname(element, oldkey) - newref = self.get_artifact_fullname(element, newkey) - + # link_ref(): + # + # Add an alias for an existing ref. + # + # Args: + # oldref (str): An existing ref + # newref (str): A new ref for the same directory + # + def link_ref(self, oldref, newref): tree = self.resolve_ref(oldref) self.set_ref(newref, tree) - def _push_refs_to_remote(self, refs, remote): + # push(): + # + # Push committed refs to remote repository. + # + # Args: + # refs (list): The refs to push + # remote (CASRemote): The remote to push to + # + # Returns: + # (bool): True if any remote was updated, False if no pushes were required + # + # Raises: + # (CASError): if there was an error + # + def push(self, refs, remote): skipped_remote = True try: for ref in refs: tree = self.resolve_ref(ref) # Check whether ref is already on the server in which case - # there is no need to push the artifact + # there is no need to push the ref try: request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -327,65 +303,38 @@ class CASCache(ArtifactCache): skipped_remote = False except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e return not skipped_remote - def push(self, element, keys): - - refs = [self.get_artifact_fullname(element, key) for key in list(keys)] - - project = element._get_project() - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - - if self._push_refs_to_remote(refs, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) - pushed = True - else: - element.info("Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key() - )) - - return pushed - - def push_directory(self, project, directory): - """ Push the given virtual directory to all remotes. - - Args: - project (Project): The current project - directory (Directory): A virtual directory object to push. - - Raises: ArtifactError if no push remotes are configured. - """ - - if self._has_push_remotes: - push_remotes = [r for r in self._remotes[project] if r.spec.push] - else: - push_remotes = [] - - if not push_remotes: - raise ArtifactError("CASCache: push_directory was called, but no remote artifact " + - "servers are configured as push remotes.") - - if directory.ref is None: - return - - for remote in push_remotes: - remote.init() - - self._send_directory(remote, directory.ref) + # push_directory(): + # + # Push the given virtual directory to a remote. + # + # Args: + # remote (CASRemote): The remote to push to + # directory (Directory): A virtual directory object to push. + # + # Raises: + # (CASError): if there was an error + # + def push_directory(self, remote, directory): + remote.init() - def push_message(self, project, message): + self._send_directory(remote, directory.ref) - push_remotes = [r for r in self._remotes[project] if r.spec.push] + # push_message(): + # + # Push the given protobuf message to a remote. + # + # Args: + # remote (CASRemote): The remote to push to + # message (Message): A protobuf message to push. + # + # Raises: + # (CASError): if there was an error + # + def push_message(self, remote, message): message_buffer = message.SerializeToString() message_sha = hashlib.sha256(message_buffer) @@ -393,17 +342,25 @@ class CASCache(ArtifactCache): message_digest.hash = message_sha.hexdigest() message_digest.size_bytes = len(message_buffer) - for remote in push_remotes: - remote.init() + remote.init() - with io.BytesIO(message_buffer) as b: - self._send_blob(remote, message_digest, b) + with io.BytesIO(message_buffer) as b: + self._send_blob(remote, message_digest, b) return message_digest - def _verify_digest_on_remote(self, remote, digest): - # Check whether ref is already on the server in which case - # there is no need to push the artifact + # verify_digest_on_remote(): + # + # Check whether the object is already on the server in which case + # there is no need to upload it. + # + # Args: + # remote (CASRemote): The remote to check + # digest (Digest): The object digest. + # + def verify_digest_on_remote(self, remote, digest): + remote.init() + request = remote_execution_pb2.FindMissingBlobsRequest() request.blob_digests.extend([digest]) @@ -413,24 +370,6 @@ class CASCache(ArtifactCache): return True - def verify_digest_pushed(self, project, digest): - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - - if self._verify_digest_on_remote(remote, digest): - pushed = True - - return pushed - - ################################################ - # API Private Methods # - ################################################ - # objpath(): # # Return the path of an object based on its digest. @@ -496,7 +435,7 @@ class CASCache(ArtifactCache): pass except OSError as e: - raise ArtifactError("Failed to hash object: {}".format(e)) from e + raise CASError("Failed to hash object: {}".format(e)) from e return digest @@ -537,26 +476,39 @@ class CASCache(ArtifactCache): return digest except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e - def update_mtime(self, element, key): + # update_mtime() + # + # Update the mtime of a ref. + # + # Args: + # ref (str): The ref to update + # + def update_mtime(self, ref): try: - ref = self.get_artifact_fullname(element, key) os.utime(self._refpath(ref)) except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e + # calculate_cache_size() + # + # Return the real disk usage of the CAS cache. + # + # Returns: + # (int): The size of the cache. + # def calculate_cache_size(self): return utils._get_dir_size(self.casdir) - # list_artifacts(): + # list_refs(): # - # List cached artifacts in Least Recently Modified (LRM) order. + # List refs in Least Recently Modified (LRM) order. # # Returns: # (list) - A list of refs in LRM order # - def list_artifacts(self): + def list_refs(self): # string of: /path/to/repo/refs/heads ref_heads = os.path.join(self.casdir, 'refs', 'heads') @@ -571,7 +523,7 @@ class CASCache(ArtifactCache): mtimes.append(os.path.getmtime(ref_path)) # NOTE: Sorted will sort from earliest to latest, thus the - # first element of this list will be the file modified earliest. + # first ref of this list will be the file modified earliest. return [ref for _, ref in sorted(zip(mtimes, refs))] # remove(): @@ -590,28 +542,10 @@ class CASCache(ArtifactCache): # def remove(self, ref, *, defer_prune=False): - # Remove extract if not used by other ref - tree = self.resolve_ref(ref) - ref_name, ref_hash = os.path.split(ref) - extract = os.path.join(self.extractdir, ref_name, tree.hash) - keys_file = os.path.join(extract, 'meta', 'keys.yaml') - if os.path.exists(keys_file): - keys_meta = _yaml.load(keys_file) - keys = [keys_meta['strong'], keys_meta['weak']] - remove_extract = True - for other_hash in keys: - if other_hash == ref_hash: - continue - remove_extract = False - break - - if remove_extract: - utils._force_rmtree(extract) - # Remove cache ref refpath = self._refpath(ref) if not os.path.exists(refpath): - raise ArtifactError("Could not find artifact for ref '{}'".format(ref)) + raise CASError("Could not find ref '{}'".format(ref)) os.unlink(refpath) @@ -721,7 +655,7 @@ class CASCache(ArtifactCache): # The process serving the socket can't be cached anyway pass else: - raise ArtifactError("Unsupported file type for {}".format(full_path)) + raise CASError("Unsupported file type for {}".format(full_path)) return self.add_object(digest=dir_digest, buffer=directory.SerializeToString()) @@ -740,7 +674,7 @@ class CASCache(ArtifactCache): if dirnode.name == name: return dirnode.digest - raise ArtifactError("Subdirectory {} not found".format(name)) + raise CASError("Subdirectory {} not found".format(name)) def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): dir_a = remote_execution_pb2.Directory() @@ -812,29 +746,6 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest) - def _initialize_remote(self, remote_spec, q): - try: - remote = _CASRemote(remote_spec) - remote.init() - - request = buildstream_pb2.StatusRequest() - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('Artifact server does not allow push') - else: - # No error - q.put(None) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - q.put(e.details()) - - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put(str(e)) - def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() @@ -1080,7 +991,7 @@ class CASCache(ArtifactCache): # Represents a single remote CAS cache. # -class _CASRemote(): +class CASRemote(): def __init__(self, spec): self.spec = spec self._initialized = False @@ -1125,7 +1036,7 @@ class _CASRemote(): certificate_chain=client_cert_bytes) self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) else: - raise ArtifactError("Unsupported URL: {}".format(self.spec.url)) + raise CASError("Unsupported URL: {}".format(self.spec.url)) self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) @@ -1203,10 +1114,10 @@ class _CASBatchRead(): for response in batch_response.responses: if response.status.code != code_pb2.OK: - raise ArtifactError("Failed to download blob {}: {}".format( + raise CASError("Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.digest.size_bytes != len(response.data): - raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) yield (response.digest, response.data) @@ -1248,7 +1159,7 @@ class _CASBatchUpdate(): for response in batch_response.responses: if response.status.code != code_pb2.OK: - raise ArtifactError("Failed to upload blob {}: {}".format( + raise CASError("Failed to upload blob {}: {}".format( response.digest.hash, response.status.code)) diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index 31b05ce0f..ee84c4943 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._exceptions import ArtifactError -from .._context import Context +from .._exceptions import CASError + +from .cascache import CASCache # The default limit for gRPC messages is 4 MiB. @@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception): # enable_push (bool): Whether to allow blob uploads and artifact updates # def create_server(repo, *, enable_push): - context = Context() - context.artifactdir = os.path.abspath(repo) - - artifactcache = context.artifactcache + cas = CASCache(os.path.abspath(repo)) # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 server = grpc.server(futures.ThreadPoolExecutor(max_workers)) bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(artifactcache, enable_push=enable_push), server) + _ByteStreamServicer(cas, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server) + _ContentAddressableStorageServicer(cas, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server) + _ReferenceStorageServicer(cas, enable_push=enable_push), server) return server @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): response.digest.hash = tree.hash response.digest.size_bytes = tree.size_bytes - except ArtifactError: + except CASError: context.set_code(grpc.StatusCode.NOT_FOUND) return response @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size): return 0 # obtain a list of LRP artifacts - LRP_artifacts = cas.list_artifacts() + LRP_artifacts = cas.list_refs() removed_size = 0 # in bytes while object_size - removed_size > free_disk_space: |