author    bst-marge-bot <marge-bot@buildstream.build>  2019-05-15 11:44:45 +0000
committer bst-marge-bot <marge-bot@buildstream.build>  2019-05-15 11:44:45 +0000
commit    7fb538647ccb9ed7b42a60f4b663b10c40dcc772 (patch)
tree      2cb482318b76815cb096fc6d0ff43439ebc70a7b /buildstream
parent    2f4023f3555f588aa8fc70494b08bf95b3db6463 (diff)
parent    1de222e9e8f27dca71a9f078bc82885923c0bcba (diff)
Merge branch 'raoul/974-Artifact-Rework' into 'master'

Artifact as a Proto: rework

Closes #974

See merge request BuildStream/buildstream!1292
Diffstat (limited to 'buildstream')
-rw-r--r--  buildstream/_artifact.py         311
-rw-r--r--  buildstream/_artifactcache.py    336
-rw-r--r--  buildstream/_basecache.py         31
-rw-r--r--  buildstream/_cas/cascache.py     389
-rw-r--r--  buildstream/_cas/casserver.py     23
-rw-r--r--  buildstream/_context.py            4
-rw-r--r--  buildstream/_sourcecache.py       45
-rw-r--r--  buildstream/_stream.py             4
-rw-r--r--  buildstream/element.py           130
-rw-r--r--  buildstream/testing/runcli.py     50
10 files changed, 745 insertions, 578 deletions
diff --git a/buildstream/_artifact.py b/buildstream/_artifact.py
index 6cf51ee2d..2240300c7 100644
--- a/buildstream/_artifact.py
+++ b/buildstream/_artifact.py
@@ -29,11 +29,11 @@ artifact composite interaction away from Element class
"""
import os
-import shutil
+import tempfile
+from ._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
from . import _yaml
from . import utils
-from ._exceptions import ArtifactError
from .types import Scope
from .storage._casbaseddirectory import CasBasedDirectory
@@ -49,12 +49,18 @@ from .storage._casbaseddirectory import CasBasedDirectory
#
class Artifact():
+ version = 0
+
def __init__(self, element, context, *, strong_key=None, weak_key=None):
self._element = element
self._context = context
self._artifacts = context.artifactcache
self._cache_key = strong_key
self._weak_cache_key = weak_key
+ self._artifactdir = context.artifactdir
+ self._cas = context.get_cascache()
+ self._tmpdir = context.tmpdir
+ self._proto = None
self._metadata_keys = None # Strong and weak key tuple extracted from the artifact
self._metadata_dependencies = None # Dictionary of dependency strong keys from the artifact
@@ -69,7 +75,9 @@ class Artifact():
# (Directory): The virtual directory object
#
def get_files(self):
- return self._get_subdirectory("files")
+ files_digest = self._get_field_digest("files")
+
+ return CasBasedDirectory(self._cas, digest=files_digest)
# get_buildtree():
#
@@ -79,7 +87,9 @@ class Artifact():
# (Directory): The virtual directory object
#
def get_buildtree(self):
- return self._get_subdirectory("buildtree")
+ buildtree_digest = self._get_field_digest("buildtree")
+
+ return CasBasedDirectory(self._cas, digest=buildtree_digest)
# get_extract_key():
#
@@ -100,7 +110,6 @@ class Artifact():
# sandbox_build_dir (Directory): Virtual Directory object for the sandbox build-root
# collectvdir (Directory): Virtual Directory object from within the sandbox for collection
# buildresult (tuple): bool, short desc and detailed desc of result
- # keys (list): list of keys for the artifact commit metadata
# publicdata (dict): dict of public data to commit to artifact metadata
#
# Returns:
@@ -110,80 +119,78 @@ class Artifact():
context = self._context
element = self._element
+ size = 0
- assemblevdir = CasBasedDirectory(cas_cache=self._artifacts.cas)
- logsvdir = assemblevdir.descend("logs", create=True)
- metavdir = assemblevdir.descend("meta", create=True)
+ filesvdir = None
+ buildtreevdir = None
- # Create artifact directory structure
- assembledir = os.path.join(rootdir, 'artifact')
- logsdir = os.path.join(assembledir, 'logs')
- metadir = os.path.join(assembledir, 'meta')
- os.mkdir(assembledir)
- os.mkdir(logsdir)
- os.mkdir(metadir)
+ artifact = ArtifactProto()
- if collectvdir is not None:
- filesvdir = assemblevdir.descend("files", create=True)
- filesvdir.import_files(collectvdir)
+ artifact.version = self.version
- if sandbox_build_dir:
- buildtreevdir = assemblevdir.descend("buildtree", create=True)
- buildtreevdir.import_files(sandbox_build_dir)
+ # Store result
+ artifact.build_success = buildresult[0]
+ artifact.build_error = buildresult[1]
+ artifact.build_error_details = "" if not buildresult[2] else buildresult[2]
- # Write some logs out to normal directories: logsdir and metadir
- # Copy build log
- log_filename = context.get_log_filename()
- element._build_log_path = os.path.join(logsdir, 'build.log')
- if log_filename:
- shutil.copyfile(log_filename, element._build_log_path)
+ # Store keys
+ artifact.strong_key = self._cache_key
+ artifact.weak_key = self._weak_cache_key
+
+ artifact.was_workspaced = bool(element._get_workspace())
+
+ # Store files
+ if collectvdir:
+ filesvdir = CasBasedDirectory(cas_cache=self._cas)
+ filesvdir.import_files(collectvdir)
+ artifact.files.CopyFrom(filesvdir._get_digest())
+ size += filesvdir.get_size()
# Store public data
- _yaml.dump(_yaml.node_sanitize(publicdata), os.path.join(metadir, 'public.yaml'))
+ with tempfile.NamedTemporaryFile(dir=self._tmpdir) as tmp:
+ _yaml.dump(_yaml.node_sanitize(publicdata), tmp.name)
+ public_data_digest = self._cas.add_object(path=tmp.name, link_directly=True)
+ artifact.public_data.CopyFrom(public_data_digest)
+ size += public_data_digest.size_bytes
+
+ # store build dependencies
+ for e in element.dependencies(Scope.BUILD):
+ new_build = artifact.build_deps.add()
+ new_build.element_name = e.name
+ new_build.cache_key = e._get_cache_key()
+ new_build.was_workspaced = bool(e._get_workspace())
+
+ # Store log file
+ log_filename = context.get_log_filename()
+ if log_filename:
+ digest = self._cas.add_object(path=log_filename)
+ element._build_log_path = self._cas.objpath(digest)
+ log = artifact.logs.add()
+ log.name = os.path.basename(log_filename)
+ log.digest.CopyFrom(digest)
+ size += log.digest.size_bytes
+
+ # Store build tree
+ if sandbox_build_dir:
+ buildtreevdir = CasBasedDirectory(cas_cache=self._cas)
+ buildtreevdir.import_files(sandbox_build_dir)
+ artifact.buildtree.CopyFrom(buildtreevdir._get_digest())
+ size += buildtreevdir.get_size()
- # Store result
- build_result_dict = {"success": buildresult[0], "description": buildresult[1]}
- if buildresult[2] is not None:
- build_result_dict["detail"] = buildresult[2]
- _yaml.dump(build_result_dict, os.path.join(metadir, 'build-result.yaml'))
-
- # Store keys.yaml
- _yaml.dump(_yaml.node_sanitize({
- 'strong': self._cache_key,
- 'weak': self._weak_cache_key,
- }), os.path.join(metadir, 'keys.yaml'))
-
- # Store dependencies.yaml
- _yaml.dump(_yaml.node_sanitize({
- e.name: e._get_cache_key() for e in element.dependencies(Scope.BUILD)
- }), os.path.join(metadir, 'dependencies.yaml'))
-
- # Store workspaced.yaml
- _yaml.dump(_yaml.node_sanitize({
- 'workspaced': bool(element._get_workspace())
- }), os.path.join(metadir, 'workspaced.yaml'))
-
- # Store workspaced-dependencies.yaml
- _yaml.dump(_yaml.node_sanitize({
- 'workspaced-dependencies': [
- e.name for e in element.dependencies(Scope.BUILD)
- if e._get_workspace()
- ]
- }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
-
- metavdir.import_files(metadir)
- logsvdir.import_files(logsdir)
-
- artifact_size = assemblevdir.get_size()
+ os.makedirs(os.path.dirname(os.path.join(
+ self._artifactdir, element.get_artifact_name())), exist_ok=True)
keys = utils._deduplicate([self._cache_key, self._weak_cache_key])
- self._artifacts.commit(element, assemblevdir, keys)
+ for key in keys:
+ path = os.path.join(self._artifactdir, element.get_artifact_name(key=key))
+ with open(path, mode='w+b') as f:
+ f.write(artifact.SerializeToString())
- return artifact_size
+ return size
# cached_buildtree()
#
# Check if artifact is cached with expected buildtree. A
- # buildtree will not be present if the res tof the partial artifact
+ # buildtree will not be present if the rest of the partial artifact
# is not cached.
#
# Returns:
@@ -193,14 +200,12 @@ class Artifact():
#
def cached_buildtree(self):
- element = self._element
-
- key = self.get_extract_key()
- if not self._artifacts.contains_subdir_artifact(element, key, 'buildtree'):
+ buildtree_digest = self._get_field_digest("buildtree")
+ if buildtree_digest:
+ return self._cas.contains_directory(buildtree_digest, with_files=True)
+ else:
return False
- return True
-
# buildtree_exists()
#
# Check if artifact was created with a buildtree. This does not check
@@ -211,8 +216,8 @@ class Artifact():
#
def buildtree_exists(self):
- artifact_vdir = self._get_directory()
- return artifact_vdir._exists('buildtree')
+ artifact = self._get_proto()
+ return bool(str(artifact.buildtree))
# load_public_data():
#
@@ -224,8 +229,8 @@ class Artifact():
def load_public_data(self):
# Load the public data from the artifact
- meta_vdir = self._get_subdirectory('meta')
- meta_file = meta_vdir._objpath('public.yaml')
+ artifact = self._get_proto()
+ meta_file = self._cas.objpath(artifact.public_data)
data = _yaml.load(meta_file, shortname='public.yaml')
return data
@@ -241,20 +246,10 @@ class Artifact():
#
def load_build_result(self):
- meta_vdir = self._get_subdirectory('meta')
-
- meta_file = meta_vdir._objpath('build-result.yaml')
- if not os.path.exists(meta_file):
- build_result = (True, "succeeded", None)
- return build_result
-
- data = _yaml.load(meta_file, shortname='build-result.yaml')
-
- success = _yaml.node_get(data, bool, 'success')
- description = _yaml.node_get(data, str, 'description', default_value=None)
- detail = _yaml.node_get(data, str, 'detail', default_value=None)
-
- build_result = (success, description, detail)
+ artifact = self._get_proto()
+ build_result = (artifact.build_success,
+ artifact.build_error,
+ artifact.build_error_details)
return build_result
@@ -271,14 +266,11 @@ class Artifact():
if self._metadata_keys is not None:
return self._metadata_keys
- # Extract the metadata dir
- meta_vdir = self._get_subdirectory('meta')
+ # Extract proto
+ artifact = self._get_proto()
- # Parse the expensive yaml now and cache the result
- meta_file = meta_vdir._objpath('keys.yaml')
- meta = _yaml.load(meta_file, shortname='keys.yaml')
- strong_key = _yaml.node_get(meta, str, 'strong')
- weak_key = _yaml.node_get(meta, str, 'weak')
+ strong_key = artifact.strong_key
+ weak_key = artifact.weak_key
self._metadata_keys = (strong_key, weak_key)
@@ -296,14 +288,10 @@ class Artifact():
if self._metadata_dependencies is not None:
return self._metadata_dependencies
- # Extract the metadata dir
- meta_vdir = self._get_subdirectory('meta')
-
- # Parse the expensive yaml now and cache the result
- meta_file = meta_vdir._objpath('dependencies.yaml')
- meta = _yaml.load(meta_file, shortname='dependencies.yaml')
+ # Extract proto
+ artifact = self._get_proto()
- self._metadata_dependencies = meta
+ self._metadata_dependencies = {dep.element_name: dep.cache_key for dep in artifact.build_deps}
return self._metadata_dependencies
@@ -319,14 +307,10 @@ class Artifact():
if self._metadata_workspaced is not None:
return self._metadata_workspaced
- # Extract the metadata dir
- meta_vdir = self._get_subdirectory('meta')
-
- # Parse the expensive yaml now and cache the result
- meta_file = meta_vdir._objpath('workspaced.yaml')
- meta = _yaml.load(meta_file, shortname='workspaced.yaml')
+ # Extract proto
+ artifact = self._get_proto()
- self._metadata_workspaced = _yaml.node_get(meta, bool, 'workspaced')
+ self._metadata_workspaced = artifact.was_workspaced
return self._metadata_workspaced
@@ -342,15 +326,11 @@ class Artifact():
if self._metadata_workspaced_dependencies is not None:
return self._metadata_workspaced_dependencies
- # Extract the metadata dir
- meta_vdir = self._get_subdirectory('meta')
+ # Extract proto
+ artifact = self._get_proto()
- # Parse the expensive yaml now and cache the result
- meta_file = meta_vdir._objpath('workspaced-dependencies.yaml')
- meta = _yaml.load(meta_file, shortname='workspaced-dependencies.yaml')
-
- self._metadata_workspaced_dependencies = _yaml.node_sanitize(_yaml.node_get(meta, list,
- 'workspaced-dependencies'))
+ self._metadata_workspaced_dependencies = [dep.element_name for dep in artifact.build_deps
+ if dep.was_workspaced]
return self._metadata_workspaced_dependencies
@@ -369,30 +349,21 @@ class Artifact():
def cached(self):
context = self._context
- try:
- vdir = self._get_directory()
- except ArtifactError:
- # Either ref or top-level artifact directory missing
- return False
+ artifact = self._get_proto()
- # Check whether all metadata is available
- metadigest = vdir._get_child_digest('meta')
- if not self._artifacts.cas.contains_directory(metadigest, with_files=True):
+ if not artifact:
return False
- # Additional checks only relevant if artifact was created with 'files' subdirectory
- if vdir._exists('files'):
- # Determine whether directories are required
- require_directories = context.require_artifact_directories
- # Determine whether file contents are required as well
- require_files = context.require_artifact_files or self._element._artifact_files_required()
+ # Determine whether directories are required
+ require_directories = context.require_artifact_directories
+ # Determine whether file contents are required as well
+ require_files = (context.require_artifact_files or
+ self._element._artifact_files_required())
- filesdigest = vdir._get_child_digest('files')
-
- # Check whether 'files' subdirectory is available, with or without file contents
- if (require_directories and
- not self._artifacts.cas.contains_directory(filesdigest, with_files=require_files)):
- return False
+ # Check whether 'files' subdirectory is available, with or without file contents
+ if (require_directories and str(artifact.files) and
+ not self._cas.contains_directory(artifact.files, with_files=require_files)):
+ return False
return True
@@ -408,46 +379,50 @@ class Artifact():
if not self._element._cached():
return False
- log_vdir = self._get_subdirectory('logs')
+ artifact = self._get_proto()
- logsdigest = log_vdir._get_digest()
- return self._artifacts.cas.contains_directory(logsdigest, with_files=True)
+ for logfile in artifact.logs:
+ if not self._cas.contains(logfile.digest.hash):
+ return False
- # _get_directory():
- #
- # Get a virtual directory for the artifact contents
- #
- # Args:
- # key (str): The key for the artifact to extract,
- # or None for the default key
+ return True
+
+ # _get_proto()
#
# Returns:
- # (Directory): The virtual directory object
+ # (Artifact): Artifact proto
#
- def _get_directory(self, key=None):
+ def _get_proto(self):
+ # Check if we've already cached the proto object
+ if self._proto is not None:
+ return self._proto
- element = self._element
+ key = self.get_extract_key()
- if key is None:
- key = self.get_extract_key()
+ proto_path = os.path.join(self._artifactdir,
+ self._element.get_artifact_name(key=key))
+ artifact = ArtifactProto()
+ try:
+ with open(proto_path, mode='r+b') as f:
+ artifact.ParseFromString(f.read())
+ except FileNotFoundError:
+ return None
- return self._artifacts.get_artifact_directory(element, key)
+ os.utime(proto_path)
+ # Cache the proto object
+ self._proto = artifact
- # _get_subdirectory():
- #
- # Get a virtual directory for the artifact subdir contents
- #
- # Args:
- # subdir (str): The specific artifact subdir
- # key (str): The key for the artifact to extract,
- # or None for the default key
+ return self._proto
+
+ # _get_artifact_field()
#
# Returns:
- # (Directory): The virtual subdirectory object
+ # (Digest): Digest of field specified
#
- def _get_subdirectory(self, subdir, key=None):
-
- artifact_vdir = self._get_directory(key)
- sub_vdir = artifact_vdir.descend(subdir)
+ def _get_field_digest(self, field):
+ artifact_proto = self._get_proto()
+ digest = getattr(artifact_proto, field)
+ if not str(digest):
+ return None
- return sub_vdir
+ return digest
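
The heart of this rework is that an artifact is now a serialized Artifact proto stored as a plain file under the artifact directory, one file per cache key, rather than a CAS ref tree. A minimal sketch of that round trip, assuming the generated artifact_pb2 module and refs laid out as <artifactdir>/<project>/<element>/<key> (the helper names here are illustrative, not part of the patch):

    import os

    from buildstream._protos.buildstream.v2 import artifact_pb2

    def write_artifact_proto(artifactdir, ref, artifact):
        # One serialized proto file per cache key, as in Artifact.cache()
        path = os.path.join(artifactdir, ref)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, mode='w+b') as f:
            f.write(artifact.SerializeToString())

    def read_artifact_proto(artifactdir, ref):
        # Returns None when the ref is not cached, mirroring _get_proto()
        artifact = artifact_pb2.Artifact()
        try:
            with open(os.path.join(artifactdir, ref), mode='r+b') as f:
                artifact.ParseFromString(f.read())
        except FileNotFoundError:
            return None
        return artifact
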
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index 5b0ccacc4..091b44dda 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -17,13 +17,18 @@
# Authors:
# Tristan Maat <tristan.maat@codethink.co.uk>
+import os
+import grpc
+
from ._basecache import BaseCache
from .types import _KeyStrength
-from ._exceptions import ArtifactError, CASCacheError, CASError
+from ._exceptions import ArtifactError, CASError, CASCacheError
+from ._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc
from ._cas import CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
-from .storage.directory import VirtualDirectoryError
+from ._artifact import Artifact
+from . import utils
# An ArtifactCacheSpec holds the user configuration for a single remote
@@ -55,8 +60,15 @@ class ArtifactCache(BaseCache):
self._required_elements = set() # The elements required for this session
- self.casquota.add_ref_callbacks(self.required_artifacts)
- self.casquota.add_remove_callbacks((lambda x: not x.startswith('@'), self.remove))
+ # create artifact directory
+ self.artifactdir = context.artifactdir
+ os.makedirs(self.artifactdir, exist_ok=True)
+
+ self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove)
+ self.casquota.add_list_refs_callback(self.list_artifacts)
+
+ self.cas.add_reachable_directories_callback(self._reachable_directories)
+ self.cas.add_reachable_digests_callback(self._reachable_digests)
# mark_required_elements():
#
@@ -92,13 +104,34 @@ class ArtifactCache(BaseCache):
weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
for key in (strong_key, weak_key):
if key:
- try:
- ref = element.get_artifact_name(key)
+ ref = element.get_artifact_name(key)
- self.cas.update_mtime(ref)
- except CASError:
+ try:
+ self.update_mtime(ref)
+ except ArtifactError:
pass
+ def update_mtime(self, ref):
+ try:
+ os.utime(os.path.join(self.artifactdir, ref))
+ except FileNotFoundError as e:
+ raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e
+
+ # unrequired_artifacts()
+ #
+ # Returns iterator over artifacts that are not required in the build plan
+ #
+ # Returns:
+ # (iter): Iterator over tuples of (float, str) where float is the time
+ # and str is the artifact ref
+ #
+ def unrequired_artifacts(self):
+ required_artifacts = set(map(lambda x: x.get_artifact_name(),
+ self._required_elements))
+ for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir):
+ if artifact not in required_artifacts:
+ yield (mtime, artifact)
+
def required_artifacts(self):
# Build a set of the cache keys which are required
# based on the required elements at cleanup time
@@ -147,24 +180,7 @@ class ArtifactCache(BaseCache):
def contains(self, element, key):
ref = element.get_artifact_name(key)
- return self.cas.contains(ref)
-
- # contains_subdir_artifact():
- #
- # Check whether an artifact element contains a digest for a subdir
- # which is populated in the cache, i.e non dangling.
- #
- # Args:
- # element (Element): The Element to check
- # key (str): The cache key to use
- # subdir (str): The subdir to check
- # with_files (bool): Whether to check files as well
- #
- # Returns: True if the subdir exists & is populated in the cache, False otherwise
- #
- def contains_subdir_artifact(self, element, key, subdir, *, with_files=True):
- ref = element.get_artifact_name(key)
- return self.cas.contains_subdir_artifact(ref, subdir, with_files=with_files)
+ return os.path.exists(os.path.join(self.artifactdir, ref))
# list_artifacts():
#
@@ -177,9 +193,7 @@ class ArtifactCache(BaseCache):
# ([str]) - A list of artifact names as generated in LRU order
#
def list_artifacts(self, *, glob=None):
- return list(filter(
- lambda x: not x.startswith('@'),
- self.cas.list_refs(glob=glob)))
+ return [ref for _, ref in sorted(list(self._list_refs_mtimes(self.artifactdir, glob_expr=glob)))]
# remove():
#
@@ -196,7 +210,10 @@ class ArtifactCache(BaseCache):
# (int): The amount of space recovered in the cache, in bytes
#
def remove(self, ref, *, defer_prune=False):
- return self.cas.remove(ref, defer_prune=defer_prune)
+ try:
+ return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune)
+ except CASCacheError as e:
+ raise ArtifactError("{}".format(e)) from e
# prune():
#
@@ -205,63 +222,31 @@ class ArtifactCache(BaseCache):
def prune(self):
return self.cas.prune()
- # get_artifact_directory():
- #
- # Get virtual directory for cached artifact of the specified Element.
- #
- # Assumes artifact has previously been fetched or committed.
- #
- # Args:
- # element (Element): The Element to extract
- # key (str): The cache key to use
- #
- # Raises:
- # ArtifactError: In cases there was an OSError, or if the artifact
- # did not exist.
- #
- # Returns: virtual directory object
- #
- def get_artifact_directory(self, element, key):
- ref = element.get_artifact_name(key)
- try:
- digest = self.cas.resolve_ref(ref, update_mtime=True)
- return CasBasedDirectory(self.cas, digest=digest)
- except (CASCacheError, VirtualDirectoryError) as e:
- raise ArtifactError('Directory not in local cache: {}'.format(e)) from e
-
- # commit():
- #
- # Commit built artifact to cache.
- #
- # Args:
- # element (Element): The Element commit an artifact for
- # content (Directory): The element's content directory
- # keys (list): The cache keys to use
- #
- def commit(self, element, content, keys):
- refs = [element.get_artifact_name(key) for key in keys]
-
- tree = content._get_digest()
-
- for ref in refs:
- self.cas.set_ref(ref, tree)
-
# diff():
#
# Return a list of files that have been added or modified between
- # the artifacts described by key_a and key_b.
+ # the artifacts described by key_a and key_b. This expects the
+ # provided keys to be strong cache keys
#
# Args:
# element (Element): The element whose artifacts to compare
- # key_a (str): The first artifact key
- # key_b (str): The second artifact key
- # subdir (str): A subdirectory to limit the comparison to
+ # key_a (str): The first artifact strong key
+ # key_b (str): The second artifact strong key
#
- def diff(self, element, key_a, key_b, *, subdir=None):
- ref_a = element.get_artifact_name(key_a)
- ref_b = element.get_artifact_name(key_b)
+ def diff(self, element, key_a, key_b):
+ context = self.context
+ artifact_a = Artifact(element, context, strong_key=key_a)
+ artifact_b = Artifact(element, context, strong_key=key_b)
+ digest_a = artifact_a._get_proto().files
+ digest_b = artifact_b._get_proto().files
- return self.cas.diff(ref_a, ref_b, subdir=subdir)
+ added = []
+ removed = []
+ modified = []
+
+ self.cas.diff_trees(digest_a, digest_b, added=added, removed=removed, modified=modified)
+
+ return modified, removed, added
# push():
#
@@ -269,7 +254,7 @@ class ArtifactCache(BaseCache):
#
# Args:
# element (Element): The Element whose artifact is to be pushed
- # keys (list): The cache keys to use
+ # artifact (Artifact): The artifact being pushed
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
@@ -277,9 +262,7 @@ class ArtifactCache(BaseCache):
# Raises:
# (ArtifactError): if there was an error
#
- def push(self, element, keys):
- refs = [element.get_artifact_name(key) for key in list(keys)]
-
+ def push(self, element, artifact):
project = element._get_project()
push_remotes = [r for r in self._remotes[project] if r.spec.push]
@@ -291,7 +274,7 @@ class ArtifactCache(BaseCache):
display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push(refs, remote):
+ if self._push_artifact(element, artifact, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
pushed = True
else:
@@ -308,24 +291,21 @@ class ArtifactCache(BaseCache):
# Args:
# element (Element): The Element whose artifact is to be fetched
# key (str): The cache key to use
- # progress (callable): The progress callback, if any
- # subdir (str): The optional specific subdir to pull
- # excluded_subdirs (list): The optional list of subdirs to not pull
+ # pull_buildtrees (bool): Whether to pull buildtrees or not
#
# Returns:
# (bool): True if pull was successful, False if artifact was not available
#
- def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
- ref = element.get_artifact_name(key)
+ def pull(self, element, key, *, pull_buildtrees=False):
display_key = key[:self.context.log_key_length]
-
project = element._get_project()
for remote in self._remotes[project]:
+ remote.init()
try:
element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
+ if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
# no need to pull from additional remotes
return True
@@ -399,7 +379,9 @@ class ArtifactCache(BaseCache):
oldref = element.get_artifact_name(oldkey)
newref = element.get_artifact_name(newkey)
- self.cas.link_ref(oldref, newref)
+ if not os.path.exists(os.path.join(self.artifactdir, newref)):
+ os.link(os.path.join(self.artifactdir, oldref),
+ os.path.join(self.artifactdir, newref))
# get_artifact_logs():
#
@@ -463,3 +445,173 @@ class ArtifactCache(BaseCache):
remote_missing_blobs_set.update(remote_missing_blobs)
return list(remote_missing_blobs_set)
+
+ ################################################
+ # Local Private Methods #
+ ################################################
+
+ # _reachable_directories()
+ #
+ # Returns:
+ # (iter): Iterator over directories digests available from artifacts.
+ #
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
+
+ if str(artifact.files):
+ yield artifact.files
+
+ if str(artifact.buildtree):
+ yield artifact.buildtree
+
+ # _reachable_digests()
+ #
+ # Returns:
+ # (iter): Iterator over single file digests in artifacts
+ #
+ def _reachable_digests(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
+
+ if str(artifact.public_data):
+ yield artifact.public_data
+
+ for log_file in artifact.logs:
+ yield log_file.digest
+
+ # _push_artifact()
+ #
+ # Pushes relevant directories and then artifact proto to remote.
+ #
+ # Args:
+ # element (Element): The element
+ # artifact (Artifact): The related artifact being pushed
+ # remote (CASRemote): Remote to push to
+ #
+ # Returns:
+ # (bool): whether the push was successful
+ #
+ def _push_artifact(self, element, artifact, remote):
+
+ artifact_proto = artifact._get_proto()
+
+ keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+ # Check whether the artifact is on the server
+ present = False
+ for key in keys:
+ get_artifact = artifact_pb2.GetArtifactRequest()
+ get_artifact.cache_key = element.get_artifact_name(key)
+ try:
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+ artifact_service.GetArtifact(get_artifact)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Error checking artifact cache: {}"
+ .format(e.details()))
+ else:
+ present = True
+ if present:
+ return False
+
+ try:
+ self.cas._send_directory(remote, artifact_proto.files)
+
+ if str(artifact_proto.buildtree):
+ try:
+ self.cas._send_directory(remote, artifact_proto.buildtree)
+ except FileNotFoundError:
+ pass
+
+ digests = []
+ if str(artifact_proto.public_data):
+ digests.append(artifact_proto.public_data)
+
+ for log_file in artifact_proto.logs:
+ digests.append(log_file.digest)
+
+ self.cas.send_blobs(remote, digests)
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
+ return False
+
+ # finally need to send the artifact proto
+ for key in keys:
+ update_artifact = artifact_pb2.UpdateArtifactRequest()
+ update_artifact.cache_key = element.get_artifact_name(key)
+ update_artifact.artifact.CopyFrom(artifact_proto)
+
+ try:
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+ artifact_service.UpdateArtifact(update_artifact)
+ except grpc.RpcError as e:
+ raise ArtifactError("Failed to push artifact: {}".format(e.details()))
+
+ return True
+
+ # _pull_artifact()
+ #
+ # Args:
+ # element (Element): element to pull
+ # key (str): specific key of element to pull
+ # remote (CASRemote): remote to pull from
+ # pull_buildtree (bool): whether to pull buildtrees or not
+ #
+ # Returns:
+ # (bool): whether the pull was successful
+ #
+ def _pull_artifact(self, element, key, remote, pull_buildtrees=False):
+
+ def __pull_digest(digest):
+ self.cas._fetch_directory(remote, digest)
+ required_blobs = self.cas.required_blobs_for_directory(digest)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ if missing_blobs:
+ self.cas.fetch_blobs(remote, missing_blobs)
+
+ request = artifact_pb2.GetArtifactRequest()
+ request.cache_key = element.get_artifact_name(key=key)
+ try:
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
+ artifact = artifact_service.GetArtifact(request)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+ return False
+
+ try:
+ if str(artifact.files):
+ __pull_digest(artifact.files)
+
+ if pull_buildtrees and str(artifact.buildtree):
+ __pull_digest(artifact.buildtree)
+
+ digests = []
+ if str(artifact.public_data):
+ digests.append(artifact.public_data)
+
+ for log_digest in artifact.logs:
+ digests.append(log_digest.digest)
+
+ self.cas.fetch_blobs(remote, digests)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+ return False
+
+ # Write the artifact proto to cache
+ artifact_path = os.path.join(self.artifactdir, request.cache_key)
+ os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+ with open(artifact_path, 'w+b') as f:
+ f.write(artifact.SerializeToString())
+
+ return True
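
Pushing and pulling artifacts now goes through the ArtifactService gRPC API rather than CAS refs: blobs travel first, the proto last, so a remote never advertises an artifact whose contents are missing. A hedged sketch of the presence probe at the top of _push_artifact(), assuming an open grpc channel to a BuildStream artifact server:

    import grpc

    from buildstream._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc

    def artifact_present_on_remote(channel, cache_key):
        # NOT_FOUND means the proto is absent and a push is needed;
        # any other RPC error is propagated to the caller
        request = artifact_pb2.GetArtifactRequest()
        request.cache_key = cache_key
        artifact_service = artifact_pb2_grpc.ArtifactServiceStub(channel)
        try:
            artifact_service.GetArtifact(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise
        return True
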
diff --git a/buildstream/_basecache.py b/buildstream/_basecache.py
index af3fe9bb7..68654b2a0 100644
--- a/buildstream/_basecache.py
+++ b/buildstream/_basecache.py
@@ -17,6 +17,8 @@
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import multiprocessing
+import os
+from fnmatch import fnmatch
from . import utils
from . import _yaml
@@ -274,3 +276,32 @@ class BaseCache():
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
+
+ # _list_refs_mtimes()
+ #
+ # List refs in a directory, given a base path. Also returns the
+ # associated mtimes
+ #
+ # Args:
+ # base_path (str): Base path to traverse over
+ # glob_expr (str|None): Optional glob expression to match against files
+ #
+ # Returns:
+ # (iter (mtime, filename)): iterator of tuples of mtime and refs
+ #
+ def _list_refs_mtimes(self, base_path, *, glob_expr=None):
+ path = base_path
+ if glob_expr is not None:
+ globdir = os.path.dirname(glob_expr)
+ if not any(c in "*?[" for c in globdir):
+ # path prefix contains no globbing characters so
+ # append the glob to optimise the os.walk()
+ path = os.path.join(base_path, globdir)
+
+ for root, _, files in os.walk(path):
+ for filename in files:
+ ref_path = os.path.join(root, filename)
+ relative_path = os.path.relpath(ref_path, base_path) # Relative to refs head
+ if not glob_expr or fnmatch(relative_path, glob_expr):
+ # Obtain the mtime (the time a file was last modified)
+ yield (os.path.getmtime(ref_path), relative_path)
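
Because _list_refs_mtimes() yields (mtime, ref) tuples, a plain sort recovers least-recently-modified order; this is exactly how list_artifacts() builds its LRU listing. A short usage sketch (cache and refs_head_path stand in for a BaseCache subclass instance and its refs directory):

    # Oldest-first listing of refs, as in ArtifactCache.list_artifacts()
    refs_by_age = [ref for _, ref in sorted(cache._list_refs_mtimes(cache.artifactdir))]

    # The same walk restricted by a glob, as in SourceCache.list_sources()
    source_refs = list(cache._list_refs_mtimes(refs_head_path, glob_expr="@sources/*"))
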
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index 5f67dc0c1..ad8013d18 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -24,7 +24,6 @@ import stat
import errno
import uuid
import contextlib
-from fnmatch import fnmatch
import grpc
@@ -89,6 +88,9 @@ class CASCache():
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
+ self.__reachable_directory_callbacks = []
+ self.__reachable_digest_callbacks = []
+
# preflight():
#
# Preflight check.
@@ -114,28 +116,6 @@ class CASCache():
# This assumes that the repository doesn't have any dangling pointers
return os.path.exists(refpath)
- # contains_subdir_artifact():
- #
- # Check whether the specified artifact element tree has a digest for a subdir
- # which is populated in the cache, i.e non dangling.
- #
- # Args:
- # ref (str): The ref to check
- # subdir (str): The subdir to check
- # with_files (bool): Whether to check files as well
- #
- # Returns: True if the subdir exists & is populated in the cache, False otherwise
- #
- def contains_subdir_artifact(self, ref, subdir, *, with_files=True):
- tree = self.resolve_ref(ref)
-
- try:
- subdirdigest = self._get_subdir(tree, subdir)
-
- return self.contains_directory(subdirdigest, with_files=with_files)
- except (CASCacheError, FileNotFoundError):
- return False
-
# contains_directory():
#
# Check whether the specified directory and subdirectories are in the cache,
@@ -230,19 +210,15 @@ class CASCache():
# ref_b (str): The second ref
# subdir (str): A subdirectory to limit the comparison to
#
- def diff(self, ref_a, ref_b, *, subdir=None):
+ def diff(self, ref_a, ref_b):
tree_a = self.resolve_ref(ref_a)
tree_b = self.resolve_ref(ref_b)
- if subdir:
- tree_a = self._get_subdir(tree_a, subdir)
- tree_b = self._get_subdir(tree_b, subdir)
-
added = []
removed = []
modified = []
- self._diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
+ self.diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified)
return modified, removed, added
@@ -253,14 +229,11 @@ class CASCache():
# Args:
# ref (str): The ref to pull
# remote (CASRemote): The remote repository to pull from
- # progress (callable): The progress callback, if any
- # subdir (str): The optional specific subdir to pull
- # excluded_subdirs (list): The optional list of subdirs to not pull
#
# Returns:
# (bool): True if pull was successful, False if ref was not available
#
- def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
+ def pull(self, ref, remote):
try:
remote.init()
@@ -274,7 +247,7 @@ class CASCache():
self._fetch_directory(remote, tree)
# Fetch files, excluded_subdirs determined in pullqueue
- required_blobs = self.required_blobs_for_directory(tree, excluded_subdirs=excluded_subdirs)
+ required_blobs = self.required_blobs_for_directory(tree)
missing_blobs = self.local_missing_blobs(required_blobs)
if missing_blobs:
self.fetch_blobs(remote, missing_blobs)
@@ -502,44 +475,6 @@ class CASCache():
except FileNotFoundError as e:
raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e
- # list_refs():
- #
- # List refs in Least Recently Modified (LRM) order.
- #
- # Args:
- # glob (str) - An optional glob expression to be used to list refs satisfying the glob
- #
- # Returns:
- # (list) - A list of refs in LRM order
- #
- def list_refs(self, *, glob=None):
- # string of: /path/to/repo/refs/heads
- ref_heads = os.path.join(self.casdir, 'refs', 'heads')
- path = ref_heads
-
- if glob is not None:
- globdir = os.path.dirname(glob)
- if not any(c in "*?[" for c in globdir):
- # path prefix contains no globbing characters so
- # append the glob to optimise the os.walk()
- path = os.path.join(ref_heads, globdir)
-
- refs = []
- mtimes = []
-
- for root, _, files in os.walk(path):
- for filename in files:
- ref_path = os.path.join(root, filename)
- relative_path = os.path.relpath(ref_path, ref_heads) # Relative to refs head
- if not glob or fnmatch(relative_path, glob):
- refs.append(relative_path)
- # Obtain the mtime (the time a file was last modified)
- mtimes.append(os.path.getmtime(ref_path))
-
- # NOTE: Sorted will sort from earliest to latest, thus the
- # first ref of this list will be the file modified earliest.
- return [ref for _, ref in sorted(zip(mtimes, refs))]
-
# list_objects():
#
# List cached objects in Least Recently Modified (LRM) order.
@@ -581,6 +516,8 @@ class CASCache():
#
# Args:
# ref (str): A symbolic ref
+ # basedir (str): Path of base directory the ref is in, defaults to
+ # CAS refs heads
# defer_prune (bool): Whether to defer pruning to the caller. NOTE:
# The space won't be freed until you manually
# call prune.
@@ -589,10 +526,12 @@ class CASCache():
# (int|None) The amount of space pruned from the repository in
# Bytes, or None if defer_prune is True
#
- def remove(self, ref, *, defer_prune=False):
+ def remove(self, ref, *, basedir=None, defer_prune=False):
+ if basedir is None:
+ basedir = os.path.join(self.casdir, 'refs', 'heads')
# Remove cache ref
- self._remove_ref(ref)
+ self._remove_ref(ref, basedir)
if not defer_prune:
pruned = self.prune()
@@ -600,6 +539,14 @@ class CASCache():
return None
+ # adds callback of iterator over reachable directory digests
+ def add_reachable_directories_callback(self, callback):
+ self.__reachable_directory_callbacks.append(callback)
+
+ # adds callbacks of iterator over reachable file digests
+ def add_reachable_digests_callback(self, callback):
+ self.__reachable_digest_callbacks.append(callback)
+
# prune():
#
# Prune unreachable objects from the repo.
@@ -619,6 +566,16 @@ class CASCache():
tree = self.resolve_ref(ref)
self._reachable_refs_dir(reachable, tree)
+ # check callback directory digests that are reachable
+ for digest_callback in self.__reachable_directory_callbacks:
+ for digest in digest_callback():
+ self._reachable_refs_dir(reachable, digest)
+
+ # check callback file digests that are reachable
+ for digest_callback in self.__reachable_digest_callbacks:
+ for digest in digest_callback():
+ reachable.add(digest.hash)
+
# Prune unreachable objects
for root, _, files in os.walk(os.path.join(self.casdir, 'objects')):
for filename in files:
@@ -717,6 +674,59 @@ class CASCache():
if dirnode.name not in excluded_subdirs:
yield from self.required_blobs_for_directory(dirnode.digest)
+ def diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
+ dir_a = remote_execution_pb2.Directory()
+ dir_b = remote_execution_pb2.Directory()
+
+ if tree_a:
+ with open(self.objpath(tree_a), 'rb') as f:
+ dir_a.ParseFromString(f.read())
+ if tree_b:
+ with open(self.objpath(tree_b), 'rb') as f:
+ dir_b.ParseFromString(f.read())
+
+ a = 0
+ b = 0
+ while a < len(dir_a.files) or b < len(dir_b.files):
+ if b < len(dir_b.files) and (a >= len(dir_a.files) or
+ dir_a.files[a].name > dir_b.files[b].name):
+ added.append(os.path.join(path, dir_b.files[b].name))
+ b += 1
+ elif a < len(dir_a.files) and (b >= len(dir_b.files) or
+ dir_b.files[b].name > dir_a.files[a].name):
+ removed.append(os.path.join(path, dir_a.files[a].name))
+ a += 1
+ else:
+ # File exists in both directories
+ if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
+ modified.append(os.path.join(path, dir_a.files[a].name))
+ a += 1
+ b += 1
+
+ a = 0
+ b = 0
+ while a < len(dir_a.directories) or b < len(dir_b.directories):
+ if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
+ dir_a.directories[a].name > dir_b.directories[b].name):
+ self.diff_trees(None, dir_b.directories[b].digest,
+ added=added, removed=removed, modified=modified,
+ path=os.path.join(path, dir_b.directories[b].name))
+ b += 1
+ elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
+ dir_b.directories[b].name > dir_a.directories[a].name):
+ self.diff_trees(dir_a.directories[a].digest, None,
+ added=added, removed=removed, modified=modified,
+ path=os.path.join(path, dir_a.directories[a].name))
+ a += 1
+ else:
+ # Subdirectory exists in both directories
+ if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
+ self.diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
+ added=added, removed=removed, modified=modified,
+ path=os.path.join(path, dir_a.directories[a].name))
+ a += 1
+ b += 1
+
################################################
# Local Private Methods #
################################################
@@ -733,22 +743,24 @@ class CASCache():
#
# Args:
# ref (str): The ref to remove
+ # basedir (str): Path of base directory the ref is in
#
# Raises:
# (CASCacheError): If the ref didn't exist, or a system error
# occurred while removing it
#
- def _remove_ref(self, ref):
+ def _remove_ref(self, ref, basedir):
# Remove the ref itself
- refpath = self._refpath(ref)
+ refpath = os.path.join(basedir, ref)
+
try:
os.unlink(refpath)
except FileNotFoundError as e:
raise CASCacheError("Could not find ref '{}'".format(ref)) from e
# Now remove any leading directories
- basedir = os.path.join(self.casdir, 'refs', 'heads')
+
components = list(os.path.split(ref))
while components:
components.pop()
@@ -831,59 +843,6 @@ class CASCache():
raise CASCacheError("Subdirectory {} not found".format(name))
- def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""):
- dir_a = remote_execution_pb2.Directory()
- dir_b = remote_execution_pb2.Directory()
-
- if tree_a:
- with open(self.objpath(tree_a), 'rb') as f:
- dir_a.ParseFromString(f.read())
- if tree_b:
- with open(self.objpath(tree_b), 'rb') as f:
- dir_b.ParseFromString(f.read())
-
- a = 0
- b = 0
- while a < len(dir_a.files) or b < len(dir_b.files):
- if b < len(dir_b.files) and (a >= len(dir_a.files) or
- dir_a.files[a].name > dir_b.files[b].name):
- added.append(os.path.join(path, dir_b.files[b].name))
- b += 1
- elif a < len(dir_a.files) and (b >= len(dir_b.files) or
- dir_b.files[b].name > dir_a.files[a].name):
- removed.append(os.path.join(path, dir_a.files[a].name))
- a += 1
- else:
- # File exists in both directories
- if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash:
- modified.append(os.path.join(path, dir_a.files[a].name))
- a += 1
- b += 1
-
- a = 0
- b = 0
- while a < len(dir_a.directories) or b < len(dir_b.directories):
- if b < len(dir_b.directories) and (a >= len(dir_a.directories) or
- dir_a.directories[a].name > dir_b.directories[b].name):
- self._diff_trees(None, dir_b.directories[b].digest,
- added=added, removed=removed, modified=modified,
- path=os.path.join(path, dir_b.directories[b].name))
- b += 1
- elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or
- dir_b.directories[b].name > dir_a.directories[a].name):
- self._diff_trees(dir_a.directories[a].digest, None,
- added=added, removed=removed, modified=modified,
- path=os.path.join(path, dir_a.directories[a].name))
- a += 1
- else:
- # Subdirectory exists in both directories
- if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash:
- self._diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest,
- added=added, removed=removed, modified=modified,
- path=os.path.join(path, dir_a.directories[a].name))
- a += 1
- b += 1
-
def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False):
if tree.hash in reachable:
return
@@ -1148,8 +1107,8 @@ class CASQuota:
self._message = context.message
- self._ref_callbacks = [] # Call backs to get required refs
- self._remove_callbacks = [] # Call backs to remove refs
+ self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method
+ self._list_refs_callbacks = [] # Callbacks to all refs
self._calculate_cache_quota()
@@ -1227,6 +1186,21 @@ class CASQuota:
return False
+ # add_remove_callbacks()
+ #
+ # This adds tuples of iterators over unrequired objects (currently
+ # artifacts and source refs), and a callback to remove them.
+ #
+ # Args:
+ # callback (iter(unrequired), remove): tuple of iterator and remove
+ # method associated.
+ #
+ def add_remove_callbacks(self, list_unrequired, remove_method):
+ self._remove_callbacks.append((list_unrequired, remove_method))
+
+ def add_list_refs_callback(self, list_callback):
+ self._list_refs_callbacks.append(list_callback)
+
################################################
# Local Private Methods #
################################################
@@ -1383,28 +1357,25 @@ class CASQuota:
removed_ref_count = 0
space_saved = 0
- # get required refs
- refs = self.cas.list_refs()
- required_refs = set(
- required
- for callback in self._ref_callbacks
- for required in callback()
- )
+ total_refs = 0
+ for refs in self._list_refs_callbacks:
+ total_refs += len(list(refs()))
# Start off with an announcement with as much info as possible
volume_size, volume_avail = self._get_cache_volume_size()
self._message(Message(
None, MessageType.STATUS, "Starting cache cleanup",
- detail=("Elements required by the current build plan: {}\n" +
+ detail=("Elements required by the current build plan:\n" + "{}\n" +
"User specified quota: {} ({})\n" +
"Cache usage: {}\n" +
"Cache volume: {} total, {} available")
- .format(len(required_refs),
- context.config_cache_quota,
- utils._pretty_size(self._cache_quota, dec_places=2),
- utils._pretty_size(self.get_cache_size(), dec_places=2),
- utils._pretty_size(volume_size, dec_places=2),
- utils._pretty_size(volume_avail, dec_places=2))))
+ .format(
+ total_refs,
+ context.config_cache_quota,
+ utils._pretty_size(self._cache_quota, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ utils._pretty_size(volume_size, dec_places=2),
+ utils._pretty_size(volume_avail, dec_places=2))))
# Do a real computation of the cache size once, just in case
self.compute_cache_size()
@@ -1412,67 +1383,63 @@ class CASQuota:
self._message(Message(None, MessageType.STATUS,
"Cache usage recomputed: {}".format(usage)))
- while self.get_cache_size() >= self._cache_lower_threshold:
- try:
- to_remove = refs.pop(0)
- except IndexError:
- # If too many artifacts are required, and we therefore
- # can't remove them, we have to abort the build.
- #
- # FIXME: Asking the user what to do may be neater
- #
- default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
- 'buildstream.conf')
- detail = ("Aborted after removing {} refs and saving {} disk space.\n"
- "The remaining {} in the cache is required by the {} references in your build plan\n\n"
- "There is not enough space to complete the build.\n"
- "Please increase the cache-quota in {} and/or make more disk space."
- .format(removed_ref_count,
- utils._pretty_size(space_saved, dec_places=2),
- utils._pretty_size(self.get_cache_size(), dec_places=2),
- len(required_refs),
- (context.config_origin or default_conf)))
-
- if self.full():
- raise CASCacheError("Cache too full. Aborting.",
- detail=detail,
- reason="cache-too-full")
- else:
- break
-
- key = to_remove.rpartition('/')[2]
- if key not in required_refs:
-
- # Remove the actual artifact, if it's not required.
- size = 0
- removed_ref = False
- for (pred, remove) in self._remove_callbacks:
- if pred(to_remove):
- size = remove(to_remove)
- removed_ref = True
- break
-
- if not removed_ref:
- continue
+ # Collect digests and their remove method
+ all_unrequired_refs = []
+ for (unrequired_refs, remove) in self._remove_callbacks:
+ for (mtime, ref) in unrequired_refs():
+ all_unrequired_refs.append((mtime, ref, remove))
- removed_ref_count += 1
- space_saved += size
+ # Pair refs and their remove method sorted in time order
+ all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)]
- self._message(Message(
- None, MessageType.STATUS,
- "Freed {: <7} {}".format(
- utils._pretty_size(size, dec_places=2),
- to_remove)))
+ # Go through unrequired refs and remove them, oldest first
+ made_space = False
+ for (ref, remove) in all_unrequired_refs:
+ size = remove(ref)
+ removed_ref_count += 1
+ space_saved += size
- self.set_cache_size(self._cache_size - size)
+ self._message(Message(
+ None, MessageType.STATUS,
+ "Freed {: <7} {}".format(
+ utils._pretty_size(size, dec_places=2),
+ ref)))
+
+ self.set_cache_size(self._cache_size - size)
+
+ # User callback
+ #
+ # Currently this process is fairly slow, but we should
+ # think about throttling this progress() callback if this
+ # becomes too intense.
+ if progress:
+ progress()
+
+ if self.get_cache_size() < self._cache_lower_threshold:
+ made_space = True
+ break
- # User callback
- #
- # Currently this process is fairly slow, but we should
- # think about throttling this progress() callback if this
- # becomes too intense.
- if progress:
- progress()
+ if not made_space and self.full():
+ # If too many artifacts are required, and we therefore
+ # can't remove them, we have to abort the build.
+ #
+ # FIXME: Asking the user what to do may be neater
+ #
+ default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
+ 'buildstream.conf')
+ detail = ("Aborted after removing {} refs and saving {} disk space.\n"
+ "The remaining {} in the cache is required by the {} references in your build plan\n\n"
+ "There is not enough space to complete the build.\n"
+ "Please increase the cache-quota in {} and/or make more disk space."
+ .format(removed_ref_count,
+ utils._pretty_size(space_saved, dec_places=2),
+ utils._pretty_size(self.get_cache_size(), dec_places=2),
+ total_refs,
+ (context.config_origin or default_conf)))
+
+ raise CASCacheError("Cache too full. Aborting.",
+ detail=detail,
+ reason="cache-too-full")
# Informational message about the side effects of the cleanup
self._message(Message(
@@ -1485,22 +1452,6 @@ class CASQuota:
return self.get_cache_size()
- # add_ref_callbacks()
- #
- # Args:
- # callback (Iterator): function that gives list of required refs
- def add_ref_callbacks(self, callback):
- self._ref_callbacks.append(callback)
-
- # add_remove_callbacks()
- #
- # Args:
- # callback (predicate, callback): The predicate says whether this is the
- # correct type to remove given a ref and the callback does actual
- # removing.
- def add_remove_callbacks(self, callback):
- self._remove_callbacks.append(callback)
-
def _grouper(iterable, n):
while True:
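
With _diff_trees() promoted to the public diff_trees(), ArtifactCache.diff() can compare the files digests of two Artifact protos directly, with no CAS ref resolution in between. A usage sketch, assuming digest_a and digest_b are Directory digests such as artifact._get_proto().files:

    added, removed, modified = [], [], []
    cas.diff_trees(digest_a, digest_b,
                   added=added, removed=removed, modified=modified)
    # Callers of ArtifactCache.diff() receive (modified, removed, added)
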
diff --git a/buildstream/_cas/casserver.py b/buildstream/_cas/casserver.py
index f88db717a..c08a4d577 100644
--- a/buildstream/_cas/casserver.py
+++ b/buildstream/_cas/casserver.py
@@ -428,11 +428,25 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
with open(artifact_path, 'rb') as f:
artifact.ParseFromString(f.read())
- files_digest = artifact.files
-
# Now update mtimes of files present.
try:
- self.cas.update_tree_mtime(files_digest)
+
+ if str(artifact.files):
+ self.cas.update_tree_mtime(artifact.files)
+
+ if str(artifact.buildtree):
+ # buildtrees might not be there
+ try:
+ self.cas.update_tree_mtime(artifact.buildtree)
+ except FileNotFoundError:
+ pass
+
+ if str(artifact.public_data):
+ os.utime(self.cas.objpath(artifact.public_data))
+
+ for log_file in artifact.logs:
+ os.utime(self.cas.objpath(log_file.digest))
+
except FileNotFoundError:
os.unlink(artifact_path)
context.abort(grpc.StatusCode.NOT_FOUND,
@@ -451,9 +465,6 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
# Unset protocol buffers don't evaluate to False but do return empty
# strings, hence str()
- if str(artifact.buildtree):
- self._check_directory("buildtree", artifact.buildtree, context)
-
if str(artifact.public_data):
self._check_file("public data", artifact.public_data, context)
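
On the server side, GetArtifact now refreshes the mtime of every blob the proto references, so LRU expiry treats served artifacts as recently used. A condensed sketch of that refresh (the servicer additionally unlinks the proto and aborts with NOT_FOUND when a referenced blob is missing):

    import os

    def touch_artifact_blobs(cas, artifact):
        if str(artifact.files):
            cas.update_tree_mtime(artifact.files)
        if str(artifact.buildtree):
            # buildtrees are optional and may legitimately be absent
            try:
                cas.update_tree_mtime(artifact.buildtree)
            except FileNotFoundError:
                pass
        if str(artifact.public_data):
            os.utime(cas.objpath(artifact.public_data))
        for log_file in artifact.logs:
            os.utime(cas.objpath(log_file.digest))
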
diff --git a/buildstream/_context.py b/buildstream/_context.py
index fffeea17e..151ea636a 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -75,6 +75,9 @@ class Context():
# The directory for CAS
self.casdir = None
+ # The directory for artifact protos
+ self.artifactdir = None
+
# The directory for temporary files
self.tmpdir = None
@@ -230,6 +233,7 @@ class Context():
self.tmpdir = os.path.join(self.cachedir, 'tmp')
self.casdir = os.path.join(self.cachedir, 'cas')
self.builddir = os.path.join(self.cachedir, 'build')
+ self.artifactdir = os.path.join(self.cachedir, 'artifacts', 'refs')
# Move old artifact cas to cas if it exists and create symlink
old_casdir = os.path.join(self.cachedir, 'artifacts', 'cas')
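
The resulting cache layout, with only artifacts/refs new in this change:

    <cachedir>/
        cas/             CAS objects and refs
        tmp/             temporary files
        build/           build directories
        artifacts/refs/  serialized Artifact protos, one file per cache key
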
diff --git a/buildstream/_sourcecache.py b/buildstream/_sourcecache.py
index d00015128..1d3342a75 100644
--- a/buildstream/_sourcecache.py
+++ b/buildstream/_sourcecache.py
@@ -17,6 +17,8 @@
# Authors:
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
+import os
+
from ._cas import CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
@@ -53,8 +55,8 @@ class SourceCache(BaseCache):
self._required_sources = set()
- self.casquota.add_ref_callbacks(self.required_sources)
- self.casquota.add_remove_callbacks((lambda x: x.startswith('@sources/'), self.cas.remove))
+ self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
+ self.casquota.add_list_refs_callback(self.list_sources)
# mark_required_sources()
#
@@ -81,14 +83,43 @@ class SourceCache(BaseCache):
# required_sources()
#
- # Yields the keys of all sources marked as required
+ # Yields the keys of all sources marked as required by the current build
+ # plan
#
# Returns:
- # iterable (str): iterable over the source keys
+ # iterable (str): iterable over the required source refs
#
def required_sources(self):
for source in self._required_sources:
- yield source._key
+ yield source._get_source_name()
+
+ # unrequired_sources()
+ #
+ # Yields the refs of all sources not required by the current build plan
+ #
+ # Returns:
+ # iter (str): iterable over unrequired source keys
+ #
+ def unrequired_sources(self):
+ required_source_names = set(map(
+ lambda x: x._get_source_name(), self._required_sources))
+ for (mtime, source) in self._list_refs_mtimes(
+ os.path.join(self.cas.casdir, 'refs', 'heads'),
+ glob_expr="@sources/*"):
+ if source not in required_source_names:
+ yield (mtime, source)
+
+ # list_sources()
+ #
+ # Get list of all sources in the `cas/refs/heads/@sources/` folder
+ #
+ # Returns:
+ # ([str]): iterable over all source refs
+ #
+ def list_sources(self):
+ return [ref for _, ref in self._list_refs_mtimes(
+ os.path.join(self.cas.casdir, 'refs', 'heads'),
+ glob_expr="@sources/*")]
# contains()
#
@@ -159,7 +190,7 @@ class SourceCache(BaseCache):
#
# Returns:
# (bool): True if pull successful, False if not
- def pull(self, source, *, progress=None):
+ def pull(self, source):
ref = source._get_source_name()
project = source._get_project()
@@ -170,7 +201,7 @@ class SourceCache(BaseCache):
try:
source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote, progress=progress):
+ if self.cas.pull(ref, remote):
source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
# no need to pull from additional remotes
return True
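
Source refs now take part in the same mtime-based expiry as artifacts: unrequired_sources() feeds the CASQuota remove callbacks with (mtime, ref) tuples, oldest first. A sketch of the shared required/unrequired split, assuming required_names is the set built from _required_sources:

    def unrequired_refs(list_refs_mtimes, base_path, required_names):
        # Yield (mtime, ref) for every ref the build plan does not need;
        # the same pattern backs unrequired_sources() and unrequired_artifacts()
        for (mtime, ref) in list_refs_mtimes(base_path, glob_expr="@sources/*"):
            if ref not in required_names:
                yield (mtime, ref)
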
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index d4f26e443..2343c553c 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -32,7 +32,7 @@ from contextlib import contextmanager, suppress
from fnmatch import fnmatch
from ._artifactelement import verify_artifact_ref
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, CASCacheError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
@@ -587,7 +587,7 @@ class Stream():
for ref in remove_refs:
try:
self._artifacts.remove(ref, defer_prune=True)
- except CASCacheError as e:
+ except ArtifactError as e:
self._message(MessageType.WARN, str(e))
continue
diff --git a/buildstream/element.py b/buildstream/element.py
index 7f68af262..ec69d85e9 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -198,7 +198,7 @@ class Element(Plugin):
if not self.__is_junction:
project.ensure_fully_loaded()
- self.normal_name = os.path.splitext(self.name.replace(os.sep, '-'))[0]
+ self.normal_name = _get_normal_name(self.name)
"""A normalized element name
This is the original element without path separators or
@@ -620,15 +620,7 @@ class Element(Plugin):
assert key is not None
- valid_chars = string.digits + string.ascii_letters + '-._'
- element_name = ''.join([
- x if x in valid_chars else '_'
- for x in self.normal_name
- ])
-
- # Note that project names are not allowed to contain slashes. Element names containing
- # a '/' will have this replaced with a '-' upon Element object instantiation.
- return '{0}/{1}/{2}'.format(project.name, element_name, key)
+ return _compose_artifact_name(project.name, self.normal_name, key)
def stage_artifact(self, sandbox, *, path=None, include=None, exclude=None, orphans=True, update_mtimes=None):
"""Stage this element's output artifact in the sandbox
@@ -749,6 +741,7 @@ class Element(Plugin):
#
if self.__artifacts.contains(self, workspace.last_successful):
last_successful = Artifact(self, context, strong_key=workspace.last_successful)
+ # Get a dict of dependency strong keys
old_dep_keys = last_successful.get_metadata_dependencies()
else:
# Last successful build is no longer in the artifact cache,
@@ -773,12 +766,12 @@ class Element(Plugin):
if dep.name in old_dep_keys:
key_new = dep._get_cache_key()
- key_old = _yaml.node_get(old_dep_keys, str, dep.name)
+ key_old = old_dep_keys[dep.name]
# We only need to worry about modified and added
# files, since removed files will be picked up by
# build systems anyway.
- to_update, _, added = self.__artifacts.diff(dep, key_old, key_new, subdir='files')
+ to_update, _, added = self.__artifacts.diff(dep, key_old, key_new)
workspace.add_running_files(dep.name, to_update + added)
to_update.extend(workspace.running_files[dep.name])
@@ -1888,11 +1881,12 @@ class Element(Plugin):
-        # Check whether the pull has been invoked with a specific subdir requested
-        # in user context, as to complete a partial artifact
+        # Check whether the pull has been invoked with buildtrees requested in
+        # the user context, so as to complete a partial artifact
- subdir, _ = self.__pull_directories()
+ pull_buildtrees = self._get_context().pull_buildtrees
- if self.__strong_cached and subdir:
+ if self.__strong_cached and pull_buildtrees:
-            # If we've specified a subdir, check if the subdir is cached locally
-            if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
+            # If buildtrees are requested, no pull is needed when the
+            # buildtree is already cached locally or does not exist remotely
+ if self._cached_buildtree() or not self._buildtree_exists():
return False
elif self.__strong_cached:
return False
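
Condensed, the buildtree-aware decision above reduces to the following
sketch (a free-standing illustration with hypothetical parameters; the
real checks are methods on Element, and further conditions follow in
the actual method):

def pull_needed(strong_cached, pull_buildtrees, buildtree_cached, buildtree_exists):
    # A strong-cached artifact only needs another pull when buildtrees
    # are requested, missing locally, and actually available remotely
    if strong_cached and pull_buildtrees:
        return not buildtree_cached and buildtree_exists
    return not strong_cached
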
@@ -1925,18 +1919,15 @@ class Element(Plugin):
def _pull(self):
context = self._get_context()
- def progress(percent, message):
- self.status(message)
-
-        # Get optional specific subdir to pull and optional list to not pull
-        # based off of user context
+        # Check whether buildtrees should be pulled, based on the user context
- subdir, excluded_subdirs = self.__pull_directories()
+ pull_buildtrees = context.pull_buildtrees
# Attempt to pull artifact without knowing whether it's available
- pulled = self.__pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
+ pulled = self.__pull_strong(pull_buildtrees=pull_buildtrees)
if not pulled and not self._cached() and not context.get_strict():
- pulled = self.__pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
+ pulled = self.__pull_weak(pull_buildtrees=pull_buildtrees)
if not pulled:
return False
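
The reworked pull flow thus tries the strict key first and only falls
back to the weak key in non-strict mode. A condensed sketch, with
hypothetical names standing in for the private Element helpers:

def pull_with_fallback(artifacts, element, strict_key, weak_key, *,
                       strict, pull_buildtrees):
    # Mirror Element._pull(): strong pull first, weak pull only as a
    # non-strict fallback
    if artifacts.pull(element, strict_key, pull_buildtrees=pull_buildtrees):
        return True
    if not strict:
        return artifacts.pull(element, weak_key, pull_buildtrees=pull_buildtrees)
    return False
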
@@ -1998,8 +1989,8 @@ class Element(Plugin):
self.warn("Not pushing tainted artifact.")
return False
- # Push all keys used for local commit
- pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit())
+        # Push all keys used for the local commit, resolved via the Artifact object
+ pushed = self.__artifacts.push(self, self.__artifact)
if not pushed:
return False
@@ -2861,17 +2852,6 @@ class Element(Plugin):
self.__build_result = self.__artifact.load_build_result()
- def __get_cache_keys_for_commit(self):
- keys = []
-
- # tag with strong cache key based on dependency versions used for the build
- keys.append(self._get_cache_key(strength=_KeyStrength.STRONG))
-
- # also store under weak cache key
- keys.append(self._get_cache_key(strength=_KeyStrength.WEAK))
-
- return utils._deduplicate(keys)
-
# __pull_strong():
#
# Attempt pulling given element from configured artifact caches with
@@ -2885,11 +2865,10 @@ class Element(Plugin):
# Returns:
# (bool): Whether or not the pull was successful
#
- def __pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
+ def __pull_strong(self, *, pull_buildtrees):
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
key = self.__strict_cache_key
- if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir,
- excluded_subdirs=excluded_subdirs):
+ if not self.__artifacts.pull(self, key, pull_buildtrees=pull_buildtrees):
return False
# update weak ref by pointing it to this newly fetched artifact
@@ -2903,17 +2882,16 @@ class Element(Plugin):
# the weak cache key
#
# Args:
-    #    progress (callable): The progress callback, if any
-    #    subdir (str): The optional specific subdir to pull
-    #    excluded_subdirs (list): The optional list of subdirs to not pull
+    #    pull_buildtrees (bool): Whether to also pull buildtrees
#
# Returns:
# (bool): Whether or not the pull was successful
#
- def __pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
+ def __pull_weak(self, *, pull_buildtrees):
weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
- if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
- excluded_subdirs=excluded_subdirs):
+ if not self.__artifacts.pull(self, weak_key,
+ pull_buildtrees=pull_buildtrees):
return False
# extract strong cache key from this newly fetched artifact
@@ -2925,37 +2903,6 @@ class Element(Plugin):
return True
- # __pull_directories():
- #
- # Which directories to include or exclude given the current
- # context
- #
- # Returns:
- # subdir (str): The optional specific subdir to include, based
- # on user context
- # excluded_subdirs (list): The optional list of subdirs to not
- # pull, referenced against subdir value
- #
- def __pull_directories(self):
- context = self._get_context()
-
- # Current default exclusions on pull
- excluded_subdirs = ["buildtree"]
- subdir = ''
-
- # If buildtrees are to be pulled, remove the value from exclusion list
- # and set specific subdir
- if context.pull_buildtrees:
- subdir = "buildtree"
- excluded_subdirs.remove(subdir)
-
- # If file contents are not required for this element, don't pull them.
- # The directories themselves will always be pulled.
- if not context.require_artifact_files and not self._artifact_files_required():
- excluded_subdirs.append("files")
-
- return (subdir, excluded_subdirs)
-
# __cache_sources():
#
# Caches the sources into the local CAS
@@ -3017,3 +2964,42 @@ def _overlap_error_detail(f, forbidden_overlap_elements, elements):
" above ".join(reversed(elements))))
else:
return ""
+
+
+# _get_normal_name():
+#
+# Get the element name without path separators or
+# the extension.
+#
+# Args:
+# element_name (str): The element's name
+#
+# Returns:
+# (str): The normalised element name
+#
+def _get_normal_name(element_name):
+ return os.path.splitext(element_name.replace(os.sep, '-'))[0]
+
+
+# _compose_artifact_name():
+#
+# Compose the completely resolved 'artifact_name' as a filepath
+#
+# Args:
+# project_name (str): The project's name
+# normal_name (str): The element's normalised name
+# cache_key (str): The relevant cache key
+#
+# Returns:
+# (str): The constructed artifact name path
+#
+def _compose_artifact_name(project_name, normal_name, cache_key):
+ valid_chars = string.digits + string.ascii_letters + '-._'
+ normal_name = ''.join([
+ x if x in valid_chars else '_'
+ for x in normal_name
+ ])
+
+ # Note that project names are not allowed to contain slashes. Element names containing
+ # a '/' will have this replaced with a '-' upon Element object instantiation.
+ return '{0}/{1}/{2}'.format(project_name, normal_name, cache_key)
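
Taken together, the two helpers turn an element name into the final
artifact ref path. A hypothetical example, assuming the definitions
above and a POSIX os.sep:

name = _get_normal_name('tools/gcc.bst')            # -> 'tools-gcc'
ref = _compose_artifact_name('myproject', name, 'deadbeef')
# -> 'myproject/tools-gcc/deadbeef'; any character outside
# [0-9a-zA-Z-._] in the normal name is replaced with '_'
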
diff --git a/buildstream/testing/runcli.py b/buildstream/testing/runcli.py
index 934c31236..8b3185143 100644
--- a/buildstream/testing/runcli.py
+++ b/buildstream/testing/runcli.py
@@ -53,9 +53,11 @@ from _pytest.capture import MultiCapture, FDCapture, FDCaptureBinary
from buildstream._frontend import cli as bst_cli
from buildstream import _yaml
from buildstream._cas import CASCache
+from buildstream.element import _get_normal_name, _compose_artifact_name
# Special private exception accessor, for test case purposes
from buildstream._exceptions import BstError, get_last_exception, get_last_task_error
+from buildstream._protos.buildstream.v2 import artifact_pb2
# Wrapper for the click.testing result
@@ -495,6 +497,17 @@ class Cli():
result.assert_success()
return result.output.splitlines()
+    # Fetch an element's complete artifact name; the cache_key will be
+    # generated if not given.
+ #
+ def get_artifact_name(self, project, project_name, element_name, cache_key=None):
+ if not cache_key:
+ cache_key = self.get_element_key(project, element_name)
+
+ # Replace path separator and chop off the .bst suffix for normal name
+ normal_name = _get_normal_name(element_name)
+ return _compose_artifact_name(project_name, normal_name, cache_key)
+
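A hypothetical test snippet using the new helper, assuming a standard
`cli` fixture and a project whose project.conf names it 'test':

key = cli.get_element_key(project, 'target.bst')
assert cli.get_artifact_name(project, 'test', 'target.bst') == \
    'test/target/{}'.format(key)
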
class CliIntegration(Cli):
@@ -636,7 +649,8 @@ class TestArtifact():
#
def remove_artifact_from_cache(self, cache_dir, element_name):
- cache_dir = os.path.join(cache_dir, 'cas', 'refs', 'heads')
+ cache_dir = os.path.join(cache_dir, 'artifacts', 'refs')
+
normal_name = element_name.replace(os.sep, '-')
cache_dir = os.path.splitext(os.path.join(cache_dir, 'test', normal_name))[0]
shutil.rmtree(cache_dir)
@@ -655,13 +669,13 @@ class TestArtifact():
#
def is_cached(self, cache_dir, element, element_key):
- cas = CASCache(str(cache_dir))
artifact_ref = element.get_artifact_name(element_key)
- return cas.contains(artifact_ref)
+ return os.path.exists(os.path.join(cache_dir, 'artifacts', 'refs', artifact_ref))
# get_digest():
#
- # Get the digest for a given element's artifact
+ # Get the digest for a given element's artifact files
#
# Args:
# cache_dir (str): Specific cache dir to check
@@ -673,10 +687,12 @@ class TestArtifact():
#
def get_digest(self, cache_dir, element, element_key):
- cas = CASCache(str(cache_dir))
artifact_ref = element.get_artifact_name(element_key)
- digest = cas.resolve_ref(artifact_ref)
- return digest
+ artifact_dir = os.path.join(cache_dir, 'artifacts', 'refs')
+ artifact_proto = artifact_pb2.Artifact()
+ with open(os.path.join(artifact_dir, artifact_ref), 'rb') as f:
+ artifact_proto.ParseFromString(f.read())
+ return artifact_proto.files
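
The same deserialization pattern recurs in extract_buildtree() below;
a self-contained sketch (load_artifact_proto is a hypothetical helper):

import os

from buildstream._protos.buildstream.v2 import artifact_pb2

def load_artifact_proto(cache_dir, artifact_ref):
    # Read the serialized Artifact message from artifacts/refs/<ref>,
    # as TestArtifact.get_digest() does above
    artifact = artifact_pb2.Artifact()
    path = os.path.join(cache_dir, 'artifacts', 'refs', artifact_ref)
    with open(path, 'rb') as f:
        artifact.ParseFromString(f.read())
    return artifact
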
# extract_buildtree():
#
@@ -691,9 +707,19 @@ class TestArtifact():
# (str): path to extracted buildtree directory, does not guarantee
# existence.
@contextmanager
- def extract_buildtree(self, tmpdir, digest):
- with self._extract_subdirectory(tmpdir, digest, 'buildtree') as extract:
- yield extract
+ def extract_buildtree(self, cache_dir, tmpdir, ref):
+ artifact = artifact_pb2.Artifact()
+ try:
+ with open(os.path.join(cache_dir, 'artifacts', 'refs', ref), 'rb') as f:
+ artifact.ParseFromString(f.read())
+ except FileNotFoundError:
+ yield None
+ else:
+ if str(artifact.buildtree):
+ with self._extract_subdirectory(tmpdir, artifact.buildtree) as f:
+ yield f
+ else:
+ yield None
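
Hypothetical usage of the reworked context manager in a test, assuming
a TestArtifact instance `artifact` and valid cache_dir/tmpdir/ref
values:

with artifact.extract_buildtree(cache_dir, tmpdir, ref) as buildtree:
    if buildtree is None:
        # Artifact proto missing, or it records no buildtree
        handle_missing_buildtree()  # hypothetical
    else:
        inspect(buildtree)          # hypothetical
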
# _extract_subdirectory():
#
@@ -709,12 +735,12 @@ class TestArtifact():
# (str): path to extracted subdir directory, does not guarantee
# existence.
@contextmanager
- def _extract_subdirectory(self, tmpdir, digest, subdir):
+ def _extract_subdirectory(self, tmpdir, digest):
with tempfile.TemporaryDirectory() as extractdir:
try:
cas = CASCache(str(tmpdir))
cas.checkout(extractdir, digest)
- yield os.path.join(extractdir, subdir)
+ yield extractdir
except FileNotFoundError:
yield None