From bfb639bfc1a768cb7e8208d3650d3461e6da7ec4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 3 Nov 2018 11:01:24 +0100 Subject: element.py: Remove artifacts parameter from constructor Get the artifact cache from the context instead. --- buildstream/_elementfactory.py | 2 +- buildstream/element.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buildstream/_elementfactory.py b/buildstream/_elementfactory.py index 00847e66f..282648a6e 100644 --- a/buildstream/_elementfactory.py +++ b/buildstream/_elementfactory.py @@ -58,7 +58,7 @@ class ElementFactory(PluginContext): # def create(self, context, project, artifacts, meta): element_type, default_config = self.lookup(meta.kind) - element = element_type(context, project, artifacts, meta, default_config) + element = element_type(context, project, meta, default_config) version = self._format_versions.get(meta.kind, 0) self._assert_plugin_format(element, version) return element diff --git a/buildstream/element.py b/buildstream/element.py index 5de0253c5..3e4aea6b4 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -174,7 +174,7 @@ class Element(Plugin): *Since: 1.4* """ - def __init__(self, context, project, artifacts, meta, plugin_conf): + def __init__(self, context, project, meta, plugin_conf): self.__cache_key_dict = None # Dict for cache key calculation self.__cache_key = None # Our cached cache key @@ -199,7 +199,7 @@ class Element(Plugin): self.__sources = [] # List of Sources self.__weak_cache_key = None # Our cached weak cache key self.__strict_cache_key = None # Our cached cache key for strict builds - self.__artifacts = artifacts # Artifact cache + self.__artifacts = context.artifactcache # Artifact cache self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state self.__strong_cached = None # Whether we have a cached artifact self.__weak_cached = None # Whether we have a cached artifact -- cgit v1.2.1 From ca855f91661b0deafba4b5605de167f46923ced0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 3 Nov 2018 11:02:49 +0100 Subject: _elementfactory.py: Remove unused artifacts parameter from create() --- buildstream/_elementfactory.py | 3 +-- buildstream/_project.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/buildstream/_elementfactory.py b/buildstream/_elementfactory.py index 282648a6e..d6591bf4c 100644 --- a/buildstream/_elementfactory.py +++ b/buildstream/_elementfactory.py @@ -47,7 +47,6 @@ class ElementFactory(PluginContext): # Args: # context (object): The Context object for processing # project (object): The project object - # artifacts (ArtifactCache): The artifact cache # meta (object): The loaded MetaElement # # Returns: A newly created Element object of the appropriate kind @@ -56,7 +55,7 @@ class ElementFactory(PluginContext): # PluginError (if the kind lookup failed) # LoadError (if the element itself took issue with the config) # - def create(self, context, project, artifacts, meta): + def create(self, context, project, meta): element_type, default_config = self.lookup(meta.kind) element = element_type(context, project, meta, default_config) version = self._format_versions.get(meta.kind, 0) diff --git a/buildstream/_project.py b/buildstream/_project.py index 83aa1f47e..d275d14e3 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -233,9 +233,9 @@ class Project(): # def create_element(self, artifacts, meta, *, first_pass=False): if first_pass: - return 
self.first_pass_config.element_factory.create(self._context, self, artifacts, meta) + return self.first_pass_config.element_factory.create(self._context, self, meta) else: - return self.config.element_factory.create(self._context, self, artifacts, meta) + return self.config.element_factory.create(self._context, self, meta) # create_source() # -- cgit v1.2.1 From 15fed21c9a1c3150dd0911e59597e284cd3526e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 2 Nov 2018 08:37:33 +0100 Subject: _project.py: Remove unused artifacts parameter from create_element() --- buildstream/_project.py | 3 +-- buildstream/element.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/buildstream/_project.py b/buildstream/_project.py index d275d14e3..d62077de0 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -224,14 +224,13 @@ class Project(): # Instantiate and return an element # # Args: - # artifacts (ArtifactCache): The artifact cache # meta (MetaElement): The loaded MetaElement # first_pass (bool): Whether to use first pass configuration (for junctions) # # Returns: # (Element): A newly created Element object of the appropriate kind # - def create_element(self, artifacts, meta, *, first_pass=False): + def create_element(self, meta, *, first_pass=False): if first_pass: return self.first_pass_config.element_factory.create(self._context, self, meta) else: diff --git a/buildstream/element.py b/buildstream/element.py index 3e4aea6b4..e60daf529 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -887,7 +887,7 @@ class Element(Plugin): if meta in cls.__instantiated_elements: return cls.__instantiated_elements[meta] - element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass) + element = meta.project.create_element(meta, first_pass=meta.first_pass) cls.__instantiated_elements[meta] = element # Instantiate sources -- cgit v1.2.1 From 0085d2aae8c669a597fc4f1a74eff38340d107ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 2 Nov 2018 08:40:10 +0100 Subject: element.py: Remove unused artifacts parameter from _new_from_meta() --- buildstream/_loader/loader.py | 2 +- buildstream/_project.py | 2 +- buildstream/element.py | 7 +++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index 8a81a71c1..22600b9e9 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -537,7 +537,7 @@ class Loader(): raise LoadError(LoadErrorReason.INVALID_DATA, "{}: Expected junction but element kind is {}".format(filename, meta_element.kind)) - element = Element._new_from_meta(meta_element, self._context.artifactcache) + element = Element._new_from_meta(meta_element) element._preflight() sources = list(element.sources()) diff --git a/buildstream/_project.py b/buildstream/_project.py index d62077de0..c43510b71 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -322,7 +322,7 @@ class Project(): with self._context.timed_activity("Resolving elements"): elements = [ - Element._new_from_meta(meta, artifacts) + Element._new_from_meta(meta) for meta in meta_elements ] diff --git a/buildstream/element.py b/buildstream/element.py index e60daf529..05da2fb4f 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -872,14 +872,13 @@ class Element(Plugin): # and its dependencies from a meta element. 
# # Args: - # artifacts (ArtifactCache): The artifact cache # meta (MetaElement): The meta element # # Returns: # (Element): A newly created Element instance # @classmethod - def _new_from_meta(cls, meta, artifacts): + def _new_from_meta(cls, meta): if not meta.first_pass: meta.project.ensure_fully_loaded() @@ -904,10 +903,10 @@ class Element(Plugin): # Instantiate dependencies for meta_dep in meta.dependencies: - dependency = Element._new_from_meta(meta_dep, artifacts) + dependency = Element._new_from_meta(meta_dep) element.__runtime_dependencies.append(dependency) for meta_dep in meta.build_dependencies: - dependency = Element._new_from_meta(meta_dep, artifacts) + dependency = Element._new_from_meta(meta_dep) element.__build_dependencies.append(dependency) return element -- cgit v1.2.1 From f69b1117c22e4808e64d5dd9891d45cb6e6932b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 2 Nov 2018 08:41:52 +0100 Subject: _project.py: Remove unused artifacts parameter from load_elements() --- buildstream/_pipeline.py | 2 +- buildstream/_project.py | 3 +-- tests/artifactcache/pull.py | 6 +++--- tests/artifactcache/push.py | 6 +++--- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 1f75b2e9e..3baeaa107 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -106,7 +106,7 @@ class Pipeline(): profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets)) - elements = self._project.load_elements(targets, self._artifacts, + elements = self._project.load_elements(targets, rewritable=rewritable, fetch_subprojects=fetch_subprojects) diff --git a/buildstream/_project.py b/buildstream/_project.py index c43510b71..5280cf667 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -304,7 +304,6 @@ class Project(): # # Args: # targets (list): Target names - # artifacts (ArtifactCache): Artifact cache # rewritable (bool): Whether the loaded files should be rewritable # this is a bit more expensive due to deep copies # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the @@ -313,7 +312,7 @@ class Project(): # Returns: # (list): A list of loaded Element # - def load_elements(self, targets, artifacts, *, + def load_elements(self, targets, *, rewritable=False, fetch_subprojects=False): with self._context.timed_activity("Loading elements", silent_nested=True): meta_elements = self.loader.load(targets, rewritable=rewritable, diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index e76dc5ca7..3e936fc70 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -90,7 +90,7 @@ def test_pull(cli, tmpdir, datafiles): cas = context.artifactcache # Assert that the element's artifact is **not** cached - element = project.load_elements(['target.bst'], cas)[0] + element = project.load_elements(['target.bst'])[0] element_key = cli.get_element_key(project_dir, 'target.bst') assert not cas.contains(element, element_key) @@ -132,7 +132,7 @@ def _test_pull(user_config_file, project_dir, artifact_dir, cas = context.artifactcache # Load the target element - element = project.load_elements([element_name], cas)[0] + element = project.load_elements([element_name])[0] # Manually setup the CAS remote cas.setup_remotes(use_config=True) @@ -193,7 +193,7 @@ def test_pull_tree(cli, tmpdir, datafiles): cas = context.artifactcache # Assert that the element's artifact is cached - element = project.load_elements(['target.bst'], 
cas)[0] + element = project.load_elements(['target.bst'])[0] element_key = cli.get_element_key(project_dir, 'target.bst') assert cas.contains(element, element_key) diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index c95aac3ef..ca8cbd241 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -69,7 +69,7 @@ def test_push(cli, tmpdir, datafiles): cas = context.artifactcache # Assert that the element's artifact is cached - element = project.load_elements(['target.bst'], cas)[0] + element = project.load_elements(['target.bst'])[0] element_key = cli.get_element_key(project_dir, 'target.bst') assert cas.contains(element, element_key) @@ -111,7 +111,7 @@ def _test_push(user_config_file, project_dir, artifact_dir, cas = context.artifactcache # Load the target element - element = project.load_elements([element_name], cas)[0] + element = project.load_elements([element_name])[0] # Manually setup the CAS remote cas.setup_remotes(use_config=True) @@ -168,7 +168,7 @@ def test_push_directory(cli, tmpdir, datafiles): cas = context.artifactcache # Assert that the element's artifact is cached - element = project.load_elements(['target.bst'], cas)[0] + element = project.load_elements(['target.bst'])[0] element_key = cli.get_element_key(project_dir, 'target.bst') assert cas.contains(element, element_key) -- cgit v1.2.1 From e398f877ab2cb41e7fa531fd71ad9b503fbbedac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 3 Nov 2018 12:20:14 +0100 Subject: element.py: Do not include type name of artifact cache in cache key The artifact cache backend does not affect build outputs and we anyway no longer have pluggable artifact cache backends. This hardcodes CASCache instead of removing the entry completely to avoid cache key changes. --- buildstream/element.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildstream/element.py b/buildstream/element.py index 05da2fb4f..8ec06090c 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -2056,7 +2056,7 @@ class Element(Plugin): 'sources': [s._get_unique_key(workspace is None) for s in self.__sources], 'workspace': '' if workspace is None else workspace.get_key(self._get_project()), 'public': self.__public, - 'cache': type(self.__artifacts).__name__ + 'cache': 'CASCache' } self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings) -- cgit v1.2.1 From 626d20aefb52d25d987c61f377cc1ce3172da8c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 17 Oct 2018 11:02:49 +0100 Subject: Split up artifact cache and CAS cache This changes CASCache from a subclass to a delegate object of ArtifactCache. As the lower layer, CASCache no longer deals with elements or projects. Fixes #659. 
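
For illustration only, a minimal sketch of the resulting shape: the real
classes and method names (ArtifactCache, CASCache, contains(),
get_artifact_fullname()) are the ones touched below, but element, project
and context objects are replaced by plain strings here for brevity, so the
signatures are simplified and not the real API surface.

    import os

    class CASCache:
        # Lower layer: content-addressed storage, addressed by ref only.
        def __init__(self, path):
            self.casdir = os.path.join(path, 'cas')

        def contains(self, ref):
            # A ref is cached if its head file exists.
            return os.path.exists(os.path.join(self.casdir, 'refs', 'heads', ref))

    class ArtifactCache:
        # Upper layer: maps elements and cache keys to refs, and
        # delegates the actual storage work to a CASCache instance.
        def __init__(self, artifactdir):
            self.cas = CASCache(artifactdir)

        def get_artifact_fullname(self, project_name, element_name, key):
            # Simplified: the real method also normalizes element names.
            return os.path.join(project_name, element_name, key)

        def contains(self, project_name, element_name, key):
            ref = self.get_artifact_fullname(project_name, element_name, key)
            return self.cas.contains(ref)

In the diff below, each former CASCache override of an ArtifactCache method
becomes a thin element-to-ref wrapper of this form.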
--- buildstream/_artifactcache/artifactcache.py | 346 ++++++++++++++---- buildstream/_artifactcache/cascache.py | 537 ++++++++++++---------------- buildstream/_artifactcache/casserver.py | 20 +- buildstream/_context.py | 3 +- buildstream/_exceptions.py | 10 + buildstream/storage/_casbaseddirectory.py | 2 +- tests/artifactcache/pull.py | 16 +- tests/artifactcache/push.py | 13 +- tests/testutils/artifactshare.py | 6 +- 9 files changed, 550 insertions(+), 403 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 8ea6c9dc2..06a2b84e0 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -17,17 +17,22 @@ # Authors: # Tristan Maat +import multiprocessing import os +import signal import string from collections import namedtuple from collections.abc import Mapping from ..types import _KeyStrength -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason from .._message import Message, MessageType +from .. import _signals from .. import utils from .. import _yaml +from .cascache import CASCache, CASRemote + CACHE_SIZE_FILE = "cache_size" @@ -93,7 +98,8 @@ class ArtifactCache(): def __init__(self, context): self.context = context self.extractdir = os.path.join(context.artifactdir, 'extract') - self.tmpdir = os.path.join(context.artifactdir, 'tmp') + + self.cas = CASCache(context.artifactdir) self.global_remote_specs = [] self.project_remote_specs = {} @@ -104,12 +110,15 @@ class ArtifactCache(): self._cache_lower_threshold = None # The target cache size for a cleanup self._remotes_setup = False # Check to prevent double-setup of remotes + # Per-project list of _CASRemote instances. + self._remotes = {} + + self._has_fetch_remotes = False + self._has_push_remotes = False + os.makedirs(self.extractdir, exist_ok=True) - os.makedirs(self.tmpdir, exist_ok=True) - ################################################ - # Methods implemented on the abstract class # - ################################################ + self._calculate_cache_quota() # get_artifact_fullname() # @@ -240,8 +249,10 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_mtime(element, key) - except ArtifactError: + ref = self.get_artifact_fullname(element, key) + + self.cas.update_mtime(ref) + except CASError: pass # clean(): @@ -252,7 +263,7 @@ class ArtifactCache(): # (int): The size of the cache after having cleaned up # def clean(self): - artifacts = self.list_artifacts() # pylint: disable=assignment-from-no-return + artifacts = self.list_artifacts() # Build a set of the cache keys which are required # based on the required elements at cleanup time @@ -294,7 +305,7 @@ class ArtifactCache(): if key not in required_artifacts: # Remove the actual artifact, if it's not required. - size = self.remove(to_remove) # pylint: disable=assignment-from-no-return + size = self.remove(to_remove) # Remove the size from the removed size self.set_cache_size(self._cache_size - size) @@ -311,7 +322,7 @@ class ArtifactCache(): # (int): The size of the artifact cache. 
# def compute_cache_size(self): - self._cache_size = self.calculate_cache_size() # pylint: disable=assignment-from-no-return + self._cache_size = self.cas.calculate_cache_size() return self._cache_size @@ -380,28 +391,12 @@ class ArtifactCache(): def has_quota_exceeded(self): return self.get_cache_size() > self._cache_quota - ################################################ - # Abstract methods for subclasses to implement # - ################################################ - # preflight(): # # Preflight check. # def preflight(self): - pass - - # update_mtime() - # - # Update the mtime of an artifact. - # - # Args: - # element (Element): The Element to update - # key (str): The key of the artifact. - # - def update_mtime(self, element, key): - raise ImplError("Cache '{kind}' does not implement update_mtime()" - .format(kind=type(self).__name__)) + self.cas.preflight() # initialize_remotes(): # @@ -411,7 +406,59 @@ class ArtifactCache(): # on_failure (callable): Called if we fail to contact one of the caches. # def initialize_remotes(self, *, on_failure=None): - pass + remote_specs = self.global_remote_specs + + for project in self.project_remote_specs: + remote_specs += self.project_remote_specs[project] + + remote_specs = list(utils._deduplicate(remote_specs)) + + remotes = {} + q = multiprocessing.Queue() + for remote_spec in remote_specs: + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + error = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + if error and on_failure: + on_failure(remote_spec.url, error) + elif error: + raise ArtifactError(error) + else: + self._has_fetch_remotes = True + if remote_spec.push: + self._has_push_remotes = True + + remotes[remote_spec.url] = CASRemote(remote_spec) + + for project in self.context.get_projects(): + remote_specs = self.global_remote_specs + if project in self.project_remote_specs: + remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + + project_remotes = [] + + for remote_spec in remote_specs: + # Errors are already handled in the loop above, + # skip unreachable remotes here. 
+ if remote_spec.url not in remotes: + continue + + remote = remotes[remote_spec.url] + project_remotes.append(remote) + + self._remotes[project] = project_remotes # contains(): # @@ -425,8 +472,9 @@ class ArtifactCache(): # Returns: True if the artifact is in the cache, False otherwise # def contains(self, element, key): - raise ImplError("Cache '{kind}' does not implement contains()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + return self.cas.contains(ref) # list_artifacts(): # @@ -437,8 +485,7 @@ class ArtifactCache(): # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): - raise ImplError("Cache '{kind}' does not implement list_artifacts()" - .format(kind=type(self).__name__)) + return self.cas.list_refs() # remove(): # @@ -450,9 +497,31 @@ class ArtifactCache(): # generated by # `ArtifactCache.get_artifact_fullname`) # - def remove(self, artifact_name): - raise ImplError("Cache '{kind}' does not implement remove()" - .format(kind=type(self).__name__)) + # Returns: + # (int|None) The amount of space pruned from the repository in + # Bytes, or None if defer_prune is True + # + def remove(self, ref): + + # Remove extract if not used by other ref + tree = self.cas.resolve_ref(ref) + ref_name, ref_hash = os.path.split(ref) + extract = os.path.join(self.extractdir, ref_name, tree.hash) + keys_file = os.path.join(extract, 'meta', 'keys.yaml') + if os.path.exists(keys_file): + keys_meta = _yaml.load(keys_file) + keys = [keys_meta['strong'], keys_meta['weak']] + remove_extract = True + for other_hash in keys: + if other_hash == ref_hash: + continue + remove_extract = False + break + + if remove_extract: + utils._force_rmtree(extract) + + return self.cas.remove(ref) # extract(): # @@ -472,8 +541,11 @@ class ArtifactCache(): # Returns: path to extracted artifact # def extract(self, element, key): - raise ImplError("Cache '{kind}' does not implement extract()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + path = os.path.join(self.extractdir, element._get_project().name, element.normal_name) + + return self.cas.extract(ref, path) # commit(): # @@ -485,8 +557,9 @@ class ArtifactCache(): # keys (list): The cache keys to use # def commit(self, element, content, keys): - raise ImplError("Cache '{kind}' does not implement commit()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in keys] + + self.cas.commit(refs, content) # diff(): # @@ -500,8 +573,10 @@ class ArtifactCache(): # subdir (str): A subdirectory to limit the comparison to # def diff(self, element, key_a, key_b, *, subdir=None): - raise ImplError("Cache '{kind}' does not implement diff()" - .format(kind=type(self).__name__)) + ref_a = self.get_artifact_fullname(element, key_a) + ref_b = self.get_artifact_fullname(element, key_b) + + return self.cas.diff(ref_a, ref_b, subdir=subdir) # has_fetch_remotes(): # @@ -513,7 +588,16 @@ class ArtifactCache(): # Returns: True if any remote repositories are configured, False otherwise # def has_fetch_remotes(self, *, element=None): - return False + if not self._has_fetch_remotes: + # No project has fetch remotes + return False + elif element is None: + # At least one (sub)project has fetch remotes + return True + else: + # Check whether the specified element's project has fetch remotes + remotes_for_project = self._remotes[element._get_project()] + return bool(remotes_for_project) # has_push_remotes(): # @@ -525,7 +609,16 @@ class 
ArtifactCache(): # Returns: True if any remote repository is configured, False otherwise # def has_push_remotes(self, *, element=None): - return False + if not self._has_push_remotes: + # No project has push remotes + return False + elif element is None: + # At least one (sub)project has push remotes + return True + else: + # Check whether the specified element's project has push remotes + remotes_for_project = self._remotes[element._get_project()] + return any(remote.spec.push for remote in remotes_for_project) # push(): # @@ -542,8 +635,28 @@ class ArtifactCache(): # (ArtifactError): if there was an error # def push(self, element, keys): - raise ImplError("Cache '{kind}' does not implement push()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] + + project = element._get_project() + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + remote.init() + display_key = element._get_brief_display_key() + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) + + if self.cas.push(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + return pushed # pull(): # @@ -558,8 +671,130 @@ class ArtifactCache(): # (bool): True if pull was successful, False if artifact was not available # def pull(self, element, key, *, progress=None): - raise ImplError("Cache '{kind}' does not implement pull()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + project = element._get_project() + + for remote in self._remotes[project]: + try: + display_key = element._get_brief_display_key() + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) + + if self.cas.pull(ref, remote, progress=progress): + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) + # no need to pull from additional remotes + return True + else: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + except CASError as e: + raise ArtifactError("Failed to pull artifact {}: {}".format( + element._get_brief_display_key(), e)) from e + + return False + + # pull_tree(): + # + # Pull a single Tree rather than an artifact. + # Does not update local refs. + # + # Args: + # project (Project): The current project + # digest (Digest): The digest of the tree + # + def pull_tree(self, project, digest): + for remote in self._remotes[project]: + digest = self.cas.pull_tree(remote, digest) + + if digest: + # no need to pull from additional remotes + return digest + + return None + + # push_directory(): + # + # Push the given virtual directory to all remotes. + # + # Args: + # project (Project): The current project + # directory (Directory): A virtual directory object to push. 
+ # + # Raises: + # (ArtifactError): if there was an error + # + def push_directory(self, project, directory): + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + if not push_remotes: + raise ArtifactError("push_directory was called, but no remote artifact " + + "servers are configured as push remotes.") + + if directory.ref is None: + return + + for remote in push_remotes: + self.cas.push_directory(remote, directory) + + # push_message(): + # + # Push the given protobuf message to all remotes. + # + # Args: + # project (Project): The current project + # message (Message): A protobuf message to push. + # + # Raises: + # (ArtifactError): if there was an error + # + def push_message(self, project, message): + + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + if not push_remotes: + raise ArtifactError("push_message was called, but no remote artifact " + + "servers are configured as push remotes.") + + for remote in push_remotes: + message_digest = self.cas.push_message(remote, message) + + return message_digest + + # verify_digest_pushed(): + # + # Check whether the object is already on the server in which case + # there is no need to upload it. + # + # Args: + # project (Project): The current project + # digest (Digest): The object digest. + # + def verify_digest_pushed(self, project, digest): + + if self._has_push_remotes: + push_remotes = [r for r in self._remotes[project] if r.spec.push] + else: + push_remotes = [] + + if not push_remotes: + raise ArtifactError("verify_digest_pushed was called, but no remote artifact " + + "servers are configured as push remotes.") + + pushed = False + + for remote in push_remotes: + if self.cas.verify_digest_on_remote(remote, digest): + pushed = True + + return pushed # link_key(): # @@ -571,19 +806,10 @@ class ArtifactCache(): # newkey (str): A new cache key for the artifact # def link_key(self, element, oldkey, newkey): - raise ImplError("Cache '{kind}' does not implement link_key()" - .format(kind=type(self).__name__)) + oldref = self.get_artifact_fullname(element, oldkey) + newref = self.get_artifact_fullname(element, newkey) - # calculate_cache_size() - # - # Return the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. - # - def calculate_cache_size(self): - raise ImplError("Cache '{kind}' does not implement calculate_cache_size()" - .format(kind=type(self).__name__)) + self.cas.link_ref(oldref, newref) ################################################ # Local Private Methods # diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index b99f4d7b8..b6e26ec8b 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -20,9 +20,7 @@ import hashlib import itertools import io -import multiprocessing import os -import signal import stat import tempfile import uuid @@ -31,17 +29,13 @@ from urllib.parse import urlparse import grpc -from .. import _yaml - from .._protos.google.rpc import code_pb2 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .. import _signals, utils -from .._exceptions import ArtifactError - -from . import ArtifactCache +from .. 
import utils +from .._exceptions import CASError # The default limit for gRPC messages is 4 MiB. @@ -49,62 +43,68 @@ from . import ArtifactCache _MAX_PAYLOAD_BYTES = 1024 * 1024 -# A CASCache manages artifacts in a CAS repository as specified in the -# Remote Execution API. +# A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: -# context (Context): The BuildStream context -# -# Pushing is explicitly disabled by the platform in some cases, -# like when we are falling back to functioning without using -# user namespaces. +# path (str): The root directory for the CAS repository # -class CASCache(ArtifactCache): +class CASCache(): - def __init__(self, context): - super().__init__(context) - - self.casdir = os.path.join(context.artifactdir, 'cas') + def __init__(self, path): + self.casdir = os.path.join(path, 'cas') + self.tmpdir = os.path.join(path, 'tmp') os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) + os.makedirs(self.tmpdir, exist_ok=True) - self._calculate_cache_quota() - - # Per-project list of _CASRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - ################################################ - # Implementation of abstract methods # - ################################################ - + # preflight(): + # + # Preflight check. + # def preflight(self): headdir = os.path.join(self.casdir, 'refs', 'heads') objdir = os.path.join(self.casdir, 'objects') if not (os.path.isdir(headdir) and os.path.isdir(objdir)): - raise ArtifactError("CAS repository check failed for '{}'" - .format(self.casdir)) + raise CASError("CAS repository check failed for '{}'".format(self.casdir)) - def contains(self, element, key): - refpath = self._refpath(self.get_artifact_fullname(element, key)) + # contains(): + # + # Check whether the specified ref is already available in the local CAS cache. + # + # Args: + # ref (str): The ref to check + # + # Returns: True if the ref is in the cache, False otherwise + # + def contains(self, ref): + refpath = self._refpath(ref) # This assumes that the repository doesn't have any dangling pointers return os.path.exists(refpath) - def extract(self, element, key): - ref = self.get_artifact_fullname(element, key) - + # extract(): + # + # Extract cached directory for the specified ref if it hasn't + # already been extracted. + # + # Args: + # ref (str): The ref whose directory to extract + # path (str): The destination path + # + # Raises: + # CASError: In cases there was an OSError, or if the ref did not exist. + # + # Returns: path to extracted directory + # + def extract(self, ref, path): tree = self.resolve_ref(ref, update_mtime=True) - dest = os.path.join(self.extractdir, element._get_project().name, - element.normal_name, tree.hash) + dest = os.path.join(path, tree.hash) if os.path.isdir(dest): - # artifact has already been extracted + # directory has already been extracted return dest - with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir: + with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir: checkoutdir = os.path.join(tmpdir, ref) self._checkout(checkoutdir, tree) @@ -118,23 +118,35 @@ class CASCache(ArtifactCache): # If rename fails with these errors, another process beat # us to it so just ignore. 
if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]: - raise ArtifactError("Failed to extract artifact for ref '{}': {}" - .format(ref, e)) from e + raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e return dest - def commit(self, element, content, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] - - tree = self._commit_directory(content) + # commit(): + # + # Commit directory to cache. + # + # Args: + # refs (list): The refs to set + # path (str): The directory to import + # + def commit(self, refs, path): + tree = self._commit_directory(path) for ref in refs: self.set_ref(ref, tree) - def diff(self, element, key_a, key_b, *, subdir=None): - ref_a = self.get_artifact_fullname(element, key_a) - ref_b = self.get_artifact_fullname(element, key_b) - + # diff(): + # + # Return a list of files that have been added or modified between + # the refs described by ref_a and ref_b. + # + # Args: + # ref_a (str): The first ref + # ref_b (str): The second ref + # subdir (str): A subdirectory to limit the comparison to + # + def diff(self, ref_a, ref_b, *, subdir=None): tree_a = self.resolve_ref(ref_a) tree_b = self.resolve_ref(ref_b) @@ -150,158 +162,122 @@ class CASCache(ArtifactCache): return modified, removed, added - def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs - - for project in self.project_remote_specs: - remote_specs += self.project_remote_specs[project] - - remote_specs = list(utils._deduplicate(remote_specs)) - - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) + def initialize_remote(self, remote_spec, q): + try: + remote = CASRemote(remote_spec) + remote.init() - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() - - error = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise + request = buildstream_pb2.StatusRequest() + response = remote.ref_storage.Status(request) - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) + if remote_spec.push and not response.allow_updates: + q.put('CAS server does not allow push') else: - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True + # No error + q.put(None) - remotes[remote_spec.url] = _CASRemote(remote_spec) + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + q.put(e.details()) - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + q.put(str(e)) - project_remotes = [] + # pull(): + # + # Pull a ref from a remote repository. 
+ # + # Args: + # ref (str): The ref to pull + # remote (CASRemote): The remote repository to pull from + # progress (callable): The progress callback, if any + # + # Returns: + # (bool): True if pull was successful, False if ref was not available + # + def pull(self, ref, remote, *, progress=None): + try: + remote.init() - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. - if remote_spec.url not in remotes: - continue + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + response = remote.ref_storage.GetReference(request) - remote = remotes[remote_spec.url] - project_remotes.append(remote) + tree = remote_execution_pb2.Digest() + tree.hash = response.digest.hash + tree.size_bytes = response.digest.size_bytes - self._remotes[project] = project_remotes + self._fetch_directory(remote, tree) - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has fetch remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) + self.set_ref(ref, tree) - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - - def pull(self, element, key, *, progress=None): - ref = self.get_artifact_fullname(element, key) - - project = element._get_project() - - for remote in self._remotes[project]: - try: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - response = remote.ref_storage.GetReference(request) - - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes - - self._fetch_directory(remote, tree) - - self.set_ref(ref, tree) - - element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) - # no need to pull from additional remotes - return True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Failed to pull artifact {}: {}".format( - element._get_brief_display_key(), e)) from e - else: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, element._get_brief_display_key() - )) - - return False - - def pull_tree(self, project, digest): - """ Pull a single Tree rather than an artifact. - Does not update local refs. """ + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e + else: + return False - for remote in self._remotes[project]: - try: - remote.init() + # pull_tree(): + # + # Pull a single Tree rather than a ref. + # Does not update local refs. 
+ # + # Args: + # remote (CASRemote): The remote to pull from + # digest (Digest): The digest of the tree + # + def pull_tree(self, remote, digest): + try: + remote.init() - digest = self._fetch_tree(remote, digest) + digest = self._fetch_tree(remote, digest) - # no need to pull from additional remotes - return digest + return digest - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise return None - def link_key(self, element, oldkey, newkey): - oldref = self.get_artifact_fullname(element, oldkey) - newref = self.get_artifact_fullname(element, newkey) - + # link_ref(): + # + # Add an alias for an existing ref. + # + # Args: + # oldref (str): An existing ref + # newref (str): A new ref for the same directory + # + def link_ref(self, oldref, newref): tree = self.resolve_ref(oldref) self.set_ref(newref, tree) - def _push_refs_to_remote(self, refs, remote): + # push(): + # + # Push committed refs to remote repository. + # + # Args: + # refs (list): The refs to push + # remote (CASRemote): The remote to push to + # + # Returns: + # (bool): True if any remote was updated, False if no pushes were required + # + # Raises: + # (CASError): if there was an error + # + def push(self, refs, remote): skipped_remote = True try: for ref in refs: tree = self.resolve_ref(ref) # Check whether ref is already on the server in which case - # there is no need to push the artifact + # there is no need to push the ref try: request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -327,65 +303,38 @@ class CASCache(ArtifactCache): skipped_remote = False except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e return not skipped_remote - def push(self, element, keys): - - refs = [self.get_artifact_fullname(element, key) for key in list(keys)] - - project = element._get_project() - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - - if self._push_refs_to_remote(refs, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) - pushed = True - else: - element.info("Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key() - )) - - return pushed - - def push_directory(self, project, directory): - """ Push the given virtual directory to all remotes. - - Args: - project (Project): The current project - directory (Directory): A virtual directory object to push. - - Raises: ArtifactError if no push remotes are configured. - """ - - if self._has_push_remotes: - push_remotes = [r for r in self._remotes[project] if r.spec.push] - else: - push_remotes = [] - - if not push_remotes: - raise ArtifactError("CASCache: push_directory was called, but no remote artifact " + - "servers are configured as push remotes.") - - if directory.ref is None: - return - - for remote in push_remotes: - remote.init() - - self._send_directory(remote, directory.ref) + # push_directory(): + # + # Push the given virtual directory to a remote. 
+ # + # Args: + # remote (CASRemote): The remote to push to + # directory (Directory): A virtual directory object to push. + # + # Raises: + # (CASError): if there was an error + # + def push_directory(self, remote, directory): + remote.init() - def push_message(self, project, message): + self._send_directory(remote, directory.ref) - push_remotes = [r for r in self._remotes[project] if r.spec.push] + # push_message(): + # + # Push the given protobuf message to a remote. + # + # Args: + # remote (CASRemote): The remote to push to + # message (Message): A protobuf message to push. + # + # Raises: + # (CASError): if there was an error + # + def push_message(self, remote, message): message_buffer = message.SerializeToString() message_sha = hashlib.sha256(message_buffer) @@ -393,17 +342,25 @@ class CASCache(ArtifactCache): message_digest.hash = message_sha.hexdigest() message_digest.size_bytes = len(message_buffer) - for remote in push_remotes: - remote.init() + remote.init() - with io.BytesIO(message_buffer) as b: - self._send_blob(remote, message_digest, b) + with io.BytesIO(message_buffer) as b: + self._send_blob(remote, message_digest, b) return message_digest - def _verify_digest_on_remote(self, remote, digest): - # Check whether ref is already on the server in which case - # there is no need to push the artifact + # verify_digest_on_remote(): + # + # Check whether the object is already on the server in which case + # there is no need to upload it. + # + # Args: + # remote (CASRemote): The remote to check + # digest (Digest): The object digest. + # + def verify_digest_on_remote(self, remote, digest): + remote.init() + request = remote_execution_pb2.FindMissingBlobsRequest() request.blob_digests.extend([digest]) @@ -413,24 +370,6 @@ class CASCache(ArtifactCache): return True - def verify_digest_pushed(self, project, digest): - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - - if self._verify_digest_on_remote(remote, digest): - pushed = True - - return pushed - - ################################################ - # API Private Methods # - ################################################ - # objpath(): # # Return the path of an object based on its digest. @@ -496,7 +435,7 @@ class CASCache(ArtifactCache): pass except OSError as e: - raise ArtifactError("Failed to hash object: {}".format(e)) from e + raise CASError("Failed to hash object: {}".format(e)) from e return digest @@ -537,26 +476,39 @@ class CASCache(ArtifactCache): return digest except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e - def update_mtime(self, element, key): + # update_mtime() + # + # Update the mtime of a ref. + # + # Args: + # ref (str): The ref to update + # + def update_mtime(self, ref): try: - ref = self.get_artifact_fullname(element, key) os.utime(self._refpath(ref)) except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e + # calculate_cache_size() + # + # Return the real disk usage of the CAS cache. + # + # Returns: + # (int): The size of the cache. + # def calculate_cache_size(self): return utils._get_dir_size(self.casdir) - # list_artifacts(): + # list_refs(): # - # List cached artifacts in Least Recently Modified (LRM) order. 
+ # List refs in Least Recently Modified (LRM) order. # # Returns: # (list) - A list of refs in LRM order # - def list_artifacts(self): + def list_refs(self): # string of: /path/to/repo/refs/heads ref_heads = os.path.join(self.casdir, 'refs', 'heads') @@ -571,7 +523,7 @@ class CASCache(ArtifactCache): mtimes.append(os.path.getmtime(ref_path)) # NOTE: Sorted will sort from earliest to latest, thus the - # first element of this list will be the file modified earliest. + # first ref of this list will be the file modified earliest. return [ref for _, ref in sorted(zip(mtimes, refs))] # remove(): @@ -590,28 +542,10 @@ class CASCache(ArtifactCache): # def remove(self, ref, *, defer_prune=False): - # Remove extract if not used by other ref - tree = self.resolve_ref(ref) - ref_name, ref_hash = os.path.split(ref) - extract = os.path.join(self.extractdir, ref_name, tree.hash) - keys_file = os.path.join(extract, 'meta', 'keys.yaml') - if os.path.exists(keys_file): - keys_meta = _yaml.load(keys_file) - keys = [keys_meta['strong'], keys_meta['weak']] - remove_extract = True - for other_hash in keys: - if other_hash == ref_hash: - continue - remove_extract = False - break - - if remove_extract: - utils._force_rmtree(extract) - # Remove cache ref refpath = self._refpath(ref) if not os.path.exists(refpath): - raise ArtifactError("Could not find artifact for ref '{}'".format(ref)) + raise CASError("Could not find ref '{}'".format(ref)) os.unlink(refpath) @@ -721,7 +655,7 @@ class CASCache(ArtifactCache): # The process serving the socket can't be cached anyway pass else: - raise ArtifactError("Unsupported file type for {}".format(full_path)) + raise CASError("Unsupported file type for {}".format(full_path)) return self.add_object(digest=dir_digest, buffer=directory.SerializeToString()) @@ -740,7 +674,7 @@ class CASCache(ArtifactCache): if dirnode.name == name: return dirnode.digest - raise ArtifactError("Subdirectory {} not found".format(name)) + raise CASError("Subdirectory {} not found".format(name)) def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): dir_a = remote_execution_pb2.Directory() @@ -812,29 +746,6 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest) - def _initialize_remote(self, remote_spec, q): - try: - remote = _CASRemote(remote_spec) - remote.init() - - request = buildstream_pb2.StatusRequest() - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('Artifact server does not allow push') - else: - # No error - q.put(None) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - q.put(e.details()) - - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put(str(e)) - def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() @@ -1080,7 +991,7 @@ class CASCache(ArtifactCache): # Represents a single remote CAS cache. 
# -class _CASRemote(): +class CASRemote(): def __init__(self, spec): self.spec = spec self._initialized = False @@ -1125,7 +1036,7 @@ class _CASRemote(): certificate_chain=client_cert_bytes) self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) else: - raise ArtifactError("Unsupported URL: {}".format(self.spec.url)) + raise CASError("Unsupported URL: {}".format(self.spec.url)) self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) @@ -1203,10 +1114,10 @@ class _CASBatchRead(): for response in batch_response.responses: if response.status.code != code_pb2.OK: - raise ArtifactError("Failed to download blob {}: {}".format( + raise CASError("Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.digest.size_bytes != len(response.data): - raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) yield (response.digest, response.data) @@ -1248,7 +1159,7 @@ class _CASBatchUpdate(): for response in batch_response.responses: if response.status.code != code_pb2.OK: - raise ArtifactError("Failed to upload blob {}: {}".format( + raise CASError("Failed to upload blob {}: {}".format( response.digest.hash, response.status.code)) diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index 31b05ce0f..ee84c4943 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._exceptions import ArtifactError -from .._context import Context +from .._exceptions import CASError + +from .cascache import CASCache # The default limit for gRPC messages is 4 MiB. 
@@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception): # enable_push (bool): Whether to allow blob uploads and artifact updates # def create_server(repo, *, enable_push): - context = Context() - context.artifactdir = os.path.abspath(repo) - - artifactcache = context.artifactcache + cas = CASCache(os.path.abspath(repo)) # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 server = grpc.server(futures.ThreadPoolExecutor(max_workers)) bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(artifactcache, enable_push=enable_push), server) + _ByteStreamServicer(cas, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server) + _ContentAddressableStorageServicer(cas, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server) + _ReferenceStorageServicer(cas, enable_push=enable_push), server) return server @@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): response.digest.hash = tree.hash response.digest.size_bytes = tree.size_bytes - except ArtifactError: + except CASError: context.set_code(grpc.StatusCode.NOT_FOUND) return response @@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size): return 0 # obtain a list of LRP artifacts - LRP_artifacts = cas.list_artifacts() + LRP_artifacts = cas.list_refs() removed_size = 0 # in bytes while object_size - removed_size > free_disk_space: diff --git a/buildstream/_context.py b/buildstream/_context.py index e3c290b7b..876b74712 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end from ._artifactcache import ArtifactCache -from ._artifactcache.cascache import CASCache from ._workspaces import Workspaces from .plugin import _plugin_lookup @@ -233,7 +232,7 @@ class Context(): @property def artifactcache(self): if not self._artifactcache: - self._artifactcache = CASCache(self) + self._artifactcache = ArtifactCache(self) return self._artifactcache diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index a1c26d38c..ba0b9fabb 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -90,6 +90,7 @@ class ErrorDomain(Enum): APP = 12 STREAM = 13 VIRTUAL_FS = 14 + CAS = 15 # BstError is an internal base exception class for BuildSream @@ -274,6 +275,15 @@ class ArtifactError(BstError): super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True) +# CASError +# +# Raised when errors are encountered in the CAS +# +class CASError(BstError): + def __init__(self, message, *, detail=None, reason=None, temporary=False): + super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True) + + # PipelineError # # Raised from pipeline operations diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py index 07fd206ed..fa5ec823b 100644 --- a/buildstream/storage/_casbaseddirectory.py +++ b/buildstream/storage/_casbaseddirectory.py @@ -79,7 +79,7 @@ class CasBasedDirectory(Directory): self.filename = 
filename self.common_name = common_name self.pb2_directory = remote_execution_pb2.Directory() - self.cas_cache = context.artifactcache + self.cas_cache = context.artifactcache.cas if ref: with open(self.cas_cache.objpath(ref), 'rb') as f: self.pb2_directory.ParseFromString(f.read()) diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index 3e936fc70..dde451a8c 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles): # Load the project and CAS cache project = Project(project_dir, context) project.ensure_fully_loaded() - cas = context.artifactcache + artifactcache = context.artifactcache + cas = artifactcache.cas # Assert that the element's artifact is cached element = project.load_elements(['target.bst'])[0] element_key = cli.get_element_key(project_dir, 'target.bst') - assert cas.contains(element, element_key) + assert artifactcache.contains(element, element_key) # Retrieve the Directory object from the cached artifact - artifact_ref = cas.get_artifact_fullname(element, element_key) + artifact_ref = artifactcache.get_artifact_fullname(element, element_key) artifact_digest = cas.resolve_ref(artifact_ref) queue = multiprocessing.Queue() @@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest project.ensure_fully_loaded() # Create a local CAS cache handle - cas = context.artifactcache + artifactcache = context.artifactcache + cas = artifactcache.cas # Manually setup the CAS remote - cas.setup_remotes(use_config=True) + artifactcache.setup_remotes(use_config=True) - if cas.has_push_remotes(): + if artifactcache.has_push_remotes(): directory = remote_execution_pb2.Directory() with open(cas.objpath(artifact_digest), 'rb') as f: @@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest tree_maker(cas, tree, directory) # Push the Tree as a regular message - tree_digest = cas.push_message(project, tree) + tree_digest = artifactcache.push_message(project, tree) queue.put((tree_digest.hash, tree_digest.size_bytes)) else: diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index ca8cbd241..be6293cf4 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles): # Load the project and CAS cache project = Project(project_dir, context) project.ensure_fully_loaded() - cas = context.artifactcache + artifactcache = context.artifactcache + cas = artifactcache.cas # Assert that the element's artifact is cached element = project.load_elements(['target.bst'])[0] element_key = cli.get_element_key(project_dir, 'target.bst') - assert cas.contains(element, element_key) + assert artifactcache.contains(element, element_key) # Manually setup the CAS remote - cas.setup_remotes(use_config=True) - cas.initialize_remotes() - assert cas.has_push_remotes(element=element) + artifactcache.setup_remotes(use_config=True) + artifactcache.initialize_remotes() + assert artifactcache.has_push_remotes(element=element) # Recreate the CasBasedDirectory object from the cached artifact - artifact_ref = cas.get_artifact_fullname(element, element_key) + artifact_ref = artifactcache.get_artifact_fullname(element, element_key) artifact_digest = cas.resolve_ref(artifact_ref) queue = multiprocessing.Queue() diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index a8af59905..02f76de90 100644 --- 
a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -13,7 +13,7 @@ import pytest_cov from buildstream import _yaml from buildstream._artifactcache.casserver import create_server from buildstream._context import Context -from buildstream._exceptions import ArtifactError +from buildstream._exceptions import CASError from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 @@ -48,7 +48,7 @@ class ArtifactShare(): context = Context() context.artifactdir = self.repodir - self.cas = context.artifactcache + self.cas = context.artifactcache.cas self.total_space = total_space self.free_space = free_space @@ -135,7 +135,7 @@ class ArtifactShare(): try: tree = self.cas.resolve_ref(artifact_key) return True - except ArtifactError: + except CASError: return False # close(): -- cgit v1.2.1
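Illustrative usage sketch (not itself part of the series): after the split,
the CAS layer can be constructed from a bare directory path and used without
a Context, as casserver.py and tests/testutils/artifactshare.py above now do.
The repository path and ref below are hypothetical placeholders, and error
handling is elided.

    import os
    from buildstream._artifactcache.cascache import CASCache
    from buildstream._exceptions import CASError

    # Hypothetical CAS repository root, for illustration only.
    repo = os.path.abspath('/srv/artifacts')

    cas = CASCache(repo)
    cas.preflight()

    # Refs, least recently modified first.
    for ref in cas.list_refs():
        print(ref)

    # Resolve a (hypothetical) ref to its tree digest.
    try:
        tree = cas.resolve_ref('myproject/target/0123abcd')
        print(tree.hash, tree.size_bytes)
    except CASError:
        print('ref not cached')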