From 228c41bf68122e791a19f7f37d87a31e0cf440d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 17 Oct 2018 17:08:44 +0100 Subject: _artifactcache: Rename update_atime() to update_mtime() os.utime() updates both, however, list_artifacts() sorts refs by mtime, i.e., atime is irrelevant. --- buildstream/_artifactcache/artifactcache.py | 12 ++++++------ buildstream/_artifactcache/cascache.py | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 48fd32218..728819e81 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -256,7 +256,7 @@ class ArtifactCache(): self._required_elements.update(elements) # For the cache keys which were resolved so far, we bump - # the atime of them. + # the mtime of them. # # This is just in case we have concurrent instances of # BuildStream running with the same artifact cache, it will @@ -268,7 +268,7 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_atime(key) + self.update_mtime(key) except ArtifactError: pass @@ -478,15 +478,15 @@ class ArtifactCache(): def preflight(self): pass - # update_atime() + # update_mtime() # - # Update the atime of an artifact. + # Update the mtime of an artifact. # # Args: # key (str): The key of the artifact. # - def update_atime(self, key): - raise ImplError("Cache '{kind}' does not implement contains()" + def update_mtime(self, key): + raise ImplError("Cache '{kind}' does not implement update_mtime()" .format(kind=type(self).__name__)) # initialize_remotes(): diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 4f0d10da5..c1fc90bb6 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -513,7 +513,7 @@ class CASCache(ArtifactCache): except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e - def update_atime(self, ref): + def update_mtime(self, ref): try: os.utime(self._refpath(ref)) except FileNotFoundError as e: -- cgit v1.2.1 From aa5007f1dbd5263aea087f4295f389c1d93fddb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Wed, 17 Oct 2018 17:11:14 +0100 Subject: _artifactcache: Fix ref in update_mtime() get_artifact_fullname() is required to construct the ref. The cache key alone does not suffice. --- buildstream/_artifactcache/artifactcache.py | 5 +++-- buildstream/_artifactcache/cascache.py | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index 728819e81..ed5ef8262 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -268,7 +268,7 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_mtime(key) + self.update_mtime(element, key) except ArtifactError: pass @@ -483,9 +483,10 @@ class ArtifactCache(): # Update the mtime of an artifact. # # Args: + # element (Element): The Element to update # key (str): The key of the artifact. 
# - def update_mtime(self, key): + def update_mtime(self, element, key): raise ImplError("Cache '{kind}' does not implement update_mtime()" .format(kind=type(self).__name__)) diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index c1fc90bb6..a3d27c8d1 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -513,8 +513,9 @@ class CASCache(ArtifactCache): except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e - def update_mtime(self, ref): + def update_mtime(self, element, key): try: + ref = self.get_artifact_fullname(element, key) os.utime(self._refpath(ref)) except FileNotFoundError as e: raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e -- cgit v1.2.1 From 6593a492bd9892cef892ebc4378a5d221660658e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 3 Nov 2018 11:01:24 +0100 Subject: element.py: Remove artifacts parameter from constructor Get the artifact cache from the context instead. --- buildstream/_elementfactory.py | 2 +- buildstream/element.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buildstream/_elementfactory.py b/buildstream/_elementfactory.py index 00847e66f..282648a6e 100644 --- a/buildstream/_elementfactory.py +++ b/buildstream/_elementfactory.py @@ -58,7 +58,7 @@ class ElementFactory(PluginContext): # def create(self, context, project, artifacts, meta): element_type, default_config = self.lookup(meta.kind) - element = element_type(context, project, artifacts, meta, default_config) + element = element_type(context, project, meta, default_config) version = self._format_versions.get(meta.kind, 0) self._assert_plugin_format(element, version) return element diff --git a/buildstream/element.py b/buildstream/element.py index 4942f5560..cfabd615e 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -179,7 +179,7 @@ class Element(Plugin): *Since: 1.2* """ - def __init__(self, context, project, artifacts, meta, plugin_conf): + def __init__(self, context, project, meta, plugin_conf): self.__cache_key_dict = None # Dict for cache key calculation self.__cache_key = None # Our cached cache key @@ -207,7 +207,7 @@ class Element(Plugin): self.__sources = [] # List of Sources self.__weak_cache_key = None # Our cached weak cache key self.__strict_cache_key = None # Our cached cache key for strict builds - self.__artifacts = artifacts # Artifact cache + self.__artifacts = context.artifactcache # Artifact cache self.__consistency = Consistency.INCONSISTENT # Cached overall consistency state self.__cached = None # Whether we have a cached artifact self.__strong_cached = None # Whether we have a cached artifact -- cgit v1.2.1 From d821566c63907637d9b9a842bdcbb61637b64fc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 3 Nov 2018 11:02:49 +0100 Subject: _elementfactory.py: Remove unused artifacts parameter from create() --- buildstream/_elementfactory.py | 3 +-- buildstream/_project.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/buildstream/_elementfactory.py b/buildstream/_elementfactory.py index 282648a6e..d6591bf4c 100644 --- a/buildstream/_elementfactory.py +++ b/buildstream/_elementfactory.py @@ -47,7 +47,6 @@ class ElementFactory(PluginContext): # Args: # context (object): The Context object for processing # project (object): The project object - # artifacts (ArtifactCache): The artifact cache # 
meta (object): The loaded MetaElement # # Returns: A newly created Element object of the appropriate kind @@ -56,7 +55,7 @@ class ElementFactory(PluginContext): # PluginError (if the kind lookup failed) # LoadError (if the element itself took issue with the config) # - def create(self, context, project, artifacts, meta): + def create(self, context, project, meta): element_type, default_config = self.lookup(meta.kind) element = element_type(context, project, meta, default_config) version = self._format_versions.get(meta.kind, 0) diff --git a/buildstream/_project.py b/buildstream/_project.py index 0f327c66d..ef22f4e6f 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -230,9 +230,9 @@ class Project(): # def create_element(self, artifacts, meta, *, first_pass=False): if first_pass: - return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta) + return self.first_pass_config.element_factory.create(self._context, self, meta) else: - return self.config.element_factory.create(self._context, self, artifacts, meta) + return self.config.element_factory.create(self._context, self, meta) # create_source() # -- cgit v1.2.1 From e9f34c3ce001a6c8581951b562c1ac005381151e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 2 Nov 2018 08:37:33 +0100 Subject: _project.py: Remove unused artifacts parameter from create_element() --- buildstream/_project.py | 3 +-- buildstream/element.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/buildstream/_project.py b/buildstream/_project.py index ef22f4e6f..9a1a2f3dc 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -221,14 +221,13 @@ class Project(): # Instantiate and return an element # # Args: - # artifacts (ArtifactCache): The artifact cache # meta (MetaElement): The loaded MetaElement # first_pass (bool): Whether to use first pass configuration (for junctions) # # Returns: # (Element): A newly created Element object of the appropriate kind # - def create_element(self, artifacts, meta, *, first_pass=False): + def create_element(self, meta, *, first_pass=False): if first_pass: return self.first_pass_config.element_factory.create(self._context, self, meta) else: diff --git a/buildstream/element.py b/buildstream/element.py index cfabd615e..78c5095dd 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -913,7 +913,7 @@ class Element(Plugin): if meta in cls.__instantiated_elements: return cls.__instantiated_elements[meta] - element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass) + element = meta.project.create_element(meta, first_pass=meta.first_pass) cls.__instantiated_elements[meta] = element # Instantiate sources -- cgit v1.2.1 From 3998807036696a9c109029f8224eb8f9bd1d7188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 2 Nov 2018 08:40:10 +0100 Subject: element.py: Remove unused artifacts parameter from _new_from_meta() --- buildstream/_loader/loader.py | 2 +- buildstream/_project.py | 2 +- buildstream/element.py | 7 +++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index 24f2b595b..df272dfcc 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -533,7 +533,7 @@ class Loader(): raise LoadError(LoadErrorReason.INVALID_DATA, "{}: Expected junction but element kind is {}".format(filename, meta_element.kind)) - element = Element._new_from_meta(meta_element, 
self._context.artifactcache) + element = Element._new_from_meta(meta_element) element._preflight() element._update_state() diff --git a/buildstream/_project.py b/buildstream/_project.py index 9a1a2f3dc..3dba17131 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -319,7 +319,7 @@ class Project(): with self._context.timed_activity("Resolving elements"): elements = [ - Element._new_from_meta(meta, artifacts) + Element._new_from_meta(meta) for meta in meta_elements ] diff --git a/buildstream/element.py b/buildstream/element.py index 78c5095dd..5c8a8d604 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -898,14 +898,13 @@ class Element(Plugin): # and it's dependencies from a meta element. # # Args: - # artifacts (ArtifactCache): The artifact cache # meta (MetaElement): The meta element # # Returns: # (Element): A newly created Element instance # @classmethod - def _new_from_meta(cls, meta, artifacts): + def _new_from_meta(cls, meta): if not meta.first_pass: meta.project.ensure_fully_loaded() @@ -930,12 +929,12 @@ class Element(Plugin): # Instantiate dependencies for meta_dep in meta.dependencies: - dependency = Element._new_from_meta(meta_dep, artifacts) + dependency = Element._new_from_meta(meta_dep) element.__runtime_dependencies.append(dependency) dependency.__reverse_dependencies.add(element) for meta_dep in meta.build_dependencies: - dependency = Element._new_from_meta(meta_dep, artifacts) + dependency = Element._new_from_meta(meta_dep) element.__build_dependencies.append(dependency) dependency.__reverse_dependencies.add(element) -- cgit v1.2.1 From 9a145fed81eaa4487a6789c442be4ec8cb26f513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Fri, 2 Nov 2018 08:41:52 +0100 Subject: _project.py: Remove unused artifacts parameter from load_elements() --- buildstream/_pipeline.py | 2 +- buildstream/_project.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index 17264ae23..acc74295a 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -106,7 +106,7 @@ class Pipeline(): profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets)) - elements = self._project.load_elements(targets, self._artifacts, + elements = self._project.load_elements(targets, rewritable=rewritable, fetch_subprojects=fetch_subprojects) diff --git a/buildstream/_project.py b/buildstream/_project.py index 3dba17131..e0265735d 100644 --- a/buildstream/_project.py +++ b/buildstream/_project.py @@ -301,7 +301,6 @@ class Project(): # # Args: # targets (list): Target names - # artifacts (ArtifactCache): Artifact cache # rewritable (bool): Whether the loaded files should be rewritable # this is a bit more expensive due to deep copies # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the @@ -310,7 +309,7 @@ class Project(): # Returns: # (list): A list of loaded Element # - def load_elements(self, targets, artifacts, *, + def load_elements(self, targets, *, rewritable=False, fetch_subprojects=False): with self._context.timed_activity("Loading elements", silent_nested=True): meta_elements = self.loader.load(targets, rewritable=rewritable, -- cgit v1.2.1 From 6706d3c64c9cef1f7ee43978aa84f484c55cea1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Sat, 3 Nov 2018 12:20:14 +0100 Subject: element.py: Do not include type name of artifact cache in cache key The artifact cache backend does not affect build outputs and we 
anyway no longer have pluggable artifact cache backends. This hardcodes CASCache instead of removing the entry completely to avoid cache key changes. --- buildstream/element.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildstream/element.py b/buildstream/element.py index 5c8a8d604..af0c1a27c 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1994,7 +1994,7 @@ class Element(Plugin): 'sources': [s._get_unique_key(workspace is None) for s in self.__sources], 'workspace': '' if workspace is None else workspace.get_key(self._get_project()), 'public': self.__public, - 'cache': type(self.__artifacts).__name__ + 'cache': 'CASCache' } self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings) -- cgit v1.2.1 From eaf6f692869a3b3e0a92defdf604e662d6f770e7 Mon Sep 17 00:00:00 2001 From: Abderrahim Kitouni Date: Fri, 17 Jul 2020 11:46:36 +0100 Subject: Split up artifact cache and CAS cache This changes CASCache from a subclass to a delegate object of ArtifactCache. As the lower layer, CASCache no longer deals with elements or projects. Based on 626d20aefb52d25d987c61f377cc1ce3172da8c3 Fixes #659. --- buildstream/_artifactcache/artifactcache.py | 244 ++++++++++++---- buildstream/_artifactcache/cascache.py | 435 +++++++++++----------------- buildstream/_artifactcache/casserver.py | 21 +- buildstream/_context.py | 3 +- buildstream/_exceptions.py | 10 + tests/testutils/artifactshare.py | 6 +- 6 files changed, 380 insertions(+), 339 deletions(-) diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index ed5ef8262..38500a048 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -17,16 +17,21 @@ # Authors: # Tristan Maat +import multiprocessing import os +import signal import string from collections import Mapping, namedtuple from ..types import _KeyStrength -from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason +from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason from .._message import Message, MessageType +from .. import _signals from .. import utils from .. import _yaml +from .cascache import CASCache, CASRemote + CACHE_SIZE_FILE = "cache_size" @@ -125,7 +130,8 @@ class ArtifactCache(): def __init__(self, context): self.context = context self.extractdir = os.path.join(context.artifactdir, 'extract') - self.tmpdir = os.path.join(context.artifactdir, 'tmp') + + self.cas = CASCache(context.artifactdir) self.global_remote_specs = [] self.project_remote_specs = {} @@ -136,12 +142,15 @@ class ArtifactCache(): self._cache_quota_original = None # The cache quota as specified by the user, in bytes self._cache_lower_threshold = None # The target cache size for a cleanup + # Per-project list of _CASRemote instances. 
+ self._remotes = {} + + self._has_fetch_remotes = False + self._has_push_remotes = False + os.makedirs(self.extractdir, exist_ok=True) - os.makedirs(self.tmpdir, exist_ok=True) - ################################################ - # Methods implemented on the abstract class # - ################################################ + self._calculate_cache_quota() # get_artifact_fullname() # @@ -268,8 +277,10 @@ class ArtifactCache(): for key in (strong_key, weak_key): if key: try: - self.update_mtime(element, key) - except ArtifactError: + ref = self.get_artifact_fullname(element, key) + + self.cas.update_mtime(ref) + except CASError: pass # clean(): @@ -392,7 +403,7 @@ class ArtifactCache(): # def compute_cache_size(self): old_cache_size = self._cache_size - new_cache_size = self.calculate_cache_size() + new_cache_size = self.cas.calculate_cache_size() if old_cache_size != new_cache_size: self._cache_size = new_cache_size @@ -467,28 +478,12 @@ class ArtifactCache(): def has_quota_exceeded(self): return self.get_cache_size() > self._cache_quota - ################################################ - # Abstract methods for subclasses to implement # - ################################################ - # preflight(): # # Preflight check. # def preflight(self): - pass - - # update_mtime() - # - # Update the mtime of an artifact. - # - # Args: - # element (Element): The Element to update - # key (str): The key of the artifact. - # - def update_mtime(self, element, key): - raise ImplError("Cache '{kind}' does not implement update_mtime()" - .format(kind=type(self).__name__)) + self.cas.preflight() # initialize_remotes(): # @@ -498,7 +493,59 @@ class ArtifactCache(): # on_failure (callable): Called if we fail to contact one of the caches. # def initialize_remotes(self, *, on_failure=None): - pass + remote_specs = self.global_remote_specs + + for project in self.project_remote_specs: + remote_specs += self.project_remote_specs[project] + + remote_specs = list(utils._deduplicate(remote_specs)) + + remotes = {} + q = multiprocessing.Queue() + for remote_spec in remote_specs: + # Use subprocess to avoid creation of gRPC threads in main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q)) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + error = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + if error and on_failure: + on_failure(remote_spec.url, error) + elif error: + raise ArtifactError(error) + else: + self._has_fetch_remotes = True + if remote_spec.push: + self._has_push_remotes = True + + remotes[remote_spec.url] = CASRemote(remote_spec) + + for project in self.context.get_projects(): + remote_specs = self.global_remote_specs + if project in self.project_remote_specs: + remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + + project_remotes = [] + + for remote_spec in remote_specs: + # Errors are already handled in the loop above, + # skip unreachable remotes here. 
+ if remote_spec.url not in remotes: + continue + + remote = remotes[remote_spec.url] + project_remotes.append(remote) + + self._remotes[project] = project_remotes # contains(): # @@ -512,8 +559,9 @@ class ArtifactCache(): # Returns: True if the artifact is in the cache, False otherwise # def contains(self, element, key): - raise ImplError("Cache '{kind}' does not implement contains()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + return self.cas.contains(ref) # list_artifacts(): # @@ -524,8 +572,7 @@ class ArtifactCache(): # `ArtifactCache.get_artifact_fullname` in LRU order # def list_artifacts(self): - raise ImplError("Cache '{kind}' does not implement list_artifacts()" - .format(kind=type(self).__name__)) + return self.cas.list_refs() # remove(): # @@ -537,9 +584,31 @@ class ArtifactCache(): # generated by # `ArtifactCache.get_artifact_fullname`) # - def remove(self, artifact_name): - raise ImplError("Cache '{kind}' does not implement remove()" - .format(kind=type(self).__name__)) + # Returns: + # (int|None) The amount of space pruned from the repository in + # Bytes, or None if defer_prune is True + # + def remove(self, ref): + + # Remove extract if not used by other ref + tree = self.cas.resolve_ref(ref) + ref_name, ref_hash = os.path.split(ref) + extract = os.path.join(self.extractdir, ref_name, tree.hash) + keys_file = os.path.join(extract, 'meta', 'keys.yaml') + if os.path.exists(keys_file): + keys_meta = _yaml.load(keys_file) + keys = [keys_meta['strong'], keys_meta['weak']] + remove_extract = True + for other_hash in keys: + if other_hash == ref_hash: + continue + remove_extract = False + break + + if remove_extract: + utils._force_rmtree(extract) + + return self.cas.remove(ref) # extract(): # @@ -559,8 +628,11 @@ class ArtifactCache(): # Returns: path to extracted artifact # def extract(self, element, key): - raise ImplError("Cache '{kind}' does not implement extract()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + + path = os.path.join(self.extractdir, element._get_project().name, element.normal_name) + + return self.cas.extract(ref, path) # commit(): # @@ -572,8 +644,9 @@ class ArtifactCache(): # keys (list): The cache keys to use # def commit(self, element, content, keys): - raise ImplError("Cache '{kind}' does not implement commit()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in keys] + + self.cas.commit(refs, content) # diff(): # @@ -587,8 +660,10 @@ class ArtifactCache(): # subdir (str): A subdirectory to limit the comparison to # def diff(self, element, key_a, key_b, *, subdir=None): - raise ImplError("Cache '{kind}' does not implement diff()" - .format(kind=type(self).__name__)) + ref_a = self.get_artifact_fullname(element, key_a) + ref_b = self.get_artifact_fullname(element, key_b) + + return self.cas.diff(ref_a, ref_b, subdir=subdir) # has_fetch_remotes(): # @@ -600,7 +675,16 @@ class ArtifactCache(): # Returns: True if any remote repositories are configured, False otherwise # def has_fetch_remotes(self, *, element=None): - return False + if not self._has_fetch_remotes: + # No project has fetch remotes + return False + elif element is None: + # At least one (sub)project has fetch remotes + return True + else: + # Check whether the specified element's project has fetch remotes + remotes_for_project = self._remotes[element._get_project()] + return bool(remotes_for_project) # has_push_remotes(): # @@ -612,7 +696,16 @@ class 
ArtifactCache(): # Returns: True if any remote repository is configured, False otherwise # def has_push_remotes(self, *, element=None): - return False + if not self._has_push_remotes: + # No project has push remotes + return False + elif element is None: + # At least one (sub)project has push remotes + return True + else: + # Check whether the specified element's project has push remotes + remotes_for_project = self._remotes[element._get_project()] + return any(remote.spec.push for remote in remotes_for_project) # push(): # @@ -629,8 +722,28 @@ class ArtifactCache(): # (ArtifactError): if there was an error # def push(self, element, keys): - raise ImplError("Cache '{kind}' does not implement push()" - .format(kind=type(self).__name__)) + refs = [self.get_artifact_fullname(element, key) for key in list(keys)] + + project = element._get_project() + + push_remotes = [r for r in self._remotes[project] if r.spec.push] + + pushed = False + + for remote in push_remotes: + remote.init() + display_key = element._get_brief_display_key() + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) + + if self.cas.push(refs, remote): + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + pushed = True + else: + element.info("Remote ({}) already has {} cached".format( + remote.spec.url, element._get_brief_display_key() + )) + + return pushed # pull(): # @@ -645,8 +758,32 @@ class ArtifactCache(): # (bool): True if pull was successful, False if artifact was not available # def pull(self, element, key, *, progress=None): - raise ImplError("Cache '{kind}' does not implement pull()" - .format(kind=type(self).__name__)) + ref = self.get_artifact_fullname(element, key) + display_key = key[:self.context.log_key_length] + + project = element._get_project() + + for remote in self._remotes[project]: + try: + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) + + if self.cas.pull(ref, remote, progress=progress): + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) + # no need to pull from additional remotes + return True + else: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, display_key + )) + except BlobNotFound as e: + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, display_key + )) + except CASError as e: + raise ArtifactError("Failed to pull artifact {}: {}".format( + display_key, e)) from e + + return False # link_key(): # @@ -658,19 +795,10 @@ class ArtifactCache(): # newkey (str): A new cache key for the artifact # def link_key(self, element, oldkey, newkey): - raise ImplError("Cache '{kind}' does not implement link_key()" - .format(kind=type(self).__name__)) + oldref = self.get_artifact_fullname(element, oldkey) + newref = self.get_artifact_fullname(element, newkey) - # calculate_cache_size() - # - # Return the real artifact cache size. - # - # Returns: - # (int): The size of the artifact cache. 
- # - def calculate_cache_size(self): - raise ImplError("Cache '{kind}' does not implement calculate_cache_size()" - .format(kind=type(self).__name__)) + self.cas.link_ref(oldref, newref) ################################################ # Local Private Methods # diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index a3d27c8d1..c6efcf508 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -19,9 +19,7 @@ import hashlib import itertools -import multiprocessing import os -import signal import stat import tempfile import uuid @@ -31,17 +29,12 @@ from urllib.parse import urlparse import grpc -from .. import _yaml - from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._message import MessageType, Message -from .. import _signals, utils -from .._exceptions import ArtifactError - -from . import ArtifactCache +from .. import utils +from .._exceptions import CASError # The default limit for gRPC messages is 4 MiB. @@ -89,68 +82,74 @@ def _retry(tries=5): break -class BlobNotFound(ArtifactError): +class BlobNotFound(CASError): def __init__(self, blob, msg): self.blob = blob super().__init__(msg) -# A CASCache manages artifacts in a CAS repository as specified in the -# Remote Execution API. +# A CASCache manages a CAS repository as specified in the Remote Execution API. # # Args: -# context (Context): The BuildStream context -# -# Pushing is explicitly disabled by the platform in some cases, -# like when we are falling back to functioning without using -# user namespaces. +# path (str): The root directory for the CAS repository # -class CASCache(ArtifactCache): +class CASCache(): - def __init__(self, context): - super().__init__(context) - - self.casdir = os.path.join(context.artifactdir, 'cas') + def __init__(self, path): + self.casdir = os.path.join(path, 'cas') + self.tmpdir = os.path.join(path, 'tmp') os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True) os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) + os.makedirs(self.tmpdir, exist_ok=True) - self._calculate_cache_quota() - - # Per-project list of _CASRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - ################################################ - # Implementation of abstract methods # - ################################################ - + # preflight(): + # + # Preflight check. + # def preflight(self): if (not os.path.isdir(os.path.join(self.casdir, 'refs', 'heads')) or not os.path.isdir(os.path.join(self.casdir, 'objects'))): - raise ArtifactError("CAS repository check failed for '{}'" - .format(self.casdir)) + raise CASError("CAS repository check failed for '{}'".format(self.casdir)) - def contains(self, element, key): - refpath = self._refpath(self.get_artifact_fullname(element, key)) + # contains(): + # + # Check whether the specified ref is already available in the local CAS cache. 
+ # + # Args: + # ref (str): The ref to check + # + # Returns: True if the ref is in the cache, False otherwise + # + def contains(self, ref): + refpath = self._refpath(ref) # This assumes that the repository doesn't have any dangling pointers return os.path.exists(refpath) - def extract(self, element, key): - ref = self.get_artifact_fullname(element, key) - + # extract(): + # + # Extract cached directory for the specified ref if it hasn't + # already been extracted. + # + # Args: + # ref (str): The ref whose directory to extract + # path (str): The destination path + # + # Raises: + # CASError: In cases there was an OSError, or if the ref did not exist. + # + # Returns: path to extracted directory + # + def extract(self, ref, path): tree = self.resolve_ref(ref, update_mtime=True) - dest = os.path.join(self.extractdir, element._get_project().name, - element.normal_name, tree.hash) + dest = os.path.join(path, tree.hash) if os.path.isdir(dest): - # artifact has already been extracted + # directory has already been extracted return dest - with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir: + with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir: checkoutdir = os.path.join(tmpdir, ref) self._checkout(checkoutdir, tree) @@ -164,23 +163,35 @@ class CASCache(ArtifactCache): # If rename fails with these errors, another process beat # us to it so just ignore. if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]: - raise ArtifactError("Failed to extract artifact for ref '{}': {}" - .format(ref, e)) from e + raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e return dest - def commit(self, element, content, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] - - tree = self._commit_directory(content) + # commit(): + # + # Commit directory to cache. + # + # Args: + # refs (list): The refs to set + # path (str): The directory to import + # + def commit(self, refs, path): + tree = self._commit_directory(path) for ref in refs: self.set_ref(ref, tree) - def diff(self, element, key_a, key_b, *, subdir=None): - ref_a = self.get_artifact_fullname(element, key_a) - ref_b = self.get_artifact_fullname(element, key_b) - + # diff(): + # + # Return a list of files that have been added or modified between + # the refs described by ref_a and ref_b. 
+ # + # Args: + # ref_a (str): The first ref + # ref_b (str): The second ref + # subdir (str): A subdirectory to limit the comparison to + # + def diff(self, ref_a, ref_b, *, subdir=None): tree_a = self.resolve_ref(ref_a) tree_b = self.resolve_ref(ref_b) @@ -196,145 +207,103 @@ class CASCache(ArtifactCache): return modified, removed, added - def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs - - for project in self.project_remote_specs: - remote_specs += self.project_remote_specs[project] - - remote_specs = list(utils._deduplicate(remote_specs)) + def initialize_remote(self, remote_spec, q): + try: + remote = CASRemote(remote_spec) + remote.init() - remotes = {} - q = multiprocessing.Queue() - for remote_spec in remote_specs: - # Use subprocess to avoid creation of gRPC threads in main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q)) + request = buildstream_pb2.StatusRequest() + for attempt in _retry(): + with attempt: + response = remote.ref_storage.Status(request) - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() - - error = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise - - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) + if remote_spec.push and not response.allow_updates: + q.put('CAS server does not allow push') else: - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True + # No error + q.put(None) - remotes[remote_spec.url] = _CASRemote(remote_spec) + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + q.put(e.details()) - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + q.put(str(e)) - project_remotes = [] + # pull(): + # + # Pull a ref from a remote repository. + # + # Args: + # ref (str): The ref to pull + # remote (CASRemote): The remote repository to pull from + # progress (callable): The progress callback, if any + # + # Returns: + # (bool): True if pull was successful, False if ref was not available + # + def pull(self, ref, remote, *, progress=None): + try: + remote.init() - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. 
- if remote_spec.url not in remotes: - continue + request = buildstream_pb2.GetReferenceRequest() + request.key = ref + for attempt in _retry(): + with attempt: + response = remote.ref_storage.GetReference(request) - remote = remotes[remote_spec.url] - project_remotes.append(remote) + tree = remote_execution_pb2.Digest() + tree.hash = response.digest.hash + tree.size_bytes = response.digest.size_bytes - self._remotes[project] = project_remotes + self._fetch_directory(remote, tree) - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has fetch remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) + self.set_ref(ref, tree) - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - - def pull(self, element, key, *, progress=None): - ref = self.get_artifact_fullname(element, key) - display_key = key[:self.context.log_key_length] - - project = element._get_project() - - for remote in self._remotes[project]: - try: - remote.init() - element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - - request = buildstream_pb2.GetReferenceRequest() - request.key = ref - for attempt in _retry(): - with attempt: - response = remote.ref_storage.GetReference(request) - - tree = remote_execution_pb2.Digest() - tree.hash = response.digest.hash - tree.size_bytes = response.digest.size_bytes - - self._fetch_directory(remote, tree) - - self.set_ref(ref, tree) - - element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) - # no need to pull from additional remotes - return True - - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise ArtifactError("Failed to pull artifact {}: {}".format( - display_key, e)) from e - else: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, display_key - )) - except BlobNotFound as e: - element.info("Remote ({}) does not have {} cached".format( - remote.spec.url, display_key - )) - - return False - - def link_key(self, element, oldkey, newkey): - oldref = self.get_artifact_fullname(element, oldkey) - newref = self.get_artifact_fullname(element, newkey) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e + else: + return False + # link_ref(): + # + # Add an alias for an existing ref. + # + # Args: + # oldref (str): An existing ref + # newref (str): A new ref for the same directory + # + def link_ref(self, oldref, newref): tree = self.resolve_ref(oldref) self.set_ref(newref, tree) - def _push_refs_to_remote(self, refs, remote): + # push(): + # + # Push committed refs to remote repository. 
+ # + # Args: + # refs (list): The refs to push + # remote (CASRemote): The remote to push to + # + # Returns: + # (bool): True if any remote was updated, False if no pushes were required + # + # Raises: + # (CASError): if there was an error + # + def push(self, refs, remote): skipped_remote = True try: for ref in refs: tree = self.resolve_ref(ref) # Check whether ref is already on the server in which case - # there is no need to push the artifact + # there is no need to push the ref try: request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -364,42 +333,10 @@ class CASCache(ArtifactCache): skipped_remote = False except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e + raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e return not skipped_remote - def push(self, element, keys): - - refs = [self.get_artifact_fullname(element, key) for key in list(keys)] - - project = element._get_project() - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - pushed = False - - for remote in push_remotes: - remote.init() - display_key = element._get_brief_display_key() - element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - - if self._push_refs_to_remote(refs, remote): - element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) - pushed = True - else: - self.context.message(Message( - None, - MessageType.INFO, - "Remote ({}) already has {} cached".format( - remote.spec.url, element._get_brief_display_key()) - )) - - return pushed - - ################################################ - # API Private Methods # - ################################################ - # objpath(): # # Return the path of an object based on its digest. @@ -470,7 +407,7 @@ class CASCache(ArtifactCache): pass except OSError as e: - raise ArtifactError("Failed to hash object: {}".format(e)) from e + raise CASError("Failed to hash object: {}".format(e)) from e return digest @@ -511,26 +448,39 @@ class CASCache(ArtifactCache): return digest except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e - def update_mtime(self, element, key): + # update_mtime() + # + # Update the mtime of a ref. + # + # Args: + # ref (str): The ref to update + # + def update_mtime(self, ref): try: - ref = self.get_artifact_fullname(element, key) os.utime(self._refpath(ref)) except FileNotFoundError as e: - raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e + raise CASError("Attempt to access unavailable ref: {}".format(e)) from e + # calculate_cache_size() + # + # Return the real disk usage of the CAS cache. + # + # Returns: + # (int): The size of the cache. + # def calculate_cache_size(self): return utils._get_dir_size(self.casdir) - # list_artifacts(): + # list_refs(): # - # List cached artifacts in Least Recently Modified (LRM) order. + # List refs in Least Recently Modified (LRM) order. 
# # Returns: # (list) - A list of refs in LRM order # - def list_artifacts(self): + def list_refs(self): # string of: /path/to/repo/refs/heads ref_heads = os.path.join(self.casdir, 'refs', 'heads') @@ -545,7 +495,7 @@ class CASCache(ArtifactCache): mtimes.append(os.path.getmtime(ref_path)) # NOTE: Sorted will sort from earliest to latest, thus the - # first element of this list will be the file modified earliest. + # first ref of this list will be the file modified earliest. return [ref for _, ref in sorted(zip(mtimes, refs))] # list_objects(): @@ -599,28 +549,10 @@ class CASCache(ArtifactCache): # def remove(self, ref, *, defer_prune=False): - # Remove extract if not used by other ref - tree = self.resolve_ref(ref) - ref_name, ref_hash = os.path.split(ref) - extract = os.path.join(self.extractdir, ref_name, tree.hash) - keys_file = os.path.join(extract, 'meta', 'keys.yaml') - if os.path.exists(keys_file): - keys_meta = _yaml.load(keys_file) - keys = [keys_meta['strong'], keys_meta['weak']] - remove_extract = True - for other_hash in keys: - if other_hash == ref_hash: - continue - remove_extract = False - break - - if remove_extract: - utils._force_rmtree(extract) - # Remove cache ref refpath = self._refpath(ref) if not os.path.exists(refpath): - raise ArtifactError("Could not find artifact for ref '{}'".format(ref)) + raise CASError("Could not find ref '{}'".format(ref)) os.unlink(refpath) @@ -731,7 +663,7 @@ class CASCache(ArtifactCache): symlinknode.name = name symlinknode.target = os.readlink(full_path) else: - raise ArtifactError("Unsupported file type for {}".format(full_path)) + raise CASError("Unsupported file type for {}".format(full_path)) return self.add_object(digest=dir_digest, buffer=directory.SerializeToString()) @@ -750,7 +682,7 @@ class CASCache(ArtifactCache): if dirnode.name == name: return dirnode.digest - raise ArtifactError("Subdirectory {} not found".format(name)) + raise CASError("Subdirectory {} not found".format(name)) def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): dir_a = remote_execution_pb2.Directory() @@ -827,31 +759,6 @@ class CASCache(ArtifactCache): for dirnode in directory.directories: self._reachable_refs_dir(reachable, dirnode.digest, update_mtime=update_mtime) - def _initialize_remote(self, remote_spec, q): - try: - remote = _CASRemote(remote_spec) - remote.init() - - request = buildstream_pb2.StatusRequest() - for attempt in _retry(): - with attempt: - response = remote.ref_storage.Status(request) - - if remote_spec.push and not response.allow_updates: - q.put('Artifact server does not allow push') - else: - # No error - q.put(None) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - q.put(e.details()) - - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put(str(e)) - def _required_blobs(self, directory_digest): # parse directory, and recursively add blobs d = remote_execution_pb2.Digest() @@ -1091,7 +998,7 @@ class CASCache(ArtifactCache): # Represents a single remote CAS cache. 
# -class _CASRemote(): +class CASRemote(): def __init__(self, spec): self.spec = spec self._initialized = False @@ -1132,7 +1039,7 @@ class _CASRemote(): certificate_chain=client_cert_bytes) self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) else: - raise ArtifactError("Unsupported URL: {}".format(self.spec.url)) + raise CASError("Unsupported URL: {}".format(self.spec.url)) self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel) @@ -1221,10 +1128,10 @@ class _CASBatchRead(): raise BlobNotFound(response.digest.hash, "Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.status.code != grpc.StatusCode.OK.value[0]: - raise ArtifactError("Failed to download blob {}: {}".format( + raise CASError("Failed to download blob {}: {}".format( response.digest.hash, response.status.code)) if response.digest.size_bytes != len(response.data): - raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format( + raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format( response.digest.hash, response.digest.size_bytes, len(response.data))) yield (response.digest, response.data) @@ -1268,7 +1175,7 @@ class _CASBatchUpdate(): for response in batch_response.responses: if response.status.code != grpc.StatusCode.OK.value[0]: - raise ArtifactError("Failed to upload blob {}: {}".format( + raise CASError("Failed to upload blob {}: {}".format( response.digest.hash, response.status.code)) diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py index 4a9d5191a..1fdab80a8 100644 --- a/buildstream/_artifactcache/casserver.py +++ b/buildstream/_artifactcache/casserver.py @@ -34,8 +34,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc -from .._exceptions import ArtifactError -from .._context import Context +from .._exceptions import CASError + +from .cascache import CASCache # The default limit for gRPC messages is 4 MiB. 
@@ -66,29 +67,25 @@ def message_handler(message, context): def create_server(repo, *, enable_push, max_head_size=int(10e9), min_head_size=int(2e9)): - context = Context() - context.artifactdir = os.path.abspath(repo) - context.set_message_handler(message_handler) - - artifactcache = context.artifactcache + cas = CASCache(os.path.abspath(repo)) # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 server = grpc.server(futures.ThreadPoolExecutor(max_workers)) - cache_cleaner = _CacheCleaner(artifactcache, max_head_size, min_head_size) + cache_cleaner = _CacheCleaner(cas, max_head_size, min_head_size) bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(artifactcache, cache_cleaner, enable_push=enable_push), server) + _ByteStreamServicer(cas, cache_cleaner, enable_push=enable_push), server) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(artifactcache, cache_cleaner, enable_push=enable_push), server) + _ContentAddressableStorageServicer(cas, cache_cleaner, enable_push=enable_push), server) remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( _CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server) + _ReferenceStorageServicer(cas, enable_push=enable_push), server) return server @@ -389,7 +386,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): response.digest.hash = tree.hash response.digest.size_bytes = tree.size_bytes - except ArtifactError: + except CASError: context.set_code(grpc.StatusCode.NOT_FOUND) return response diff --git a/buildstream/_context.py b/buildstream/_context.py index f36bfd343..8ee45f787 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -30,7 +30,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError from ._message import Message, MessageType from ._profile import Topics, profile_start, profile_end from ._artifactcache import ArtifactCache, ArtifactCacheUsage -from ._artifactcache.cascache import CASCache from ._workspaces import Workspaces from .plugin import Plugin @@ -246,7 +245,7 @@ class Context(): @property def artifactcache(self): if not self._artifactcache: - self._artifactcache = CASCache(self) + self._artifactcache = ArtifactCache(self) return self._artifactcache diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index aeacf8dcd..44d890ead 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -89,6 +89,7 @@ class ErrorDomain(Enum): ELEMENT = 11 APP = 12 STREAM = 13 + CAS = 15 # BstError is an internal base exception class for BuildSream @@ -275,6 +276,15 @@ class ArtifactError(BstError): super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True) +# CASError +# +# Raised when errors are encountered in the CAS +# +class CASError(BstError): + def __init__(self, message, *, detail=None, reason=None, temporary=False): + super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True) + + # PipelineError # # Raised from pipeline operations diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index c7987e02c..d929a7b91 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -13,7 +13,7 @@ from multiprocessing import Process, Queue from buildstream import _yaml from buildstream._artifactcache.casserver 
import create_server from buildstream._context import Context -from buildstream._exceptions import ArtifactError +from buildstream._exceptions import CASError # ArtifactShare() @@ -52,7 +52,7 @@ class ArtifactShare(): context.artifactdir = self.repodir context.set_message_handler(self._message_handler) - self.cas = context.artifactcache + self.cas = context.artifactcache.cas self.total_space = total_space self.free_space = free_space @@ -144,7 +144,7 @@ class ArtifactShare(): if not os.path.exists(object_name): return False return True - except ArtifactError: + except CASError: return False # close(): -- cgit v1.2.1 From 4df98d3483d4babe809c1cb96cffc946dd1bc768 Mon Sep 17 00:00:00 2001 From: Abderrahim Kitouni Date: Fri, 10 Jul 2020 10:32:32 +0100 Subject: tests/testutils/artifactshare.py: use CASCache directly --- tests/testutils/artifactshare.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index d929a7b91..03186862a 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -12,7 +12,7 @@ from multiprocessing import Process, Queue from buildstream import _yaml from buildstream._artifactcache.casserver import create_server -from buildstream._context import Context +from buildstream._artifactcache.cascache import CASCache from buildstream._exceptions import CASError @@ -48,11 +48,7 @@ class ArtifactShare(): os.makedirs(self.repodir) - context = Context() - context.artifactdir = self.repodir - context.set_message_handler(self._message_handler) - - self.cas = context.artifactcache.cas + self.cas = CASCache(self.repodir) self.total_space = total_space self.free_space = free_space -- cgit v1.2.1
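
The series above is easiest to follow as a before/after of one class relationship: before, CASCache subclassed ArtifactCache; after, ArtifactCache owns a CASCache and translates (element, key) pairs into plain string refs before delegating. The sketch below is illustrative only and is not code from the repository: class and method names follow the diffs, but the ref construction and path handling are simplified stand-ins, and the real methods carry error handling (CASError, ArtifactError) that is omitted here.

import os


class CASCache():
    # Lower layer: a plain CAS repository rooted at 'path'.
    # It never sees Element or Project objects, only string refs.

    def __init__(self, path):
        self.casdir = os.path.join(path, 'cas')
        self.tmpdir = os.path.join(path, 'tmp')

    def _refpath(self, ref):
        # Refs live under refs/heads, mirroring the layout used by list_refs()
        # in the patches; simplified here.
        return os.path.join(self.casdir, 'refs', 'heads', ref)

    def contains(self, ref):
        # Existence of the ref file means the directory is cached locally.
        return os.path.exists(self._refpath(ref))

    def update_mtime(self, ref):
        # os.utime() with no times argument sets both atime and mtime to now;
        # only the mtime matters because list_refs() sorts refs by mtime.
        os.utime(self._refpath(ref))


class ArtifactCache():
    # Upper layer: knows about elements and projects, maps them to refs,
    # and delegates all storage operations to the CASCache it owns.

    def __init__(self, context):
        self.context = context
        self.cas = CASCache(context.artifactdir)   # delegate, not a base class

    def get_artifact_fullname(self, element, key):
        # Hypothetical, simplified ref construction; the real method also
        # normalizes project and element names.
        return '{}/{}/{}'.format(element._get_project().name,
                                 element.normal_name, key)

    def contains(self, element, key):
        ref = self.get_artifact_fullname(element, key)
        return self.cas.contains(ref)

Keeping ref naming in ArtifactCache is what lets the last two patches construct a bare CASCache(path) without a full Context: casserver.py now builds the server around CASCache(os.path.abspath(repo)), and the artifactshare test utility uses CASCache(self.repodir) directly.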