Diffstat (limited to 'buildstream/_artifactcache.py')
-rw-r--r--  buildstream/_artifactcache.py | 336
1 file changed, 244 insertions(+), 92 deletions(-)
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index 5b0ccacc4..091b44dda 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -17,13 +17,18 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import os
+import grpc
+
from ._basecache import BaseCache
from .types import _KeyStrength
-from ._exceptions import ArtifactError, CASCacheError, CASError
+from ._exceptions import ArtifactError, CASError, CASCacheError
+from ._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc
from ._cas import CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
-from .storage.directory import VirtualDirectoryError
+from ._artifact import Artifact
+from . import utils
# An ArtifactCacheSpec holds the user configuration for a single remote
@@ -55,8 +60,15 @@ class ArtifactCache(BaseCache):
self._required_elements = set() # The elements required for this session
- self.casquota.add_ref_callbacks(self.required_artifacts)
- self.casquota.add_remove_callbacks((lambda x: not x.startswith('@'), self.remove))
+ # create artifact directory
+ self.artifactdir = context.artifactdir
+ os.makedirs(self.artifactdir, exist_ok=True)
+
+ self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove)
+ self.casquota.add_list_refs_callback(self.list_artifacts)
+
+ self.cas.add_reachable_directories_callback(self._reachable_directories)
+ self.cas.add_reachable_digests_callback(self._reachable_digests)
# mark_required_elements():
#
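With this change each artifact is recorded as a single proto file under the new artifact directory, keyed by its artifact ref. A minimal lookup sketch, assuming the usual "project/element/cache-key" shape of refs produced by element.get_artifact_name() (names borrowed from the surrounding code, not part of this diff):

    import os

    ref = element.get_artifact_name(key)                # e.g. "project/element/<cache-key>"
    proto_path = os.path.join(context.artifactdir, ref)
    print(os.path.exists(proto_path))                   # what contains() checks further down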
@@ -92,13 +104,34 @@ class ArtifactCache(BaseCache):
weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
for key in (strong_key, weak_key):
if key:
- try:
- ref = element.get_artifact_name(key)
+ ref = element.get_artifact_name(key)

- self.cas.update_mtime(ref)
- except CASError:
+ try:
+ self.update_mtime(ref)
+ except ArtifactError:
pass
+ def update_mtime(self, ref):
+ try:
+ os.utime(os.path.join(self.artifactdir, ref))
+ except FileNotFoundError as e:
+ raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e
+
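update_mtime() bumps the file timestamp that the LRU ordering below relies on. _list_refs_mtimes() itself lives in BaseCache and is not part of this diff; a rough sketch of what it is assumed to do, written as a free function for brevity:

    import os

    def _list_refs_mtimes(basedir, *, glob_expr=None):
        # Walk the ref directory, yielding (mtime, ref) tuples so callers can
        # sort refs least-recently-used first. Sketch only; the real helper
        # is a BaseCache method and also applies the glob filter.
        for root, _, files in os.walk(basedir):
            for filename in files:
                path = os.path.join(root, filename)
                ref = os.path.relpath(path, basedir)
                yield (os.path.getmtime(path), ref)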
+ # unrequired_artifacts()
+ #
+ # Returns an iterator over artifacts that are not required in the build plan
+ #
+ # Returns:
+ # (iter): Iterator over tuples of (float, str) where float is the time
+ # and str is the artifact ref
+ #
+ def unrequired_artifacts(self):
+ required_artifacts = set(map(lambda x: x.get_artifact_name(),
+ self._required_elements))
+ for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir):
+ if artifact not in required_artifacts:
+ yield (mtime, artifact)
+
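Because each tuple leads with the mtime, cleanup code can expire unrequired artifacts oldest-first with a plain sort. A sketch of how the quota logic registered in __init__ might consume this, where 'artifacts' is the ArtifactCache instance and 'space_needed' is a hypothetical target:

    freed = 0
    for mtime, ref in sorted(artifacts.unrequired_artifacts()):
        freed += artifacts.remove(ref)    # remove() reports bytes recovered
        if freed >= space_needed:         # hypothetical stopping condition
            break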
def required_artifacts(self):
# Build a set of the cache keys which are required
# based on the required elements at cleanup time
@@ -147,24 +180,7 @@ class ArtifactCache(BaseCache):
def contains(self, element, key):
ref = element.get_artifact_name(key)
- return self.cas.contains(ref)
-
- # contains_subdir_artifact():
- #
- # Check whether an artifact element contains a digest for a subdir
- # which is populated in the cache, i.e. non-dangling.
- #
- # Args:
- # element (Element): The Element to check
- # key (str): The cache key to use
- # subdir (str): The subdir to check
- # with_files (bool): Whether to check files as well
- #
- # Returns: True if the subdir exists & is populated in the cache, False otherwise
- #
- def contains_subdir_artifact(self, element, key, subdir, *, with_files=True):
- ref = element.get_artifact_name(key)
- return self.cas.contains_subdir_artifact(ref, subdir, with_files=with_files)
+ return os.path.exists(os.path.join(self.artifactdir, ref))
# list_artifacts():
#
@@ -177,9 +193,7 @@ class ArtifactCache(BaseCache):
# ([str]) - A list of artifact names as generated in LRU order
#
def list_artifacts(self, *, glob=None):
- return list(filter(
- lambda x: not x.startswith('@'),
- self.cas.list_refs(glob=glob)))
+ return [ref for _, ref in sorted(list(self._list_refs_mtimes(self.artifactdir, glob_expr=glob)))]
# remove():
#
@@ -196,7 +210,10 @@ class ArtifactCache(BaseCache):
# (int): The amount of space recovered in the cache, in bytes
#
def remove(self, ref, *, defer_prune=False):
- return self.cas.remove(ref, defer_prune=defer_prune)
+ try:
+ return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune)
+ except CASCacheError as e:
+ raise ArtifactError("{}".format(e)) from e
# prune():
#
@@ -205,63 +222,31 @@ class ArtifactCache(BaseCache):
def prune(self):
return self.cas.prune()
- # get_artifact_directory():
- #
- # Get virtual directory for cached artifact of the specified Element.
- #
- # Assumes artifact has previously been fetched or committed.
- #
- # Args:
- # element (Element): The Element to extract
- # key (str): The cache key to use
- #
- # Raises:
- # ArtifactError: In cases there was an OSError, or if the artifact
- # did not exist.
- #
- # Returns: virtual directory object
- #
- def get_artifact_directory(self, element, key):
- ref = element.get_artifact_name(key)
- try:
- digest = self.cas.resolve_ref(ref, update_mtime=True)
- return CasBasedDirectory(self.cas, digest=digest)
- except (CASCacheError, VirtualDirectoryError) as e:
- raise ArtifactError('Directory not in local cache: {}'.format(e)) from e
-
- # commit():
- #
- # Commit built artifact to cache.
- #
- # Args:
- # element (Element): The Element commit an artifact for
- # content (Directory): The element's content directory
- # keys (list): The cache keys to use
- #
- def commit(self, element, content, keys):
- refs = [element.get_artifact_name(key) for key in keys]
-
- tree = content._get_digest()
-
- for ref in refs:
- self.cas.set_ref(ref, tree)
-
# diff():
#
# Return a list of files that have been added or modified between
- # the artifacts described by key_a and key_b.
+ # the artifacts described by key_a and key_b. This expects the
+ # provided keys to be strong cache keys
#
# Args:
# element (Element): The element whose artifacts to compare
- # key_a (str): The first artifact key
- # key_b (str): The second artifact key
- # subdir (str): A subdirectory to limit the comparison to
+ # key_a (str): The first artifact strong key
+ # key_b (str): The second artifact strong key
#
- def diff(self, element, key_a, key_b, *, subdir=None):
- ref_a = element.get_artifact_name(key_a)
- ref_b = element.get_artifact_name(key_b)
+ def diff(self, element, key_a, key_b):
+ context = self.context
+ artifact_a = Artifact(element, context, strong_key=key_a)
+ artifact_b = Artifact(element, context, strong_key=key_b)
+ digest_a = artifact_a._get_proto().files
+ digest_b = artifact_b._get_proto().files

- return self.cas.diff(ref_a, ref_b, subdir=subdir)
+ added = []
+ removed = []
+ modified = []
+
+ self.cas.diff_trees(digest_a, digest_b, added=added, removed=removed, modified=modified)
+
+ return modified, removed, added
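
A short usage sketch of the reworked diff(), assuming two strong keys for the same element; note the return order is (modified, removed, added):

    modified, removed, added = artifacts.diff(element, key_a, key_b)
    for path in added:
        print("new file: {}".format(path))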
# push():
#
@@ -269,7 +254,7 @@ class ArtifactCache(BaseCache):
#
# Args:
# element (Element): The Element whose artifact is to be pushed
- # keys (list): The cache keys to use
+ # artifact (Artifact): The artifact being pushed
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
@@ -277,9 +262,7 @@ class ArtifactCache(BaseCache):
# Raises:
# (ArtifactError): if there was an error
#
- def push(self, element, keys):
- refs = [element.get_artifact_name(key) for key in list(keys)]
-
+ def push(self, element, artifact):
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
@@ -291,7 +274,7 @@ class ArtifactCache(BaseCache):
display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push(refs, remote):
+ if self._push_artifact(element, artifact, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
pushed = True
else:
@@ -308,24 +291,21 @@ class ArtifactCache(BaseCache):
# Args:
# element (Element): The Element whose artifact is to be fetched
# key (str): The cache key to use
- # progress (callable): The progress callback, if any
- # subdir (str): The optional specific subdir to pull
- # excluded_subdirs (list): The optional list of subdirs to not pull
+ # pull_buildtrees (bool): Whether to pull buildtrees or not
#
# Returns:
# (bool): True if pull was successful, False if artifact was not available
#
- def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
- ref = element.get_artifact_name(key)
+ def pull(self, element, key, *, pull_buildtrees=False):
display_key = key[:self.context.log_key_length]
-
project = element._get_project()
for remote in self._remotes[project]:
+ remote.init()
try:
element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
+ if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
# no need to pull from additional remotes
return True
@@ -399,7 +379,9 @@ class ArtifactCache(BaseCache):
oldref = element.get_artifact_name(oldkey)
newref = element.get_artifact_name(newkey)
- self.cas.link_ref(oldref, newref)
+ if not os.path.exists(os.path.join(self.artifactdir, newref)):
+ os.link(os.path.join(self.artifactdir, oldref),
+ os.path.join(self.artifactdir, newref))
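
The hard link means both cache keys now resolve to one and the same proto file; a quick illustration, reusing the local names from the hunk above:

    import os

    # Both refs resolve to the same inode after link_key() runs:
    assert os.path.samefile(os.path.join(artifactdir, oldref),
                            os.path.join(artifactdir, newref))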
# get_artifact_logs():
#
@@ -463,3 +445,173 @@ class ArtifactCache(BaseCache):
remote_missing_blobs_set.update(remote_missing_blobs)
return list(remote_missing_blobs_set)
+
+ ################################################
+ # Local Private Methods #
+ ################################################
+
+ # _reachable_directories()
+ #
+ # Returns:
+ # (iter): Iterator over directories digests available from artifacts.
+ #
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
+
+ if str(artifact.files):
+ yield artifact.files
+
+ if str(artifact.buildtree):
+ yield artifact.buildtree
+
+ # _reachable_digests()
+ #
+ # Returns:
+ # (iter): Iterator over single file digests in artifacts
+ #
+ def _reachable_digests(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
+
+ if str(artifact.public_data):
+ yield artifact.public_data
+
+ for log_file in artifact.logs:
+ yield log_file.digest
+
+ # _push_artifact()
+ #
+ # Pushes relevant directories and then artifact proto to remote.
+ #
+ # Args:
+ # element (Element): The element
+ # artifact (Artifact): The related artifact being pushed
+ # remote (CASRemote): Remote to push to
+ #
+ # Returns:
+ # (bool): whether the push was successful
+ #
+ def _push_artifact(self, element, artifact, remote):
+
+ artifact_proto = artifact._get_proto()
+
+ keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+ # Check whether the artifact is on the server
+ present = False
+ for key in keys:
+ get_artifact = artifact_pb2.GetArtifactRequest()
+ get_artifact.cache_key = element.get_artifact_name(key)
+ try:
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+ artifact_service.GetArtifact(get_artifact)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Error checking artifact cache: {}"
+ .format(e.details()))
+ else:
+ present = True
+ if present:
+ return False
+
+ try:
+ self.cas._send_directory(remote, artifact_proto.files)
+
+ if str(artifact_proto.buildtree):
+ try:
+ self.cas._send_directory(remote, artifact_proto.buildtree)
+ except FileNotFoundError:
+ pass
+
+ digests = []
+ if str(artifact_proto.public_data):
+ digests.append(artifact_proto.public_data)
+
+ for log_file in artifact_proto.logs:
+ digests.append(log_file.digest)
+
+ self.cas.send_blobs(remote, digests)
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
+ return False
+
+ # Finally, send the artifact proto itself
+ for key in keys:
+ update_artifact = artifact_pb2.UpdateArtifactRequest()
+ update_artifact.cache_key = element.get_artifact_name(key)
+ update_artifact.artifact.CopyFrom(artifact_proto)
+
+ try:
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+ artifact_service.UpdateArtifact(update_artifact)
+ except grpc.RpcError as e:
+ raise ArtifactError("Failed to push artifact: {}".format(e.details()))
+
+ return True
+
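The presence check at the top of _push_artifact() is the key idiom here: a NOT_FOUND from GetArtifact means "not cached", while any other RPC error is fatal. Restated as a standalone sketch using the same stubs (the helper name is hypothetical):

    import grpc
    from ._exceptions import ArtifactError
    from ._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc

    def remote_has_artifact(channel, cache_key):
        # Probe the remote artifact service for a single cache key
        request = artifact_pb2.GetArtifactRequest()
        request.cache_key = cache_key
        try:
            artifact_pb2_grpc.ArtifactServiceStub(channel).GetArtifact(request)
            return True
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise ArtifactError("Error checking artifact cache: {}".format(e.details()))
            return False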
+ # _pull_artifact()
+ #
+ # Args:
+ # element (Element): element to pull
+ # key (str): specific key of element to pull
+ # remote (CASRemote): remote to pull from
+ # pull_buildtrees (bool): whether to pull buildtrees or not
+ #
+ # Returns:
+ # (bool): whether the pull was successful
+ #
+ def _pull_artifact(self, element, key, remote, pull_buildtrees=False):
+
+ def __pull_digest(digest):
+ self.cas._fetch_directory(remote, digest)
+ required_blobs = self.cas.required_blobs_for_directory(digest)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ if missing_blobs:
+ self.cas.fetch_blobs(remote, missing_blobs)
+
+ request = artifact_pb2.GetArtifactRequest()
+ request.cache_key = element.get_artifact_name(key=key)
+ try:
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+ artifact = artifact_service.GetArtifact(request)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+ return False
+
+ try:
+ if str(artifact.files):
+ __pull_digest(artifact.files)
+
+ if pull_buildtrees and str(artifact.buildtree):
+ __pull_digest(artifact.buildtree)
+
+ digests = []
+ if str(artifact.public_data):
+ digests.append(artifact.public_data)
+
+ for log_digest in artifact.logs:
+ digests.append(log_digest.digest)
+
+ self.cas.fetch_blobs(remote, digests)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+ return False
+
+ # Write the artifact proto to cache
+ artifact_path = os.path.join(self.artifactdir, request.cache_key)
+ os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+ with open(artifact_path, 'w+b') as f:
+ f.write(artifact.SerializeToString())
+
+ return True
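
For completeness, reading one of these proto files back is the mirror image of the write above, and is what _reachable_directories() and _reachable_digests() do when they walk the directory. A minimal sketch (the helper name is hypothetical):

    import os
    from ._protos.buildstream.v2 import artifact_pb2

    def load_artifact_proto(artifactdir, cache_key):
        # Deserialize the Artifact proto stored under its ref path
        artifact = artifact_pb2.Artifact()
        with open(os.path.join(artifactdir, cache_key), 'rb') as f:
            artifact.ParseFromString(f.read())
        return artifact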