author | Jürg Billeter <j@bitron.ch> | 2018-11-05 17:51:39 +0000
---|---|---
committer | Jürg Billeter <j@bitron.ch> | 2018-11-05 17:51:39 +0000
commit | ec04446b58de7c04e587378c7086cff068fa8025 (patch) |
tree | 5a451d29e9c793ba86ded6a5867c1f3911bfd662 |
parent | be8f0a54764e8190efdf9361f3ca3f7a44dea0e3 (diff) |
parent | 626d20aefb52d25d987c61f377cc1ce3172da8c3 (diff) |
download | buildstream-ec04446b58de7c04e587378c7086cff068fa8025.tar.gz |
Merge branch 'juerg/cas' into 'master'
Split up artifact cache and CAS cache
Closes #659
See merge request BuildStream/buildstream!922
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 346
---|---|---
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 537
-rw-r--r-- | buildstream/_artifactcache/casserver.py | 20
-rw-r--r-- | buildstream/_context.py | 3
-rw-r--r-- | buildstream/_elementfactory.py | 5
-rw-r--r-- | buildstream/_exceptions.py | 10
-rw-r--r-- | buildstream/_loader/loader.py | 2
-rw-r--r-- | buildstream/_pipeline.py | 2
-rw-r--r-- | buildstream/_project.py | 12
-rw-r--r-- | buildstream/element.py | 15
-rw-r--r-- | buildstream/storage/_casbaseddirectory.py | 2
-rw-r--r-- | tests/artifactcache/pull.py | 22
-rw-r--r-- | tests/artifactcache/push.py | 19
-rw-r--r-- | tests/testutils/artifactshare.py | 6
14 files changed, 572 insertions, 429 deletions
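
Before the full diff, the shape of the split in outline. A minimal sketch of the new ownership model (illustrative names and simplified bodies, not the actual BuildStream code): `ArtifactCache` keeps the element/key naming policy and delegates ref-addressed storage to the `CASCache` instance it now owns.

```python
# Sketch only: the delegation pattern from this merge, with method
# bodies elided. The real classes live in buildstream/_artifactcache/.

class CASCache():
    # Knows only about refs, digests and object storage.
    def __init__(self, path):
        self.casdir = path

    def contains(self, ref):
        ...

    def commit(self, refs, path):
        ...


class ArtifactCache():
    # Knows about elements and cache keys; owns a CASCache.
    def __init__(self, artifactdir):
        self.cas = CASCache(artifactdir)

    def get_artifact_fullname(self, element, key):
        # Flatten (element, key) into a single ref string (simplified)
        return '{}/{}'.format(element, key)

    def contains(self, element, key):
        # Translate, then delegate to the CAS layer
        ref = self.get_artifact_fullname(element, key)
        return self.cas.contains(ref)
```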
```diff
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 8ea6c9dc2..06a2b84e0 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -17,17 +17,22 @@
 # Authors:
 #        Tristan Maat <tristan.maat@codethink.co.uk>
 
+import multiprocessing
 import os
+import signal
 import string
 from collections import namedtuple
 from collections.abc import Mapping
 
 from ..types import _KeyStrength
-from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
+from .._exceptions import ArtifactError, CASError, LoadError, LoadErrorReason
 from .._message import Message, MessageType
+from .. import _signals
 from .. import utils
 from .. import _yaml
 
+from .cascache import CASCache, CASRemote
+
 
 CACHE_SIZE_FILE = "cache_size"
 
@@ -93,7 +98,8 @@ class ArtifactCache():
     def __init__(self, context):
         self.context = context
         self.extractdir = os.path.join(context.artifactdir, 'extract')
-        self.tmpdir = os.path.join(context.artifactdir, 'tmp')
+
+        self.cas = CASCache(context.artifactdir)
 
         self.global_remote_specs = []
         self.project_remote_specs = {}
@@ -104,12 +110,15 @@ class ArtifactCache():
         self._cache_lower_threshold = None    # The target cache size for a cleanup
         self._remotes_setup = False           # Check to prevent double-setup of remotes
 
+        # Per-project list of _CASRemote instances.
+        self._remotes = {}
+
+        self._has_fetch_remotes = False
+        self._has_push_remotes = False
+
         os.makedirs(self.extractdir, exist_ok=True)
-        os.makedirs(self.tmpdir, exist_ok=True)
 
-    ################################################
-    #  Methods implemented on the abstract class   #
-    ################################################
+        self._calculate_cache_quota()
 
     # get_artifact_fullname()
     #
@@ -240,8 +249,10 @@ class ArtifactCache():
         for key in (strong_key, weak_key):
             if key:
                 try:
-                    self.update_mtime(element, key)
-                except ArtifactError:
+                    ref = self.get_artifact_fullname(element, key)
+
+                    self.cas.update_mtime(ref)
+                except CASError:
                     pass
 
     # clean():
@@ -252,7 +263,7 @@ class ArtifactCache():
     #    (int): The size of the cache after having cleaned up
     #
     def clean(self):
-        artifacts = self.list_artifacts()  # pylint: disable=assignment-from-no-return
+        artifacts = self.list_artifacts()
 
         # Build a set of the cache keys which are required
         # based on the required elements at cleanup time
@@ -294,7 +305,7 @@ class ArtifactCache():
             if key not in required_artifacts:
 
                 # Remove the actual artifact, if it's not required.
-                size = self.remove(to_remove)  # pylint: disable=assignment-from-no-return
+                size = self.remove(to_remove)
 
                 # Remove the size from the removed size
                 self.set_cache_size(self._cache_size - size)
@@ -311,7 +322,7 @@ class ArtifactCache():
     #    (int): The size of the artifact cache.
     #
     def compute_cache_size(self):
-        self._cache_size = self.calculate_cache_size()  # pylint: disable=assignment-from-no-return
+        self._cache_size = self.cas.calculate_cache_size()
 
         return self._cache_size
 
@@ -380,28 +391,12 @@ class ArtifactCache():
     def has_quota_exceeded(self):
         return self.get_cache_size() > self._cache_quota
 
-    ################################################
-    # Abstract methods for subclasses to implement #
-    ################################################
-
     # preflight():
     #
     # Preflight check.
     #
     def preflight(self):
-        pass
-
-    # update_mtime()
-    #
-    # Update the mtime of an artifact.
-    #
-    # Args:
-    #     element (Element): The Element to update
-    #     key (str): The key of the artifact.
-    #
-    def update_mtime(self, element, key):
-        raise ImplError("Cache '{kind}' does not implement update_mtime()"
-                        .format(kind=type(self).__name__))
+        self.cas.preflight()
 
     # initialize_remotes():
    #
@@ -411,7 +406,59 @@ class ArtifactCache():
     #     on_failure (callable): Called if we fail to contact one of the caches.
     #
     def initialize_remotes(self, *, on_failure=None):
-        pass
+        remote_specs = self.global_remote_specs
+
+        for project in self.project_remote_specs:
+            remote_specs += self.project_remote_specs[project]
+
+        remote_specs = list(utils._deduplicate(remote_specs))
+
+        remotes = {}
+        q = multiprocessing.Queue()
+        for remote_spec in remote_specs:
+            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+            p = multiprocessing.Process(target=self.cas.initialize_remote, args=(remote_spec, q))
+
+            try:
+                # Keep SIGINT blocked in the child process
+                with _signals.blocked([signal.SIGINT], ignore=False):
+                    p.start()
+
+                error = q.get()
+                p.join()
+            except KeyboardInterrupt:
+                utils._kill_process_tree(p.pid)
+                raise
+
+            if error and on_failure:
+                on_failure(remote_spec.url, error)
+            elif error:
+                raise ArtifactError(error)
+            else:
+                self._has_fetch_remotes = True
+                if remote_spec.push:
+                    self._has_push_remotes = True
+
+                remotes[remote_spec.url] = CASRemote(remote_spec)
+
+        for project in self.context.get_projects():
+            remote_specs = self.global_remote_specs
+            if project in self.project_remote_specs:
+                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+
+            project_remotes = []
+
+            for remote_spec in remote_specs:
+                # Errors are already handled in the loop above,
+                # skip unreachable remotes here.
+                if remote_spec.url not in remotes:
+                    continue
+
+                remote = remotes[remote_spec.url]
+                project_remotes.append(remote)
+
+            self._remotes[project] = project_remotes
 
     # contains():
     #
@@ -425,8 +472,9 @@ class ArtifactCache():
     # Returns: True if the artifact is in the cache, False otherwise
     #
     def contains(self, element, key):
-        raise ImplError("Cache '{kind}' does not implement contains()"
-                        .format(kind=type(self).__name__))
+        ref = self.get_artifact_fullname(element, key)
+
+        return self.cas.contains(ref)
 
     # list_artifacts():
     #
@@ -437,8 +485,7 @@ class ArtifactCache():
     #              `ArtifactCache.get_artifact_fullname` in LRU order
     #
     def list_artifacts(self):
-        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
-                        .format(kind=type(self).__name__))
+        return self.cas.list_refs()
 
     # remove():
     #
@@ -450,9 +497,31 @@ class ArtifactCache():
     #                          generated by
     #                          `ArtifactCache.get_artifact_fullname`)
     #
-    def remove(self, artifact_name):
-        raise ImplError("Cache '{kind}' does not implement remove()"
-                        .format(kind=type(self).__name__))
+    # Returns:
+    #    (int|None) The amount of space pruned from the repository in
+    #               Bytes, or None if defer_prune is True
+    #
+    def remove(self, ref):
+
+        # Remove extract if not used by other ref
+        tree = self.cas.resolve_ref(ref)
+        ref_name, ref_hash = os.path.split(ref)
+        extract = os.path.join(self.extractdir, ref_name, tree.hash)
+        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
+        if os.path.exists(keys_file):
+            keys_meta = _yaml.load(keys_file)
+            keys = [keys_meta['strong'], keys_meta['weak']]
+            remove_extract = True
+            for other_hash in keys:
+                if other_hash == ref_hash:
+                    continue
+                remove_extract = False
+                break
+
+            if remove_extract:
+                utils._force_rmtree(extract)
+
+        return self.cas.remove(ref)
 
     # extract():
     #
@@ -472,8 +541,11 @@ class ArtifactCache():
     # Returns: path to extracted artifact
     #
     def extract(self, element, key):
-        raise ImplError("Cache '{kind}' does not implement extract()"
-                        .format(kind=type(self).__name__))
+        ref = self.get_artifact_fullname(element, key)
+
+        path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
+
+        return self.cas.extract(ref, path)
 
     # commit():
     #
@@ -485,8 +557,9 @@ class ArtifactCache():
     #     keys (list): The cache keys to use
     #
     def commit(self, element, content, keys):
-        raise ImplError("Cache '{kind}' does not implement commit()"
-                        .format(kind=type(self).__name__))
+        refs = [self.get_artifact_fullname(element, key) for key in keys]
+
+        self.cas.commit(refs, content)
 
     # diff():
     #
@@ -500,8 +573,10 @@ class ArtifactCache():
     #     subdir (str): A subdirectory to limit the comparison to
     #
     def diff(self, element, key_a, key_b, *, subdir=None):
-        raise ImplError("Cache '{kind}' does not implement diff()"
-                        .format(kind=type(self).__name__))
+        ref_a = self.get_artifact_fullname(element, key_a)
+        ref_b = self.get_artifact_fullname(element, key_b)
+
+        return self.cas.diff(ref_a, ref_b, subdir=subdir)
 
     # has_fetch_remotes():
     #
@@ -513,7 +588,16 @@ class ArtifactCache():
     # Returns: True if any remote repositories are configured, False otherwise
     #
     def has_fetch_remotes(self, *, element=None):
-        return False
+        if not self._has_fetch_remotes:
+            # No project has fetch remotes
+            return False
+        elif element is None:
+            # At least one (sub)project has fetch remotes
+            return True
+        else:
+            # Check whether the specified element's project has fetch remotes
+            remotes_for_project = self._remotes[element._get_project()]
+            return bool(remotes_for_project)
 
     # has_push_remotes():
     #
@@ -525,7 +609,16 @@ class ArtifactCache():
     # Returns: True if any remote repository is configured, False otherwise
     #
     def has_push_remotes(self, *, element=None):
-        return False
+        if not self._has_push_remotes:
+            # No project has push remotes
+            return False
+        elif element is None:
+            # At least one (sub)project has push remotes
+            return True
+        else:
+            # Check whether the specified element's project has push remotes
+            remotes_for_project = self._remotes[element._get_project()]
+            return any(remote.spec.push for remote in remotes_for_project)
 
     # push():
     #
@@ -542,8 +635,28 @@ class ArtifactCache():
     #   (ArtifactError): if there was an error
     #
     def push(self, element, keys):
-        raise ImplError("Cache '{kind}' does not implement push()"
-                        .format(kind=type(self).__name__))
+        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
+
+        project = element._get_project()
+
+        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+
+        pushed = False
+
+        for remote in push_remotes:
+            remote.init()
+            display_key = element._get_brief_display_key()
+            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+
+            if self.cas.push(refs, remote):
+                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+                pushed = True
+            else:
+                element.info("Remote ({}) already has {} cached".format(
+                    remote.spec.url, element._get_brief_display_key()
+                ))
+
+        return pushed
 
     # pull():
     #
@@ -558,8 +671,130 @@ class ArtifactCache():
     #   (bool): True if pull was successful, False if artifact was not available
     #
     def pull(self, element, key, *, progress=None):
-        raise ImplError("Cache '{kind}' does not implement pull()"
-                        .format(kind=type(self).__name__))
+        ref = self.get_artifact_fullname(element, key)
+
+        project = element._get_project()
+
+        for remote in self._remotes[project]:
+            try:
+                display_key = element._get_brief_display_key()
+                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
+
+                if self.cas.pull(ref, remote, progress=progress):
+                    element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
+                    # no need to pull from additional remotes
+                    return True
+                else:
+                    element.info("Remote ({}) does not have {} cached".format(
+                        remote.spec.url, element._get_brief_display_key()
+                    ))
+
+            except CASError as e:
+                raise ArtifactError("Failed to pull artifact {}: {}".format(
+                    element._get_brief_display_key(), e)) from e
+
+        return False
+
+    # pull_tree():
+    #
+    # Pull a single Tree rather than an artifact.
+    # Does not update local refs.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     digest (Digest): The digest of the tree
+    #
+    def pull_tree(self, project, digest):
+        for remote in self._remotes[project]:
+            digest = self.cas.pull_tree(remote, digest)
+
+            if digest:
+                # no need to pull from additional remotes
+                return digest
+
+        return None
+
+    # push_directory():
+    #
+    # Push the given virtual directory to all remotes.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     directory (Directory): A virtual directory object to push.
+    #
+    # Raises:
+    #     (ArtifactError): if there was an error
+    #
+    def push_directory(self, project, directory):
+        if self._has_push_remotes:
+            push_remotes = [r for r in self._remotes[project] if r.spec.push]
+        else:
+            push_remotes = []
+
+        if not push_remotes:
+            raise ArtifactError("push_directory was called, but no remote artifact " +
+                                "servers are configured as push remotes.")
+
+        if directory.ref is None:
+            return
+
+        for remote in push_remotes:
+            self.cas.push_directory(remote, directory)
+
+    # push_message():
+    #
+    # Push the given protobuf message to all remotes.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     message (Message): A protobuf message to push.
+    #
+    # Raises:
+    #     (ArtifactError): if there was an error
+    #
+    def push_message(self, project, message):
+
+        if self._has_push_remotes:
+            push_remotes = [r for r in self._remotes[project] if r.spec.push]
+        else:
+            push_remotes = []
+
+        if not push_remotes:
+            raise ArtifactError("push_message was called, but no remote artifact " +
+                                "servers are configured as push remotes.")
+
+        for remote in push_remotes:
+            message_digest = self.cas.push_message(remote, message)
+
+        return message_digest
+
+    # verify_digest_pushed():
+    #
+    # Check whether the object is already on the server in which case
+    # there is no need to upload it.
+    #
+    # Args:
+    #     project (Project): The current project
+    #     digest (Digest): The object digest.
+    #
+    def verify_digest_pushed(self, project, digest):
+
+        if self._has_push_remotes:
+            push_remotes = [r for r in self._remotes[project] if r.spec.push]
+        else:
+            push_remotes = []
+
+        if not push_remotes:
+            raise ArtifactError("verify_digest_pushed was called, but no remote artifact " +
+                                "servers are configured as push remotes.")
+
+        pushed = False
+
+        for remote in push_remotes:
+            if self.cas.verify_digest_on_remote(remote, digest):
+                pushed = True
+
+        return pushed
 
     # link_key():
     #
@@ -571,19 +806,10 @@ class ArtifactCache():
     #     newkey (str): A new cache key for the artifact
     #
     def link_key(self, element, oldkey, newkey):
-        raise ImplError("Cache '{kind}' does not implement link_key()"
-                        .format(kind=type(self).__name__))
+        oldref = self.get_artifact_fullname(element, oldkey)
+        newref = self.get_artifact_fullname(element, newkey)
 
-    # calculate_cache_size()
-    #
-    # Return the real artifact cache size.
-    #
-    # Returns:
-    #     (int): The size of the artifact cache.
-    #
-    def calculate_cache_size(self):
-        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
-                        .format(kind=type(self).__name__))
+        self.cas.link_ref(oldref, newref)
 
     ################################################
     #             Local Private Methods            #
```
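
The `initialize_remotes()` rewrite above probes each remote in a short-lived child process because gRPC's internal threads do not survive `fork()` in the parent (see the fork_support.md link quoted in the hunk). The pattern in isolation, as a runnable sketch with the actual gRPC `Status()` probe replaced by a placeholder check (the URL-scheme test stands in for real connection logic):

```python
import multiprocessing

def _probe_remote(url, q):
    # Runs in a child process so that any threads created while probing
    # never exist in the parent. A real probe would open a gRPC channel
    # and call the remote's Status() RPC; this placeholder only checks
    # the URL scheme.
    try:
        if url.startswith(('http://', 'https://')):
            q.put(None)            # no error
        else:
            q.put('Unsupported URL: {}'.format(url))
    except Exception as e:         # report anything back to the parent
        q.put(str(e))

def check_remote(url):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_probe_remote, args=(url, q))
    p.start()
    error = q.get()                # blocks until the child reports
    p.join()
    return error

if __name__ == '__main__':
    print(check_remote('https://cache.example.com'))   # -> None
```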
```diff
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index b99f4d7b8..b6e26ec8b 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -20,9 +20,7 @@
 import hashlib
 import itertools
 import io
-import multiprocessing
 import os
-import signal
 import stat
 import tempfile
 import uuid
@@ -31,17 +29,13 @@ from urllib.parse import urlparse
 
 import grpc
 
-from .. import _yaml
-
 from .._protos.google.rpc import code_pb2
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
 
-from .. import _signals, utils
-from .._exceptions import ArtifactError
-
-from . import ArtifactCache
+from .. import utils
+from .._exceptions import CASError
 
 
 # The default limit for gRPC messages is 4 MiB.
@@ -49,62 +43,68 @@ from . import ArtifactCache
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
-# A CASCache manages artifacts in a CAS repository as specified in the
-# Remote Execution API.
+# A CASCache manages a CAS repository as specified in the Remote Execution API.
 #
 # Args:
-#     context (Context): The BuildStream context
-#
-# Pushing is explicitly disabled by the platform in some cases,
-# like when we are falling back to functioning without using
-# user namespaces.
+#     path (str): The root directory for the CAS repository
 #
-class CASCache(ArtifactCache):
+class CASCache():
 
-    def __init__(self, context):
-        super().__init__(context)
-
-        self.casdir = os.path.join(context.artifactdir, 'cas')
+    def __init__(self, path):
+        self.casdir = os.path.join(path, 'cas')
+        self.tmpdir = os.path.join(path, 'tmp')
         os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
         os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
+        os.makedirs(self.tmpdir, exist_ok=True)
 
-        self._calculate_cache_quota()
-
-        # Per-project list of _CASRemote instances.
-        self._remotes = {}
-
-        self._has_fetch_remotes = False
-        self._has_push_remotes = False
-
-    ################################################
-    #     Implementation of abstract methods       #
-    ################################################
-
+    # preflight():
+    #
+    # Preflight check.
+    #
     def preflight(self):
         headdir = os.path.join(self.casdir, 'refs', 'heads')
         objdir = os.path.join(self.casdir, 'objects')
         if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
-            raise ArtifactError("CAS repository check failed for '{}'"
-                                .format(self.casdir))
+            raise CASError("CAS repository check failed for '{}'".format(self.casdir))
 
-    def contains(self, element, key):
-        refpath = self._refpath(self.get_artifact_fullname(element, key))
+    # contains():
+    #
+    # Check whether the specified ref is already available in the local CAS cache.
+    #
+    # Args:
+    #     ref (str): The ref to check
+    #
+    # Returns: True if the ref is in the cache, False otherwise
+    #
+    def contains(self, ref):
+        refpath = self._refpath(ref)
 
         # This assumes that the repository doesn't have any dangling pointers
         return os.path.exists(refpath)
 
-    def extract(self, element, key):
-        ref = self.get_artifact_fullname(element, key)
-
+    # extract():
+    #
+    # Extract cached directory for the specified ref if it hasn't
+    # already been extracted.
+    #
+    # Args:
+    #     ref (str): The ref whose directory to extract
+    #     path (str): The destination path
+    #
+    # Raises:
+    #     CASError: In cases there was an OSError, or if the ref did not exist.
+    #
+    # Returns: path to extracted directory
+    #
+    def extract(self, ref, path):
         tree = self.resolve_ref(ref, update_mtime=True)
 
-        dest = os.path.join(self.extractdir, element._get_project().name,
-                            element.normal_name, tree.hash)
+        dest = os.path.join(path, tree.hash)
         if os.path.isdir(dest):
-            # artifact has already been extracted
+            # directory has already been extracted
            return dest
 
-        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir:
+        with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
             checkoutdir = os.path.join(tmpdir, ref)
             self._checkout(checkoutdir, tree)
 
@@ -118,23 +118,35 @@ class CASCache(ArtifactCache):
             # If rename fails with these errors, another process beat
             # us to it so just ignore.
             if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
-                raise ArtifactError("Failed to extract artifact for ref '{}': {}"
-                                    .format(ref, e)) from e
+                raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
 
         return dest
 
-    def commit(self, element, content, keys):
-        refs = [self.get_artifact_fullname(element, key) for key in keys]
-
-        tree = self._commit_directory(content)
+    # commit():
+    #
+    # Commit directory to cache.
+    #
+    # Args:
+    #     refs (list): The refs to set
+    #     path (str): The directory to import
+    #
+    def commit(self, refs, path):
+        tree = self._commit_directory(path)
 
         for ref in refs:
             self.set_ref(ref, tree)
 
-    def diff(self, element, key_a, key_b, *, subdir=None):
-        ref_a = self.get_artifact_fullname(element, key_a)
-        ref_b = self.get_artifact_fullname(element, key_b)
-
+    # diff():
+    #
+    # Return a list of files that have been added or modified between
+    # the refs described by ref_a and ref_b.
+    #
+    # Args:
+    #     ref_a (str): The first ref
+    #     ref_b (str): The second ref
+    #     subdir (str): A subdirectory to limit the comparison to
+    #
+    def diff(self, ref_a, ref_b, *, subdir=None):
         tree_a = self.resolve_ref(ref_a)
         tree_b = self.resolve_ref(ref_b)
 
@@ -150,158 +162,122 @@ class CASCache(ArtifactCache):
 
         return modified, removed, added
 
-    def initialize_remotes(self, *, on_failure=None):
-        remote_specs = self.global_remote_specs
-
-        for project in self.project_remote_specs:
-            remote_specs += self.project_remote_specs[project]
-
-        remote_specs = list(utils._deduplicate(remote_specs))
-
-        remotes = {}
-        q = multiprocessing.Queue()
-        for remote_spec in remote_specs:
-            # Use subprocess to avoid creation of gRPC threads in main BuildStream process
-            # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
-            p = multiprocessing.Process(target=self._initialize_remote, args=(remote_spec, q))
+    def initialize_remote(self, remote_spec, q):
+        try:
+            remote = CASRemote(remote_spec)
+            remote.init()
 
-            try:
-                # Keep SIGINT blocked in the child process
-                with _signals.blocked([signal.SIGINT], ignore=False):
-                    p.start()
-
-                error = q.get()
-                p.join()
-            except KeyboardInterrupt:
-                utils._kill_process_tree(p.pid)
-                raise
+            request = buildstream_pb2.StatusRequest()
+            response = remote.ref_storage.Status(request)
 
-            if error and on_failure:
-                on_failure(remote_spec.url, error)
-            elif error:
-                raise ArtifactError(error)
+            if remote_spec.push and not response.allow_updates:
+                q.put('CAS server does not allow push')
             else:
-                self._has_fetch_remotes = True
-                if remote_spec.push:
-                    self._has_push_remotes = True
+                # No error
+                q.put(None)
 
-                remotes[remote_spec.url] = _CASRemote(remote_spec)
+        except grpc.RpcError as e:
+            # str(e) is too verbose for errors reported to the user
+            q.put(e.details())
 
-        for project in self.context.get_projects():
-            remote_specs = self.global_remote_specs
-            if project in self.project_remote_specs:
-                remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project]))
+        except Exception as e:               # pylint: disable=broad-except
+            # Whatever happens, we need to return it to the calling process
+            #
+            q.put(str(e))
 
-            project_remotes = []
+    # pull():
+    #
+    # Pull a ref from a remote repository.
+    #
+    # Args:
+    #     ref (str): The ref to pull
+    #     remote (CASRemote): The remote repository to pull from
+    #     progress (callable): The progress callback, if any
+    #
+    # Returns:
+    #   (bool): True if pull was successful, False if ref was not available
+    #
+    def pull(self, ref, remote, *, progress=None):
+        try:
+            remote.init()
 
-            for remote_spec in remote_specs:
-                # Errors are already handled in the loop above,
-                # skip unreachable remotes here.
-                if remote_spec.url not in remotes:
-                    continue
+            request = buildstream_pb2.GetReferenceRequest()
+            request.key = ref
+            response = remote.ref_storage.GetReference(request)
 
-                remote = remotes[remote_spec.url]
-                project_remotes.append(remote)
+            tree = remote_execution_pb2.Digest()
+            tree.hash = response.digest.hash
+            tree.size_bytes = response.digest.size_bytes
 
-            self._remotes[project] = project_remotes
+            self._fetch_directory(remote, tree)
 
-    def has_fetch_remotes(self, *, element=None):
-        if not self._has_fetch_remotes:
-            # No project has fetch remotes
-            return False
-        elif element is None:
-            # At least one (sub)project has fetch remotes
-            return True
-        else:
-            # Check whether the specified element's project has fetch remotes
-            remotes_for_project = self._remotes[element._get_project()]
-            return bool(remotes_for_project)
+            self.set_ref(ref, tree)
 
-    def has_push_remotes(self, *, element=None):
-        if not self._has_push_remotes:
-            # No project has push remotes
-            return False
-        elif element is None:
-            # At least one (sub)project has push remotes
             return True
-        else:
-            # Check whether the specified element's project has push remotes
-            remotes_for_project = self._remotes[element._get_project()]
-            return any(remote.spec.push for remote in remotes_for_project)
 
-    def pull(self, element, key, *, progress=None):
-        ref = self.get_artifact_fullname(element, key)
-
-        project = element._get_project()
-
-        for remote in self._remotes[project]:
-            try:
-                remote.init()
-                display_key = element._get_brief_display_key()
-                element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
-
-                request = buildstream_pb2.GetReferenceRequest()
-                request.key = ref
-                response = remote.ref_storage.GetReference(request)
-
-                tree = remote_execution_pb2.Digest()
-                tree.hash = response.digest.hash
-                tree.size_bytes = response.digest.size_bytes
-
-                self._fetch_directory(remote, tree)
-
-                self.set_ref(ref, tree)
-
-                element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
-                # no need to pull from additional remotes
-                return True
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.NOT_FOUND:
-                    raise ArtifactError("Failed to pull artifact {}: {}".format(
-                        element._get_brief_display_key(), e)) from e
-                else:
-                    element.info("Remote ({}) does not have {} cached".format(
-                        remote.spec.url, element._get_brief_display_key()
-                    ))
-
-        return False
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise CASError("Failed to pull ref {}: {}".format(ref, e)) from e
+            else:
+                return False
 
-    def pull_tree(self, project, digest):
-        """ Pull a single Tree rather than an artifact.
-        Does not update local refs. """
+    # pull_tree():
+    #
+    # Pull a single Tree rather than a ref.
+    # Does not update local refs.
+    #
+    # Args:
+    #     remote (CASRemote): The remote to pull from
+    #     digest (Digest): The digest of the tree
+    #
+    def pull_tree(self, remote, digest):
+        try:
+            remote.init()
 
-        for remote in self._remotes[project]:
-            try:
-                remote.init()
+            digest = self._fetch_tree(remote, digest)
 
-                digest = self._fetch_tree(remote, digest)
+            return digest
 
-                # no need to pull from additional remotes
-                return digest
-
-            except grpc.RpcError as e:
-                if e.code() != grpc.StatusCode.NOT_FOUND:
-                    raise
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise
 
         return None
 
-    def link_key(self, element, oldkey, newkey):
-        oldref = self.get_artifact_fullname(element, oldkey)
-        newref = self.get_artifact_fullname(element, newkey)
-
+    # link_ref():
+    #
+    # Add an alias for an existing ref.
+    #
+    # Args:
+    #     oldref (str): An existing ref
+    #     newref (str): A new ref for the same directory
+    #
+    def link_ref(self, oldref, newref):
         tree = self.resolve_ref(oldref)
 
         self.set_ref(newref, tree)
 
-    def _push_refs_to_remote(self, refs, remote):
+    # push():
+    #
+    # Push committed refs to remote repository.
+    #
+    # Args:
+    #     refs (list): The refs to push
+    #     remote (CASRemote): The remote to push to
+    #
+    # Returns:
+    #   (bool): True if any remote was updated, False if no pushes were required
+    #
+    # Raises:
+    #   (CASError): if there was an error
+    #
+    def push(self, refs, remote):
         skipped_remote = True
         try:
             for ref in refs:
                 tree = self.resolve_ref(ref)
 
                 # Check whether ref is already on the server in which case
-                # there is no need to push the artifact
+                # there is no need to push the ref
                 try:
                     request = buildstream_pb2.GetReferenceRequest()
                     request.key = ref
@@ -327,65 +303,38 @@ class CASCache(ArtifactCache):
                 skipped_remote = False
         except grpc.RpcError as e:
             if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
-                raise ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e
+                raise CASError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
 
         return not skipped_remote
 
-    def push(self, element, keys):
-
-        refs = [self.get_artifact_fullname(element, key) for key in list(keys)]
-
-        project = element._get_project()
-
-        push_remotes = [r for r in self._remotes[project] if r.spec.push]
-
-        pushed = False
-
-        for remote in push_remotes:
-            remote.init()
-            display_key = element._get_brief_display_key()
-            element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
-
-            if self._push_refs_to_remote(refs, remote):
-                element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
-                pushed = True
-            else:
-                element.info("Remote ({}) already has {} cached".format(
-                    remote.spec.url, element._get_brief_display_key()
-                ))
-
-        return pushed
-
-    def push_directory(self, project, directory):
-        """ Push the given virtual directory to all remotes.
-
-        Args:
-            project (Project): The current project
-            directory (Directory): A virtual directory object to push.
-
-        Raises: ArtifactError if no push remotes are configured.
-        """
-
-        if self._has_push_remotes:
-            push_remotes = [r for r in self._remotes[project] if r.spec.push]
-        else:
-            push_remotes = []
-
-        if not push_remotes:
-            raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
-                                "servers are configured as push remotes.")
-
-        if directory.ref is None:
-            return
-
-        for remote in push_remotes:
-            remote.init()
-
-            self._send_directory(remote, directory.ref)
+    # push_directory():
+    #
+    # Push the given virtual directory to a remote.
+    #
+    # Args:
+    #     remote (CASRemote): The remote to push to
+    #     directory (Directory): A virtual directory object to push.
+    #
+    # Raises:
+    #     (CASError): if there was an error
+    #
+    def push_directory(self, remote, directory):
+        remote.init()
 
-    def push_message(self, project, message):
+        self._send_directory(remote, directory.ref)
 
-        push_remotes = [r for r in self._remotes[project] if r.spec.push]
+    # push_message():
+    #
+    # Push the given protobuf message to a remote.
+    #
+    # Args:
+    #     remote (CASRemote): The remote to push to
+    #     message (Message): A protobuf message to push.
+    #
+    # Raises:
+    #     (CASError): if there was an error
+    #
+    def push_message(self, remote, message):
 
         message_buffer = message.SerializeToString()
         message_sha = hashlib.sha256(message_buffer)
@@ -393,17 +342,25 @@ class CASCache(ArtifactCache):
         message_digest.hash = message_sha.hexdigest()
         message_digest.size_bytes = len(message_buffer)
 
-        for remote in push_remotes:
-            remote.init()
+        remote.init()
 
-            with io.BytesIO(message_buffer) as b:
-                self._send_blob(remote, message_digest, b)
+        with io.BytesIO(message_buffer) as b:
+            self._send_blob(remote, message_digest, b)
 
         return message_digest
 
-    def _verify_digest_on_remote(self, remote, digest):
-        # Check whether ref is already on the server in which case
-        # there is no need to push the artifact
+    # verify_digest_on_remote():
+    #
+    # Check whether the object is already on the server in which case
+    # there is no need to upload it.
+    #
+    # Args:
+    #     remote (CASRemote): The remote to check
+    #     digest (Digest): The object digest.
+    #
+    def verify_digest_on_remote(self, remote, digest):
+        remote.init()
+
         request = remote_execution_pb2.FindMissingBlobsRequest()
         request.blob_digests.extend([digest])
 
@@ -413,24 +370,6 @@ class CASCache(ArtifactCache):
 
         return True
 
-    def verify_digest_pushed(self, project, digest):
-
-        push_remotes = [r for r in self._remotes[project] if r.spec.push]
-
-        pushed = False
-
-        for remote in push_remotes:
-            remote.init()
-
-            if self._verify_digest_on_remote(remote, digest):
-                pushed = True
-
-        return pushed
-
-    ################################################
-    #             API Private Methods              #
-    ################################################
-
     # objpath():
     #
     # Return the path of an object based on its digest.
@@ -496,7 +435,7 @@ class CASCache(ArtifactCache):
             pass
 
         except OSError as e:
-            raise ArtifactError("Failed to hash object: {}".format(e)) from e
+            raise CASError("Failed to hash object: {}".format(e)) from e
 
         return digest
 
@@ -537,26 +476,39 @@ class CASCache(ArtifactCache):
             return digest
 
         except FileNotFoundError as e:
-            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
 
-    def update_mtime(self, element, key):
+    # update_mtime()
+    #
+    # Update the mtime of a ref.
+    #
+    # Args:
+    #     ref (str): The ref to update
+    #
+    def update_mtime(self, ref):
         try:
-            ref = self.get_artifact_fullname(element, key)
             os.utime(self._refpath(ref))
         except FileNotFoundError as e:
-            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+            raise CASError("Attempt to access unavailable ref: {}".format(e)) from e
 
+    # calculate_cache_size()
+    #
+    # Return the real disk usage of the CAS cache.
+    #
+    # Returns:
+    #     (int): The size of the cache.
+    #
     def calculate_cache_size(self):
         return utils._get_dir_size(self.casdir)
 
-    # list_artifacts():
+    # list_refs():
     #
-    # List cached artifacts in Least Recently Modified (LRM) order.
+    # List refs in Least Recently Modified (LRM) order.
     #
     # Returns:
     #     (list) - A list of refs in LRM order
     #
-    def list_artifacts(self):
+    def list_refs(self):
         # string of: /path/to/repo/refs/heads
         ref_heads = os.path.join(self.casdir, 'refs', 'heads')
 
@@ -571,7 +523,7 @@ class CASCache(ArtifactCache):
             mtimes.append(os.path.getmtime(ref_path))
 
         # NOTE: Sorted will sort from earliest to latest, thus the
-        # first element of this list will be the file modified earliest.
+        # first ref of this list will be the file modified earliest.
         return [ref for _, ref in sorted(zip(mtimes, refs))]
 
     # remove():
@@ -590,28 +542,10 @@ class CASCache(ArtifactCache):
     #
     def remove(self, ref, *, defer_prune=False):
 
-        # Remove extract if not used by other ref
-        tree = self.resolve_ref(ref)
-        ref_name, ref_hash = os.path.split(ref)
-        extract = os.path.join(self.extractdir, ref_name, tree.hash)
-        keys_file = os.path.join(extract, 'meta', 'keys.yaml')
-        if os.path.exists(keys_file):
-            keys_meta = _yaml.load(keys_file)
-            keys = [keys_meta['strong'], keys_meta['weak']]
-            remove_extract = True
-            for other_hash in keys:
-                if other_hash == ref_hash:
-                    continue
-                remove_extract = False
-                break
-
-            if remove_extract:
-                utils._force_rmtree(extract)
-
         # Remove cache ref
         refpath = self._refpath(ref)
         if not os.path.exists(refpath):
-            raise ArtifactError("Could not find artifact for ref '{}'".format(ref))
+            raise CASError("Could not find ref '{}'".format(ref))
 
         os.unlink(refpath)
 
@@ -721,7 +655,7 @@ class CASCache(ArtifactCache):
                 # The process serving the socket can't be cached anyway
                 pass
             else:
-                raise ArtifactError("Unsupported file type for {}".format(full_path))
+                raise CASError("Unsupported file type for {}".format(full_path))
 
         return self.add_object(digest=dir_digest,
                                buffer=directory.SerializeToString())
@@ -740,7 +674,7 @@ class CASCache(ArtifactCache):
             if dirnode.name == name:
                 return dirnode.digest
 
-        raise ArtifactError("Subdirectory {} not found".format(name))
+        raise CASError("Subdirectory {} not found".format(name))
 
     def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
         dir_a = remote_execution_pb2.Directory()
@@ -812,29 +746,6 @@ class CASCache(ArtifactCache):
         for dirnode in directory.directories:
             self._reachable_refs_dir(reachable, dirnode.digest)
 
-    def _initialize_remote(self, remote_spec, q):
-        try:
-            remote = _CASRemote(remote_spec)
-            remote.init()
-
-            request = buildstream_pb2.StatusRequest()
-            response = remote.ref_storage.Status(request)
-
-            if remote_spec.push and not response.allow_updates:
-                q.put('Artifact server does not allow push')
-            else:
-                # No error
-                q.put(None)
-
-        except grpc.RpcError as e:
-            # str(e) is too verbose for errors reported to the user
-            q.put(e.details())
-
-        except Exception as e:               # pylint: disable=broad-except
-            # Whatever happens, we need to return it to the calling process
-            #
-            q.put(str(e))
-
     def _required_blobs(self, directory_digest):
         # parse directory, and recursively add blobs
         d = remote_execution_pb2.Digest()
@@ -1080,7 +991,7 @@ class CASCache(ArtifactCache):
 
 # Represents a single remote CAS cache.
 #
-class _CASRemote():
+class CASRemote():
     def __init__(self, spec):
         self.spec = spec
         self._initialized = False
@@ -1125,7 +1036,7 @@ class _CASRemote():
                                                            certificate_chain=client_cert_bytes)
             self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
         else:
-            raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
+            raise CASError("Unsupported URL: {}".format(self.spec.url))
 
         self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
         self.cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
@@ -1203,10 +1114,10 @@ class _CASBatchRead():
 
         for response in batch_response.responses:
             if response.status.code != code_pb2.OK:
-                raise ArtifactError("Failed to download blob {}: {}".format(
+                raise CASError("Failed to download blob {}: {}".format(
                     response.digest.hash, response.status.code))
             if response.digest.size_bytes != len(response.data):
-                raise ArtifactError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
+                raise CASError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
                     response.digest.hash, response.digest.size_bytes, len(response.data)))
 
             yield (response.digest, response.data)
@@ -1248,7 +1159,7 @@ class _CASBatchUpdate():
 
         for response in batch_response.responses:
             if response.status.code != code_pb2.OK:
-                raise ArtifactError("Failed to upload blob {}: {}".format(
+                raise CASError("Failed to upload blob {}: {}".format(
                     response.digest.hash, response.status.code))
```
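
With the storage layer split out, the CAS can now be driven directly with plain ref strings. A hypothetical usage sketch of the `CASCache` API introduced here (the cache root, ref and content path are made up; this assumes a BuildStream source tree on the import path and an existing directory to commit):

```python
# Hypothetical usage; paths and the ref are illustrative.
from buildstream._artifactcache.cascache import CASCache

cas = CASCache('/tmp/bst-cache')   # creates cas/ and tmp/ underneath
cas.preflight()                    # sanity-check the repository layout

ref = 'myproject/hello/0123abcd'
if not cas.contains(ref):
    # Import a directory tree into CAS and point the ref at its digest
    cas.commit([ref], '/tmp/content-to-cache')

# Extracts to <path>/<tree hash> and returns that location
checkout = cas.extract(ref, '/tmp/extract/myproject/hello')
```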
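`push_message()` above addresses a serialized protobuf the same way any CAS blob is addressed: the SHA-256 of the wire bytes paired with their byte count. That calculation stands on its own; in this sketch `Digest` is a stand-in namedtuple rather than the real `remote_execution_pb2.Digest` message:

```python
import hashlib
from collections import namedtuple

# Stand-in for remote_execution_pb2.Digest
Digest = namedtuple('Digest', ['hash', 'size_bytes'])

def message_digest(message_buffer):
    # A CAS digest is the sha256 hex of the bytes plus their length
    sha = hashlib.sha256(message_buffer)
    return Digest(hash=sha.hexdigest(), size_bytes=len(message_buffer))

print(message_digest(b'\x0a\x05hello'))
```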
```diff
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 31b05ce0f..ee84c4943 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -32,8 +32,9 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
 
-from .._exceptions import ArtifactError
-from .._context import Context
+from .._exceptions import CASError
+
+from .cascache import CASCache
 
 
 # The default limit for gRPC messages is 4 MiB.
@@ -55,26 +56,23 @@ class ArtifactTooLargeException(Exception):
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #
 def create_server(repo, *, enable_push):
-    context = Context()
-    context.artifactdir = os.path.abspath(repo)
-
-    artifactcache = context.artifactcache
+    cas = CASCache(os.path.abspath(repo))
 
     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
     server = grpc.server(futures.ThreadPoolExecutor(max_workers))
 
     bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
-        _ByteStreamServicer(artifactcache, enable_push=enable_push), server)
+        _ByteStreamServicer(cas, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
-        _ContentAddressableStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
 
     remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
         _CapabilitiesServicer(), server)
 
     buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
-        _ReferenceStorageServicer(artifactcache, enable_push=enable_push), server)
+        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
 
     return server
 
@@ -333,7 +331,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
             response.digest.hash = tree.hash
             response.digest.size_bytes = tree.size_bytes
 
-        except ArtifactError:
+        except CASError:
             context.set_code(grpc.StatusCode.NOT_FOUND)
 
         return response
 
@@ -437,7 +435,7 @@ def _clean_up_cache(cas, object_size):
         return 0
 
     # obtain a list of LRP artifacts
-    LRP_artifacts = cas.list_artifacts()
+    LRP_artifacts = cas.list_refs()
 
     removed_size = 0  # in bytes
     while object_size - removed_size > free_disk_space:
```

```diff
diff --git a/buildstream/_context.py b/buildstream/_context.py
index e3c290b7b..876b74712 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -31,7 +31,6 @@ from ._exceptions import LoadError, LoadErrorReason, BstError
 from ._message import Message, MessageType
 from ._profile import Topics, profile_start, profile_end
 from ._artifactcache import ArtifactCache
-from ._artifactcache.cascache import CASCache
 from ._workspaces import Workspaces
 from .plugin import _plugin_lookup
 
@@ -233,7 +232,7 @@ class Context():
     @property
     def artifactcache(self):
         if not self._artifactcache:
-            self._artifactcache = CASCache(self)
+            self._artifactcache = ArtifactCache(self)
 
         return self._artifactcache
```

```diff
diff --git a/buildstream/_elementfactory.py b/buildstream/_elementfactory.py
index 00847e66f..d6591bf4c 100644
--- a/buildstream/_elementfactory.py
+++ b/buildstream/_elementfactory.py
@@ -47,7 +47,6 @@ class ElementFactory(PluginContext):
     # Args:
     #    context (object): The Context object for processing
     #    project (object): The project object
-    #    artifacts (ArtifactCache): The artifact cache
     #    meta (object): The loaded MetaElement
     #
     # Returns: A newly created Element object of the appropriate kind
@@ -56,9 +55,9 @@ class ElementFactory(PluginContext):
     #    PluginError (if the kind lookup failed)
     #    LoadError (if the element itself took issue with the config)
     #
-    def create(self, context, project, artifacts, meta):
+    def create(self, context, project, meta):
         element_type, default_config = self.lookup(meta.kind)
-        element = element_type(context, project, artifacts, meta, default_config)
+        element = element_type(context, project, meta, default_config)
         version = self._format_versions.get(meta.kind, 0)
         self._assert_plugin_format(element, version)
         return element
```

```diff
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index a1c26d38c..ba0b9fabb 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -90,6 +90,7 @@ class ErrorDomain(Enum):
     APP = 12
     STREAM = 13
     VIRTUAL_FS = 14
+    CAS = 15
 
 
 # BstError is an internal base exception class for BuildSream
@@ -274,6 +275,15 @@ class ArtifactError(BstError):
         super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason, temporary=True)
 
 
+# CASError
+#
+# Raised when errors are encountered in the CAS
+#
+class CASError(BstError):
+    def __init__(self, message, *, detail=None, reason=None, temporary=False):
+        super().__init__(message, detail=detail, domain=ErrorDomain.CAS, reason=reason, temporary=True)
+
+
 # PipelineError
 #
 # Raised from pipeline operations
```

```diff
diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py
index 8a81a71c1..22600b9e9 100644
--- a/buildstream/_loader/loader.py
+++ b/buildstream/_loader/loader.py
@@ -537,7 +537,7 @@ class Loader():
             raise LoadError(LoadErrorReason.INVALID_DATA,
                             "{}: Expected junction but element kind is {}".format(filename, meta_element.kind))
 
-        element = Element._new_from_meta(meta_element, self._context.artifactcache)
+        element = Element._new_from_meta(meta_element)
         element._preflight()
 
         sources = list(element.sources())
```

```diff
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 1f75b2e9e..3baeaa107 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -106,7 +106,7 @@ class Pipeline():
 
         profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
 
-        elements = self._project.load_elements(targets, self._artifacts,
+        elements = self._project.load_elements(targets,
                                                rewritable=rewritable,
                                                fetch_subprojects=fetch_subprojects)
```

```diff
diff --git a/buildstream/_project.py b/buildstream/_project.py
index 83aa1f47e..5280cf667 100644
--- a/buildstream/_project.py
+++ b/buildstream/_project.py
@@ -224,18 +224,17 @@ class Project():
     # Instantiate and return an element
     #
     # Args:
-    #    artifacts (ArtifactCache): The artifact cache
     #    meta (MetaElement): The loaded MetaElement
     #    first_pass (bool): Whether to use first pass configuration (for junctions)
     #
     # Returns:
     #    (Element): A newly created Element object of the appropriate kind
     #
-    def create_element(self, artifacts, meta, *, first_pass=False):
+    def create_element(self, meta, *, first_pass=False):
         if first_pass:
-            return self.first_pass_config.element_factory.create(self._context, self, artifacts, meta)
+            return self.first_pass_config.element_factory.create(self._context, self, meta)
         else:
-            return self.config.element_factory.create(self._context, self, artifacts, meta)
+            return self.config.element_factory.create(self._context, self, meta)
 
     # create_source()
     #
@@ -305,7 +304,6 @@ class Project():
     #
     # Args:
     #    targets (list): Target names
-    #    artifacts (ArtifactCache): Artifact cache
     #    rewritable (bool): Whether the loaded files should be rewritable
     #                       this is a bit more expensive due to deep copies
     #    fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
@@ -314,7 +312,7 @@ class Project():
     # Returns:
     #    (list): A list of loaded Element
     #
-    def load_elements(self, targets, artifacts, *,
+    def load_elements(self, targets, *,
                       rewritable=False, fetch_subprojects=False):
         with self._context.timed_activity("Loading elements", silent_nested=True):
             meta_elements = self.loader.load(targets, rewritable=rewritable,
@@ -323,7 +321,7 @@ class Project():
 
         with self._context.timed_activity("Resolving elements"):
             elements = [
-                Element._new_from_meta(meta, artifacts)
+                Element._new_from_meta(meta)
                 for meta in meta_elements
             ]
```

```diff
diff --git a/buildstream/element.py b/buildstream/element.py
index 5de0253c5..8ec06090c 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -174,7 +174,7 @@ class Element(Plugin):
     *Since: 1.4*
     """
 
-    def __init__(self, context, project, artifacts, meta, plugin_conf):
+    def __init__(self, context, project, meta, plugin_conf):
 
         self.__cache_key_dict = None            # Dict for cache key calculation
         self.__cache_key = None                 # Our cached cache key
@@ -199,7 +199,7 @@ class Element(Plugin):
         self.__sources = []                     # List of Sources
         self.__weak_cache_key = None            # Our cached weak cache key
         self.__strict_cache_key = None          # Our cached cache key for strict builds
-        self.__artifacts = artifacts            # Artifact cache
+        self.__artifacts = context.artifactcache  # Artifact cache
         self.__consistency = Consistency.INCONSISTENT  # Cached overall consistency state
         self.__strong_cached = None             # Whether we have a cached artifact
         self.__weak_cached = None               # Whether we have a cached artifact
@@ -872,14 +872,13 @@ class Element(Plugin):
     # and its dependencies from a meta element.
     #
     # Args:
-    #    artifacts (ArtifactCache): The artifact cache
     #    meta (MetaElement): The meta element
     #
     # Returns:
     #    (Element): A newly created Element instance
     #
     @classmethod
-    def _new_from_meta(cls, meta, artifacts):
+    def _new_from_meta(cls, meta):
 
         if not meta.first_pass:
             meta.project.ensure_fully_loaded()
@@ -887,7 +886,7 @@ class Element(Plugin):
         if meta in cls.__instantiated_elements:
             return cls.__instantiated_elements[meta]
 
-        element = meta.project.create_element(artifacts, meta, first_pass=meta.first_pass)
+        element = meta.project.create_element(meta, first_pass=meta.first_pass)
         cls.__instantiated_elements[meta] = element
 
         # Instantiate sources
@@ -904,10 +903,10 @@ class Element(Plugin):
 
         # Instantiate dependencies
         for meta_dep in meta.dependencies:
-            dependency = Element._new_from_meta(meta_dep, artifacts)
+            dependency = Element._new_from_meta(meta_dep)
             element.__runtime_dependencies.append(dependency)
         for meta_dep in meta.build_dependencies:
-            dependency = Element._new_from_meta(meta_dep, artifacts)
+            dependency = Element._new_from_meta(meta_dep)
             element.__build_dependencies.append(dependency)
 
         return element
@@ -2057,7 +2056,7 @@ class Element(Plugin):
             'sources': [s._get_unique_key(workspace is None) for s in self.__sources],
             'workspace': '' if workspace is None else workspace.get_key(self._get_project()),
             'public': self.__public,
-            'cache': type(self.__artifacts).__name__
+            'cache': 'CASCache'
         }
 
         self.__cache_key_dict['fatal-warnings'] = sorted(project._fatal_warnings)
```

```diff
diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py
index 07fd206ed..fa5ec823b 100644
--- a/buildstream/storage/_casbaseddirectory.py
+++ b/buildstream/storage/_casbaseddirectory.py
@@ -79,7 +79,7 @@ class CasBasedDirectory(Directory):
         self.filename = filename
         self.common_name = common_name
         self.pb2_directory = remote_execution_pb2.Directory()
-        self.cas_cache = context.artifactcache
+        self.cas_cache = context.artifactcache.cas
         if ref:
             with open(self.cas_cache.objpath(ref), 'rb') as f:
                 self.pb2_directory.ParseFromString(f.read())
```

```diff
diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index e76dc5ca7..dde451a8c 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -90,7 +90,7 @@ def test_pull(cli, tmpdir, datafiles):
         cas = context.artifactcache
 
         # Assert that the element's artifact is **not** cached
-        element = project.load_elements(['target.bst'], cas)[0]
+        element = project.load_elements(['target.bst'])[0]
         element_key = cli.get_element_key(project_dir, 'target.bst')
         assert not cas.contains(element, element_key)
 
@@ -132,7 +132,7 @@ def _test_pull(user_config_file, project_dir, artifact_dir,
         cas = context.artifactcache
 
         # Load the target element
-        element = project.load_elements([element_name], cas)[0]
+        element = project.load_elements([element_name])[0]
 
         # Manually setup the CAS remote
         cas.setup_remotes(use_config=True)
@@ -190,15 +190,16 @@ def test_pull_tree(cli, tmpdir, datafiles):
         # Load the project and CAS cache
         project = Project(project_dir, context)
         project.ensure_fully_loaded()
-        cas = context.artifactcache
+        artifactcache = context.artifactcache
+        cas = artifactcache.cas
 
         # Assert that the element's artifact is cached
-        element = project.load_elements(['target.bst'], cas)[0]
+        element = project.load_elements(['target.bst'])[0]
         element_key = cli.get_element_key(project_dir, 'target.bst')
-        assert cas.contains(element, element_key)
+        assert artifactcache.contains(element, element_key)
 
         # Retrieve the Directory object from the cached artifact
-        artifact_ref = cas.get_artifact_fullname(element, element_key)
+        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
         artifact_digest = cas.resolve_ref(artifact_ref)
 
         queue = multiprocessing.Queue()
@@ -268,12 +269,13 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
         project.ensure_fully_loaded()
 
         # Create a local CAS cache handle
-        cas = context.artifactcache
+        artifactcache = context.artifactcache
+        cas = artifactcache.cas
 
         # Manually setup the CAS remote
-        cas.setup_remotes(use_config=True)
+        artifactcache.setup_remotes(use_config=True)
 
-        if cas.has_push_remotes():
+        if artifactcache.has_push_remotes():
             directory = remote_execution_pb2.Directory()
 
             with open(cas.objpath(artifact_digest), 'rb') as f:
@@ -284,7 +286,7 @@ def _test_push_tree(user_config_file, project_dir, artifact_dir, artifact_digest
             tree_maker(cas, tree, directory)
 
             # Push the Tree as a regular message
-            tree_digest = cas.push_message(project, tree)
+            tree_digest = artifactcache.push_message(project, tree)
 
             queue.put((tree_digest.hash, tree_digest.size_bytes))
         else:
```

```diff
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index c95aac3ef..be6293cf4 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -69,7 +69,7 @@ def test_push(cli, tmpdir, datafiles):
         cas = context.artifactcache
 
         # Assert that the element's artifact is cached
-        element = project.load_elements(['target.bst'], cas)[0]
+        element = project.load_elements(['target.bst'])[0]
         element_key = cli.get_element_key(project_dir, 'target.bst')
         assert cas.contains(element, element_key)
 
@@ -111,7 +111,7 @@ def _test_push(user_config_file, project_dir, artifact_dir,
         cas = context.artifactcache
 
         # Load the target element
-        element = project.load_elements([element_name], cas)[0]
+        element = project.load_elements([element_name])[0]
 
         # Manually setup the CAS remote
         cas.setup_remotes(use_config=True)
@@ -165,20 +165,21 @@ def test_push_directory(cli, tmpdir, datafiles):
         # Load the project and CAS cache
         project = Project(project_dir, context)
         project.ensure_fully_loaded()
-        cas = context.artifactcache
+        artifactcache = context.artifactcache
+        cas = artifactcache.cas
 
         # Assert that the element's artifact is cached
-        element = project.load_elements(['target.bst'], cas)[0]
+        element = project.load_elements(['target.bst'])[0]
         element_key = cli.get_element_key(project_dir, 'target.bst')
-        assert cas.contains(element, element_key)
+        assert artifactcache.contains(element, element_key)
 
         # Manually setup the CAS remote
-        cas.setup_remotes(use_config=True)
-        cas.initialize_remotes()
-        assert cas.has_push_remotes(element=element)
+        artifactcache.setup_remotes(use_config=True)
+        artifactcache.initialize_remotes()
+        assert artifactcache.has_push_remotes(element=element)
 
         # Recreate the CasBasedDirectory object from the cached artifact
-        artifact_ref = cas.get_artifact_fullname(element, element_key)
+        artifact_ref = artifactcache.get_artifact_fullname(element, element_key)
         artifact_digest = cas.resolve_ref(artifact_ref)
 
         queue = multiprocessing.Queue()
```

```diff
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index a8af59905..02f76de90 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -13,7 +13,7 @@ import pytest_cov
 from buildstream import _yaml
 from buildstream._artifactcache.casserver import create_server
 from buildstream._context import Context
-from buildstream._exceptions import ArtifactError
+from buildstream._exceptions import CASError
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
 
@@ -48,7 +48,7 @@ class ArtifactShare():
 
         context = Context()
         context.artifactdir = self.repodir
 
-        self.cas = context.artifactcache
+        self.cas = context.artifactcache.cas
 
         self.total_space = total_space
         self.free_space = free_space
 
@@ -135,7 +135,7 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
             return True
-        except ArtifactError:
+        except CASError:
             return False
 
     # close():
```
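
The least-recently-modified ordering returned by `list_refs()`, which `_clean_up_cache()` in casserver.py relies on when expiring refs, comes entirely from ref-file mtimes. The core of that ordering as a standalone sketch (`ref_heads` would be the `cas/refs/heads` directory):

```python
import os

def list_refs_lrm(ref_heads):
    # Mirrors CASCache.list_refs(): collect every ref file under
    # refs/heads and sort by mtime, earliest-modified first.
    refs = []
    mtimes = []
    for root, _, files in os.walk(ref_heads):
        for filename in files:
            ref_path = os.path.join(root, filename)
            refs.append(os.path.relpath(ref_path, ref_heads))
            mtimes.append(os.path.getmtime(ref_path))
    # sorted() orders earliest to latest, so the first entry is the
    # strongest candidate for expiry
    return [ref for _, ref in sorted(zip(mtimes, refs))]
```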