diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-07-30 14:41:03 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-07-30 14:41:03 +0000 |
commit | 0ff177595e710ade7f79fe2fa7dff44167b8b324 (patch) | |
tree | 89c52a0537dd34a2c736b87dcc175bf8f4b8ab90 | |
parent | 0fee511a240d4b49532f9d2c2c3cf9fe0baebf7b (diff) | |
parent | 4df98d3483d4babe809c1cb96cffc946dd1bc768 (diff) | |
download | buildstream-0ff177595e710ade7f79fe2fa7dff44167b8b324.tar.gz |
Merge branch 'abderrahim/artifact-cas' into 'bst-1'
Split up artifact cache and CAS cache
See merge request BuildStream/buildstream!1969
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 245 | ||||
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 434 | ||||
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 21 | ||||
-rw-r--r-- | buildstream/_context.py | 3 | ||||
-rw-r--r-- | buildstream/_elementfactory.py | 5 | ||||
-rw-r--r-- | buildstream/_exceptions.py | 10 | ||||
-rw-r--r-- | buildstream/_loader/loader.py | 2 | ||||
-rw-r--r-- | buildstream/_pipeline.py | 2 | ||||
-rw-r--r-- | buildstream/_project.py | 12 | ||||
-rw-r--r-- | buildstream/element.py | 15 | ||||
-rw-r--r-- | tests/testutils/artifactshare.py | 12 |
11 files changed, 398 insertions, 363 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 48fd32218..38500a048 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -17,16 +17,21 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> +import multiprocessing import os +import signal import string from collections import Mapping, namedtuple from ..types import _KeyStrength -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason from .._message import Message, MessageType +from .. import _signals from .. import utils from .. import _yaml +from .cascache import CASCache, CASRemote + CACHE_SIZE_FILE = "cache_size" @@ -125,7 +130,8 @@ class ArtifactCache(): def __init__(self, context): self.context = context self.extractdir = os.path.join(context.artifactdir, 'extract') - self.tmpdir = os.path.join(context.artifactdir, 'tmp') + + self.cas = CASCache(context.artifactdir) self.global_remote_specs = [] self.project_remote_specs = {} @@ -136,12 +142,15 @@ class ArtifactCache(): self._cache_quota_original = None # The cache quota as specified by the user, in bytes self._cache_lower_threshold = None # The target cache size for a cleanup + # Per-project list of _CASRemote instances. + self._remotes = {} + + self._has_fetch_remotes = False + self._has_push_remotes = False + os.makedirs(self.extractdir, exist_ok=True) - os.makedirs(self.tmpdir, exist_ok=True) - ################################################ - # Methods implemented on the abstract class # - ################################################ + self._calculate_cache_quota() # get_artifact_fullname() # @@ -256,7 +265,7 @@ class ArtifactCache(): self._required_elements.update(elements) # For the cache keys which were resolved so far, we bump - # the atime of them. + # the mtime of them. # # This is just in case we have concurrent instances of # BuildStream running with the same artifact cache, it will @@ -268,8 +277,10 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_atime(key) - except ArtifactError: + ref = self.get_artifact_fullname(element, key) + + self.cas.update_mtime(ref) + except CASError: pass # clean(): @@ -392,7 +403,7 @@ class ArtifactCache(): # def compute_cache_size(self): old_cache_size = self._cache_size - new_cache_size = self.calculate_cache_size() + new_cache_size = self.cas.calculate_cache_size() if old_cache_size != new_cache_size: self._cache_size = new_cache_size @@ -467,27 +478,12 @@ class ArtifactCache(): def has_quota_exceeded(self): return self.get_cache_size() > self._cache_quota - ################################################ - # Abstract methods for subclasses to implement # - ################################################ - # preflight(): # # Preflight check. # def preflight(self): - pass - - # update_atime() - # - # Update the atime of an artifact. - # - # Args: - # key (str): The key of the artifact. - # - def update_atime(self, key): - raise ImplError("Cache '{kind}' does not implement contains()" - .format(kind=type(self).__name__)) + self.cas.preflight() # initialize_remotes(): # @@ -497,7 +493,59 @@ class ArtifactCache(): # on_failure (callable): Called if we fail to contact one of the caches. # def initialize_remotes(self, *, on_failure=None): - pass + remote_specs = self.global_remote_specs + + for project in self.project_remote_specs: + remote_specs += self.project_remote_specs[project] + + remote_specs = list(utils._deduplicate(remote_specs)) + + remotes = {} + q = multiprocessing.Queue() + for remote_spec in remote_specs: + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + error = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + if error and on_failure: + on_failure(remote_spec.url, error) + elif error: + raise ArtifactError(error) + else: + self._has_fetch_remotes = True + if remote_spec.push: + self._has_push_remotes = True + + remotes[remote_spec.url] = CASRemote(remote_spec) + + for project in self.context.get_projects(): + remote_specs = self.global_remote_specs + if project in self.project_remote_specs: + remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + + project_remotes = [] + + for remote_spec in remote_specs: + # Errors are already handled in the loop above, + # skip unreachable remotes here. + if remote_spec.url not in remotes: + continue + + remote = remotes[remote_spec.url] + project_remotes.append(remote) + + self._remotes[project] = project_remotes # contains(): # @@ -511,8 +559,9 @@ class ArtifactCache(): # Returns: True if the artifact is in the cache, False otherwise # def contains(self, element, key): - raise ImplError("Cache '{kind}' does not implement contains()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + return self.cas.contains(ref) # list_artifacts(): # @@ -523,8 +572,7 @@ class ArtifactCache(): # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): - raise ImplError("Cache '{kind}' does not implement list_artifacts()" - .format(kind=type(self).__name__)) + return self.cas.list_refs() # remove(): # @@ -536,9 +584,31 @@ class ArtifactCache(): # generated by # `ArtifactCache.get_artifact_fullname`) # - def remove(self, artifact_name): - raise ImplError("Cache '{kind}' does not implement remove()" - .format(kind=type(self).__name__)) + # Returns: + # (int|None) The amount of space pruned from the repository in + # Bytes, or None if defer_prune is True + # + def remove(self, ref): + + # Remove extract if not used by other ref + tree = self.cas.resolve_ref(ref) + ref_name, ref_hash = os.path.split(ref) + extract = os.path.join(self.extractdir, ref_name, tree.hash) + keys_file = os.path.join(extract, 'meta', 'keys.yaml') + if os.path.exists(keys_file): + keys_meta = _yaml.load(keys_file) + keys = [keys_meta['strong'], keys_meta['weak']] + remove_extract = True + for other_hash in keys: + if other_hash == ref_hash: + continue + remove_extract = False + break + + if remove_extract: + utils._force_rmtree(extract) + + return self.cas.remove(ref) # extract(): # @@ -558,8 +628,11 @@ class ArtifactCache(): # Returns: path to extracted artifact # def extract(self, element, key): - raise ImplError("Cache '{kind}' does not implement extract()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + path = os.path.join(self.extractdir, element._get_project().name, element.normal_name) + + return self.cas.extract(ref, path) # commit(): # @@ -571,8 +644,9 @@ class ArtifactCache(): # keys (list): The cache keys to use # def commit(self, element, content, keys): - raise ImplError("Cache '{kind}' does not implement commit()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in keys] + + self.cas.commit(refs, content) # diff(): # @@ -586,8 +660,10 @@ class ArtifactCache(): # subdir (str): A subdirectory to limit the comparison to # def diff(self, element, key_a, key_b, *, subdir=None): - raise ImplError("Cache '{kind}' does not implement diff()" - .format(kind=type(self).__name__)) + ref_a = self.get_artifact_fullname(element, key_a) + ref_b = self.get_artifact_fullname(element, key_b) + + return self.cas.diff(ref_a, ref_b, subdir=subdir) # has_fetch_remotes(): # @@ -599,7 +675,16 @@ class ArtifactCache(): # Returns: True if any remote repositories are configured, False otherwise # def has_fetch_remotes(self, *, element=None): - return False + if not self._has_fetch_remotes: + # No project has fetch remotes + return False + elif element is None: + # At least one (sub)project has fetch remotes + return True + else: + # Check whether the specified element's project has fetch remotes + remotes_for_project = self._remotes[element._get_project()] + return bool(remotes_for_project) # has_push_remotes(): # @@ -611,7 +696,16 @@ class ArtifactCache(): # Returns: True if any remote repository is configured, False otherwise # def has_push_remotes(self, *, element=None): - return False + if not self._has_push_remotes: + # No project has push remotes + return False + elif element is None: + # At least one (sub)project has push remotes + return True + else: + # Check whether the specified element's project has push remotes + remotes_for_project = self._remotes[element._get_project()] + return any(remote.spec.push for remote in remotes_for_project) # push(): # @@ -628,8 +722,28 @@ class ArtifactCache(): # (ArtifactError): if there was an error # def push(self, element, keys): - raise ImplError("Cache '{kind}' does not implement push()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] + + project = element._get_project() + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + remote.init() + display_key = element._get_brief_display_key() + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) + + if self.cas.push(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + return pushed # pull(): # @@ -644,8 +758,32 @@ class ArtifactCache(): # (bool): True if pull was successful, False if artifact was not available # def pull(self, element, key, *, progress=None): - raise ImplError("Cache '{kind}' does not implement pull()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + display_key = key[:self.context.log_key_length] + + project = element._get_project() + + for remote in self._remotes[project]: + try: + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) + + if self.cas.pull(ref, remote, progress=progress): + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) + # no need to pull from additional remotes + return True + else: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, display_key + )) + except BlobNotFound as e: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, display_key + )) + except CASError as e: + raise ArtifactError("Failed to pull artifact {}: {}".format( + display_key, e)) from e + + return False # link_key(): # @@ -657,19 +795,10 @@ class ArtifactCache(): # newkey (str): A new cache key for the artifact # def link_key(self, element, oldkey, newkey): - raise ImplError("Cache '{kind}' does not implement link_key()" - .format(kind=type(self).__name__)) + oldref = self.get_artifact_fullname(element, oldkey) + newref = self.get_artifact_fullname(element, newkey) - # calculate_cache_size() - # - # Return the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. - # - def calculate_cache_size(self): - raise ImplError("Cache '{kind}' does not implement calculate_cache_size()" - .format(kind=type(self).__name__)) + self.cas.link_ref(oldref, newref) ################################################ # Local Private Methods # diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 4f0d10da5..c6efcf508 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -19,9 +19,7 @@ import hashlib import itertools -import multiprocessing import os -import signal import stat import tempfile import uuid @@ -31,17 +29,12 @@ from urllib.parse import urlparse import grpc -from .. import _yaml - from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._message import MessageType, Message -from .. import _signals, utils -from .._exceptions import ArtifactError - -from . import ArtifactCache +from .. import utils +from .._exceptions import CASError # The default limit for gRPC messages is 4 MiB. @@ -89,68 +82,74 @@ def _retry(tries=5): break -class BlobNotFound(ArtifactError): +class BlobNotFound(CASError): def __init__(self, blob, msg): self.blob = blob super().__init__(msg) -# A CASCache manages artifacts in a CAS repository as specified in the -# Remote Execution API. +# A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: -# context (Context): The BuildStream context -# -# Pushing is explicitly disabled by the platform in some cases, -# like when we are falling back to functioning without using -# user namespaces. +# path (str): The root directory for the CAS repository # -class CASCache(ArtifactCache): +class CASCache(): - def __init__(self, context): - super().__init__(context) - - self.casdir = os.path.join(context.artifactdir, 'cas') + def __init__(self, path): + self.casdir = os.path.join(path, 'cas') + self.tmpdir = os.path.join(path, 'tmp') os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) + os.makedirs(self.tmpdir, exist_ok=True) - self._calculate_cache_quota() - - # Per-project list of _CASRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - ################################################ - # Implementation of abstract methods # - ################################################ - + # preflight(): + # + # Preflight check. + # def preflight(self): if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or not os.path.isdir(os.path.join(self.casdir, 'objects'))): - raise ArtifactError("CAS repository check failed for '{}'" - .format(self.casdir)) + raise CASError("CAS repository check failed for '{}'".format(self.casdir)) - def contains(self, element, key): - refpath = self._refpath(self.get_artifact_fullname(element, key)) + # contains(): + # + # Check whether the specified ref is already available in the local CAS cache. + # + # Args: + # ref (str): The ref to check + # + # Returns: True if the ref is in the cache, False otherwise + # + def contains(self, ref): + refpath = self._refpath(ref) # This assumes that the repository doesn't have any dangling pointers return os.path.exists(refpath) - def extract(self, element, key): - ref = self.get_artifact_fullname(element, key) - + # extract(): + # + # Extract cached directory for the specified ref if it hasn't + # already been extracted. + # + # Args: + # ref (str): The ref whose directory to extract + # path (str): The destination path + # + # Raises: + # CASError: In cases there was an OSError, or if the ref did not exist. + # + # Returns: path to extracted directory + # + def extract(self, ref, path): tree = self.resolve_ref(ref, update_mtime=True) - dest = os.path.join(self.extractdir, element._get_project().name, - element.normal_name, tree.hash) + dest = os.path.join(path, tree.hash) if os.path.isdir(dest): - # artifact has already been extracted + # directory has already been extracted return dest - with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir: + with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir: checkoutdir = os.path.join(tmpdir, ref) self._checkout(checkoutdir, tree) @@ -164,23 +163,35 @@ class CASCache(ArtifactCache): # If rename fails with these errors, another process beat # us to it so just ignore. if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]: - raise ArtifactError("Failed to extract artifact for ref '{}': {}" - .format(ref, e)) from e + raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e return dest - def commit(self, element, content, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] - - tree = self._commit_directory(content) + # commit(): + # + # Commit directory to cache. + # + # Args: + # refs (list): The refs to set + # path (str): The directory to import + # + def commit(self, refs, path): + tree = self._commit_directory(path) for ref in refs: self.set_ref(ref, tree) - def diff(self, element, key_a, key_b, *, subdir=None): - ref_a = self.get_artifact_fullname(element, key_a) - ref_b = self.get_artifact_fullname(element, key_b) - + # diff(): + # + # Return a list of files that have been added or modified between + # the refs described by ref_a and ref_b. + # + # Args: + # ref_a (str): The first ref + # ref_b (str): The second ref + # subdir (str): A subdirectory to limit the comparison to + # + def diff(self, ref_a, ref_b, *, subdir=None): tree_a = self.resolve_ref(ref_a) tree_b = self.resolve_ref(ref_b) @@ -196,145 +207,103 @@ class CASCache(ArtifactCache): return modified, removed, added - def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs - - for project in self.project_remote_specs: - remote_specs += self.project_remote_specs[project] - - remote_specs = list(utils._deduplicate(remote_specs)) + def initialize_remote(self, remote_spec, q): + try: + remote = CASRemote(remote_spec) + remote.init() - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) + request = buildstream_pb2.StatusRequest() + for attempt in _retry(): + with attempt: + response = remote.ref_storage.Status(request) - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() - - error = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise - - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) + if remote_spec.push and not response.allow_updates: + q.put('CAS server does not allow push') else: - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True + # No error + q.put(None) - remotes[remote_spec.url] = _CASRemote(remote_spec) + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + q.put(e.details()) - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + q.put(str(e)) - project_remotes = [] + # pull(): + # + # Pull a ref from a remote repository. + # + # Args: + # ref (str): The ref to pull + # remote (CASRemote): The remote repository to pull from + # progress (callable): The progress callback, if any + # + # Returns: + # (bool): True if pull was successful, False if ref was not available + # + def pull(self, ref, remote, *, progress=None): + try: + remote.init() - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. - if remote_spec.url not in remotes: - continue + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + for attempt in _retry(): + with attempt: + response = remote.ref_storage.GetReference(request) - remote = remotes[remote_spec.url] - project_remotes.append(remote) + tree = remote_execution_pb2.Digest() + tree.hash = response.digest.hash + tree.size_bytes = response.digest.size_bytes - self._remotes[project] = project_remotes + self._fetch_directory(remote, tree) - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has fetch remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) + self.set_ref(ref, tree) - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - - def pull(self, element, key, *, progress=None): - ref = self.get_artifact_fullname(element, key) - display_key = key[:self.context.log_key_length] - - project = element._get_project() - - for remote in self._remotes[project]: - try: - remote.init() - element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - for attempt in _retry(): - with attempt: - response = remote.ref_storage.GetReference(request) - - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes - - self._fetch_directory(remote, tree) - - self.set_ref(ref, tree) - - element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) - # no need to pull from additional remotes - return True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Failed to pull artifact {}: {}".format( - display_key, e)) from e - else: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, display_key - )) - except BlobNotFound as e: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, display_key - )) - - return False - - def link_key(self, element, oldkey, newkey): - oldref = self.get_artifact_fullname(element, oldkey) - newref = self.get_artifact_fullname(element, newkey) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e + else: + return False + # link_ref(): + # + # Add an alias for an existing ref. + # + # Args: + # oldref (str): An existing ref + # newref (str): A new ref for the same directory + # + def link_ref(self, oldref, newref): tree = self.resolve_ref(oldref) self.set_ref(newref, tree) - def _push_refs_to_remote(self, refs, remote): + # push(): + # + # Push committed refs to remote repository. + # + # Args: + # refs (list): The refs to push + # remote (CASRemote): The remote to push to + # + # Returns: + # (bool): True if any remote was updated, False if no pushes were required + # + # Raises: + # (CASError): if there was an error + # + def push(self, refs, remote): skipped_remote = True try: for ref in refs: tree = self.resolve_ref(ref) # Check whether ref is already on the server in which case - # there is no need to push the artifact + # there is no need to push the ref try: request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -364,42 +333,10 @@ class CASCache(ArtifactCache): skipped_remote = False except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e return not skipped_remote - def push(self, element, keys): - - refs = [self.get_artifact_fullname(element, key) for key in list(keys)] - - project = element._get_project() - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - - if self._push_refs_to_remote(refs, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) - pushed = True - else: - self.context.message(Message( - None, - MessageType.INFO, - "Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key()) - )) - - return pushed - - ################################################ - # API Private Methods # - ################################################ - # objpath(): # # Return the path of an object based on its digest. @@ -470,7 +407,7 @@ class CASCache(ArtifactCache): pass except OSError as e: - raise ArtifactError("Failed to hash object: {}".format(e)) from e + raise CASError("Failed to hash object: {}".format(e)) from e return digest @@ -511,25 +448,39 @@ class CASCache(ArtifactCache): return digest except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e - def update_atime(self, ref): + # update_mtime() + # + # Update the mtime of a ref. + # + # Args: + # ref (str): The ref to update + # + def update_mtime(self, ref): try: os.utime(self._refpath(ref)) except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e + # calculate_cache_size() + # + # Return the real disk usage of the CAS cache. + # + # Returns: + # (int): The size of the cache. + # def calculate_cache_size(self): return utils._get_dir_size(self.casdir) - # list_artifacts(): + # list_refs(): # - # List cached artifacts in Least Recently Modified (LRM) order. + # List refs in Least Recently Modified (LRM) order. # # Returns: # (list) - A list of refs in LRM order # - def list_artifacts(self): + def list_refs(self): # string of: /path/to/repo/refs/heads ref_heads = os.path.join(self.casdir, 'refs', 'heads') @@ -544,7 +495,7 @@ class CASCache(ArtifactCache): mtimes.append(os.path.getmtime(ref_path)) # NOTE: Sorted will sort from earliest to latest, thus the - # first element of this list will be the file modified earliest. + # first ref of this list will be the file modified earliest. return [ref for _, ref in sorted(zip(mtimes, refs))] # list_objects(): @@ -598,28 +549,10 @@ class CASCache(ArtifactCache): # def remove(self, ref, *, defer_prune=False): - # Remove extract if not used by other ref - tree = self.resolve_ref(ref) - ref_name, ref_hash = os.path.split(ref) - extract = os.path.join(self.extractdir, ref_name, tree.hash) - keys_file = os.path.join(extract, 'meta', 'keys.yaml') - if os.path.exists(keys_file): - keys_meta = _yaml.load(keys_file) - keys = [keys_meta['strong'], keys_meta['weak']] - remove_extract = True - for other_hash in keys: - if other_hash == ref_hash: - continue - remove_extract = False - break - - if remove_extract: - utils._force_rmtree(extract) - # Remove cache ref refpath = self._refpath(ref) if not os.path.exists(refpath): - raise ArtifactError("Could not find artifact for ref '{}'".format(ref)) + raise CASError("Could not find ref '{}'".format(ref)) os.unlink(refpath) @@ -730,7 +663,7 @@ class CASCache(ArtifactCache): symlinknode.name = name symlinknode.target = os.readlink(full_path) else: - raise ArtifactError("Unsupported file type for {}".format(full_path)) + raise CASError("Unsupported file type for {}".format(full_path)) return self.add_object(digest=dir_digest, buffer=directory.SerializeToString()) @@ -749,7 +682,7 @@ class CASCache(ArtifactCache): if dirnode.name == name: return dirnode.digest - raise ArtifactError("Subdirectory {} not found".format(name)) + raise CASError("Subdirectory {} not found".format(name)) def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): dir_a = remote_execution_pb2.Directory() @@ -826,31 +759,6 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime) - def _initialize_remote(self, remote_spec, q): - try: - remote = _CASRemote(remote_spec) - remote.init() - - request = buildstream_pb2.StatusRequest() - for attempt in _retry(): - with attempt: - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('Artifact server does not allow push') - else: - # No error - q.put(None) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - q.put(e.details()) - - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put(str(e)) - def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() @@ -1090,7 +998,7 @@ class CASCache(ArtifactCache): # Represents a single remote CAS cache. # -class _CASRemote(): +class CASRemote(): def __init__(self, spec): self.spec = spec self._initialized = False @@ -1131,7 +1039,7 @@ class _CASRemote(): certificate_chain=client_cert_bytes) self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) else: - raise ArtifactError("Unsupported URL: {}".format(self.spec.url)) + raise CASError("Unsupported URL: {}".format(self.spec.url)) self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) @@ -1220,10 +1128,10 @@ class _CASBatchRead(): raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.status.code != grpc.StatusCode.OK.value[0]: - raise ArtifactError("Failed to download blob {}: {}".format( + raise CASError("Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.digest.size_bytes != len(response.data): - raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) yield (response.digest, response.data) @@ -1267,7 +1175,7 @@ class _CASBatchUpdate(): for response in batch_response.responses: if response.status.code != grpc.StatusCode.OK.value[0]: - raise ArtifactError("Failed to upload blob {}: {}".format( + raise CASError("Failed to upload blob {}: {}".format( response.digest.hash, response.status.code)) diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index 4a9d5191a..1fdab80a8 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -34,8 +34,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._exceptions import ArtifactError -from .._context import Context +from .._exceptions import CASError + +from .cascache import CASCache # The default limit for gRPC messages is 4 MiB. @@ -66,29 +67,25 @@ def message_handler(message, context): def create_server(repo, *, enable_push, max_head_size=int(10e9), min_head_size=int(2e9)): - context = Context() - context.artifactdir = os.path.abspath(repo) - context.set_message_handler(message_handler) - - artifactcache = context.artifactcache + cas = CASCache(os.path.abspath(repo)) # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 server = grpc.server(futures.ThreadPoolExecutor(max_workers)) - cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size) + cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size) bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server) + _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server) + _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server) + _ReferenceStorageServicer(cas, enable_push=enable_push), server) return server @@ -389,7 +386,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): response.digest.hash = tree.hash response.digest.size_bytes = tree.size_bytes - except ArtifactError: + except CASError: context.set_code(grpc.StatusCode.NOT_FOUND) return response diff --git a/buildstream/_context.py b/buildstream/_context.py index f36bfd343..8ee45f787 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -30,7 +30,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end from ._artifactcache import ArtifactCache, ArtifactCacheUsage -from ._artifactcache.cascache import CASCache from ._workspaces import Workspaces from .plugin import Plugin @@ -246,7 +245,7 @@ class Context(): @property def artifactcache(self): if not self._artifactcache: - self._artifactcache = CASCache(self) + self._artifactcache = ArtifactCache(self) return self._artifactcache diff --git a/buildstream/_elementfactory.py b/buildstream/_elementfactory.py index 00847e66f..d6591bf4c 100644 --- a/buildstream/_elementfactory.py +++ b/buildstream/_elementfactory.py @@ -47,7 +47,6 @@ class ElementFactory(PluginContext): # Args: # context (object): The Context object for processing # project (object): The project object - # artifacts (ArtifactCache): The artifact cache # meta (object): The loaded MetaElement # # Returns: A newly created Element object of the appropriate kind @@ -56,9 +55,9 @@ class ElementFactory(PluginContext): # PluginError (if the kind lookup failed) # LoadError (if the element itself took issue with the config) # - def create(self, context, project, artifacts, meta): + def create(self, context, project, meta): element_type, default_config = self.lookup(meta.kind) - element = element_type(context, project, artifacts, meta, default_config) + element = element_type(context, project, meta, default_config) version = self._format_versions.get(meta.kind, 0) self._assert_plugin_format(element, version) return element diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index aeacf8dcd..44d890ead 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -89,6 +89,7 @@ class ErrorDomain(Enum): ELEMENT = 11 APP = 12 STREAM = 13 + CAS = 15 # BstError is an internal base exception class for BuildSream @@ -275,6 +276,15 @@ class ArtifactError(BstError): super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True) +# CASError +# +# Raised when errors are encountered in the CAS +# +class CASError(BstError): + def __init__(self, message, *, detail=None, reason=None, temporary=False): + super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True) + + # PipelineError # # Raised from pipeline operations diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index 24f2b595b..df272dfcc 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -533,7 +533,7 @@ class Loader(): raise LoadError(LoadErrorReason.INVALID_DATA, "{}: Expected junction but element kind is {}".format(filename, meta_element.kind)) - element = Element._new_from_meta(meta_element, self._context.artifactcache) + element = Element._new_from_meta(meta_element) element._preflight() element._update_state() diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 17264ae23..acc74295a 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -106,7 +106,7 @@ class Pipeline(): profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets)) - elements = self._project.load_elements(targets, self._artifacts, + elements = self._project.load_elements(targets, rewritable=rewritable, fetch_subprojects=fetch_subprojects) diff --git a/buildstream/_project.py b/buildstream/_project.py index 0f327c66d..e0265735d 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -221,18 +221,17 @@ class Project(): # Instantiate and return an element # # Args: - # artifacts (ArtifactCache): The artifact cache # meta (MetaElement): The loaded MetaElement # first_pass (bool): Whether to use first pass configuration (for junctions) # # Returns: # (Element): A newly created Element object of the appropriate kind # - def create_element(self, artifacts, meta, *, first_pass=False): + def create_element(self, meta, *, first_pass=False): if first_pass: - return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta) + return self.first_pass_config.element_factory.create(self._context, self, meta) else: - return self.config.element_factory.create(self._context, self, artifacts, meta) + return self.config.element_factory.create(self._context, self, meta) # create_source() # @@ -302,7 +301,6 @@ class Project(): # # Args: # targets (list): Target names - # artifacts (ArtifactCache): Artifact cache # rewritable (bool): Whether the loaded files should be rewritable # this is a bit more expensive due to deep copies # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the @@ -311,7 +309,7 @@ class Project(): # Returns: # (list): A list of loaded Element # - def load_elements(self, targets, artifacts, *, + def load_elements(self, targets, *, rewritable=False, fetch_subprojects=False): with self._context.timed_activity("Loading elements", silent_nested=True): meta_elements = self.loader.load(targets, rewritable=rewritable, @@ -320,7 +318,7 @@ class Project(): with self._context.timed_activity("Resolving elements"): elements = [ - Element._new_from_meta(meta, artifacts) + Element._new_from_meta(meta) for meta in meta_elements ] diff --git a/buildstream/element.py b/buildstream/element.py index 4942f5560..af0c1a27c 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -179,7 +179,7 @@ class Element(Plugin): *Since: 1.2* """ - def __init__(self, context, project, artifacts, meta, plugin_conf): + def __init__(self, context, project, meta, plugin_conf): self.__cache_key_dict = None # Dict for cache key calculation self.__cache_key = None # Our cached cache key @@ -207,7 +207,7 @@ class Element(Plugin): self.__sources = [] # List of Sources self.__weak_cache_key = None # Our cached weak cache key self.__strict_cache_key = None # Our cached cache key for strict builds - self.__artifacts = artifacts # Artifact cache + self.__artifacts = context.artifactcache # Artifact cache self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state self.__cached = None # Whether we have a cached artifact self.__strong_cached = None # Whether we have a cached artifact @@ -898,14 +898,13 @@ class Element(Plugin): # and it's dependencies from a meta element. # # Args: - # artifacts (ArtifactCache): The artifact cache # meta (MetaElement): The meta element # # Returns: # (Element): A newly created Element instance # @classmethod - def _new_from_meta(cls, meta, artifacts): + def _new_from_meta(cls, meta): if not meta.first_pass: meta.project.ensure_fully_loaded() @@ -913,7 +912,7 @@ class Element(Plugin): if meta in cls.__instantiated_elements: return cls.__instantiated_elements[meta] - element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass) + element = meta.project.create_element(meta, first_pass=meta.first_pass) cls.__instantiated_elements[meta] = element # Instantiate sources @@ -930,12 +929,12 @@ class Element(Plugin): # Instantiate dependencies for meta_dep in meta.dependencies: - dependency = Element._new_from_meta(meta_dep, artifacts) + dependency = Element._new_from_meta(meta_dep) element.__runtime_dependencies.append(dependency) dependency.__reverse_dependencies.add(element) for meta_dep in meta.build_dependencies: - dependency = Element._new_from_meta(meta_dep, artifacts) + dependency = Element._new_from_meta(meta_dep) element.__build_dependencies.append(dependency) dependency.__reverse_dependencies.add(element) @@ -1995,7 +1994,7 @@ class Element(Plugin): 'sources': [s._get_unique_key(workspace is None) for s in self.__sources], 'workspace': '' if workspace is None else workspace.get_key(self._get_project()), 'public': self.__public, - 'cache': type(self.__artifacts).__name__ + 'cache': 'CASCache' } self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings) diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index c7987e02c..03186862a 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -12,8 +12,8 @@ from multiprocessing import Process, Queue from buildstream import _yaml from buildstream._artifactcache.casserver import create_server -from buildstream._context import Context -from buildstream._exceptions import ArtifactError +from buildstream._artifactcache.cascache import CASCache +from buildstream._exceptions import CASError # ArtifactShare() @@ -48,11 +48,7 @@ class ArtifactShare(): os.makedirs(self.repodir) - context = Context() - context.artifactdir = self.repodir - context.set_message_handler(self._message_handler) - - self.cas = context.artifactcache + self.cas = CASCache(self.repodir) self.total_space = total_space self.free_space = free_space @@ -144,7 +140,7 @@ class ArtifactShare(): if not os.path.exists(object_name): return False return True - except ArtifactError: + except CASError: return False # close(): |