author     bst-marge-bot <marge-bot@buildstream.build>   2019-05-15 11:44:45 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>   2019-05-15 11:44:45 +0000
commit     7fb538647ccb9ed7b42a60f4b663b10c40dcc772
tree       2cb482318b76815cb096fc6d0ff43439ebc70a7b /buildstream
parent     2f4023f3555f588aa8fc70494b08bf95b3db6463
parent     1de222e9e8f27dca71a9f078bc82885923c0bcba
download   buildstream-7fb538647ccb9ed7b42a60f4b663b10c40dcc772.tar.gz
Merge branch 'raoul/974-Artifact-Rework' into 'master'
Artifact as a Proto: rework
Closes #974
See merge request BuildStream/buildstream!1292
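For orientation, a minimal sketch of what this rework stores on disk: instead of a "meta" subdirectory of YAML files inside a CAS tree, each ref under artifacts/refs/ is now a serialized Artifact protobuf. The helper below is illustrative only and mirrors the read side shown in the _artifact.py diff further down (_get_proto()); the paths in the usage comment are hypothetical.

import os
from buildstream._protos.buildstream.v2 import artifact_pb2

def load_artifact_proto(artifactdir, artifact_ref):
    # Parse the Artifact proto stored for a given ref, or return None
    # if the artifact is not in the local cache.
    proto_path = os.path.join(artifactdir, artifact_ref)
    artifact = artifact_pb2.Artifact()
    try:
        with open(proto_path, 'rb') as f:
            artifact.ParseFromString(f.read())
    except FileNotFoundError:
        return None
    return artifact

# Hypothetical usage:
#   artifact = load_artifact_proto('/path/to/cache/artifacts/refs', 'project/element/0123abcd')
#   if artifact:
#       print(artifact.strong_key, artifact.weak_key, artifact.build_success)

Fields such as strong_key, weak_key, build_success, files, buildtree and build_deps come straight off the parsed proto, replacing keys.yaml, build-result.yaml, dependencies.yaml and the other per-artifact metadata files.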
Diffstat (limited to 'buildstream')
-rw-r--r--  buildstream/_artifact.py       | 311
-rw-r--r--  buildstream/_artifactcache.py  | 336
-rw-r--r--  buildstream/_basecache.py      |  31
-rw-r--r--  buildstream/_cas/cascache.py   | 389
-rw-r--r--  buildstream/_cas/casserver.py  |  23
-rw-r--r--  buildstream/_context.py        |   4
-rw-r--r--  buildstream/_sourcecache.py    |  45
-rw-r--r--  buildstream/_stream.py         |   4
-rw-r--r--  buildstream/element.py         | 130
-rw-r--r--  buildstream/testing/runcli.py  |  50
10 files changed, 745 insertions, 578 deletions
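Several hunks below replace CASCache.list_refs() with a generic _list_refs_mtimes() helper in _basecache.py, which both the artifact and source caches use to enumerate refs together with their mtimes for LRU-ordered expiry. A standalone sketch of that behaviour, assuming base_path is a directory of ref files:

import os
from fnmatch import fnmatch

def list_refs_mtimes(base_path, glob_expr=None):
    # Yield (mtime, ref) for every ref file under base_path, optionally
    # filtered by a glob expression matched against the relative path.
    path = base_path
    if glob_expr is not None:
        globdir = os.path.dirname(glob_expr)
        if not any(c in "*?[" for c in globdir):
            # The directory prefix contains no glob characters, so narrow
            # the walk to that subdirectory instead of scanning everything.
            path = os.path.join(base_path, globdir)
    for root, _, files in os.walk(path):
        for filename in files:
            ref_path = os.path.join(root, filename)
            relative_path = os.path.relpath(ref_path, base_path)
            if not glob_expr or fnmatch(relative_path, glob_expr):
                yield (os.path.getmtime(ref_path), relative_path)

Sorting the yielded tuples gives oldest-first order, which is how unrequired_artifacts() and the rewritten cleanup loop in CASQuota decide what to evict first.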
diff --git a/buildstream/_artifact.py b/buildstream/_artifact.py index 6cf51ee2d..2240300c7 100644 --- a/buildstream/_artifact.py +++ b/buildstream/_artifact.py @@ -29,11 +29,11 @@ artifact composite interaction away from Element class """ import os -import shutil +import tempfile +from ._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto from . import _yaml from . import utils -from ._exceptions import ArtifactError from .types import Scope from .storage._casbaseddirectory import CasBasedDirectory @@ -49,12 +49,18 @@ from .storage._casbaseddirectory import CasBasedDirectory # class Artifact(): + version = 0 + def __init__(self, element, context, *, strong_key=None, weak_key=None): self._element = element self._context = context self._artifacts = context.artifactcache self._cache_key = strong_key self._weak_cache_key = weak_key + self._artifactdir = context.artifactdir + self._cas = context.get_cascache() + self._tmpdir = context.tmpdir + self._proto = None self._metadata_keys = None # Strong and weak key tuple extracted from the artifact self._metadata_dependencies = None # Dictionary of dependency strong keys from the artifact @@ -69,7 +75,9 @@ class Artifact(): # (Directory): The virtual directory object # def get_files(self): - return self._get_subdirectory("files") + files_digest = self._get_field_digest("files") + + return CasBasedDirectory(self._cas, digest=files_digest) # get_buildtree(): # @@ -79,7 +87,9 @@ class Artifact(): # (Directory): The virtual directory object # def get_buildtree(self): - return self._get_subdirectory("buildtree") + buildtree_digest = self._get_field_digest("buildtree") + + return CasBasedDirectory(self._cas, digest=buildtree_digest) # get_extract_key(): # @@ -100,7 +110,6 @@ class Artifact(): # sandbox_build_dir (Directory): Virtual Directory object for the sandbox build-root # collectvdir (Directory): Virtual Directoy object from within the sandbox for collection # buildresult (tuple): bool, short desc and detailed desc of result - # keys (list): list of keys for the artifact commit metadata # publicdata (dict): dict of public data to commit to artifact metadata # # Returns: @@ -110,80 +119,78 @@ class Artifact(): context = self._context element = self._element + size = 0 - assemblevdir = CasBasedDirectory(cas_cache=self._artifacts.cas) - logsvdir = assemblevdir.descend("logs", create=True) - metavdir = assemblevdir.descend("meta", create=True) + filesvdir = None + buildtreevdir = None - # Create artifact directory structure - assembledir = os.path.join(rootdir, 'artifact') - logsdir = os.path.join(assembledir, 'logs') - metadir = os.path.join(assembledir, 'meta') - os.mkdir(assembledir) - os.mkdir(logsdir) - os.mkdir(metadir) + artifact = ArtifactProto() - if collectvdir is not None: - filesvdir = assemblevdir.descend("files", create=True) - filesvdir.import_files(collectvdir) + artifact.version = self.version - if sandbox_build_dir: - buildtreevdir = assemblevdir.descend("buildtree", create=True) - buildtreevdir.import_files(sandbox_build_dir) + # Store result + artifact.build_success = buildresult[0] + artifact.build_error = buildresult[1] + artifact.build_error_details = "" if not buildresult[2] else buildresult[2] - # Write some logs out to normal directories: logsdir and metadir - # Copy build log - log_filename = context.get_log_filename() - element._build_log_path = os.path.join(logsdir, 'build.log') - if log_filename: - shutil.copyfile(log_filename, element._build_log_path) + # Store keys + artifact.strong_key = self._cache_key 
+ artifact.weak_key = self._weak_cache_key + + artifact.was_workspaced = bool(element._get_workspace()) + + # Store files + if collectvdir: + filesvdir = CasBasedDirectory(cas_cache=self._cas) + filesvdir.import_files(collectvdir) + artifact.files.CopyFrom(filesvdir._get_digest()) + size += filesvdir.get_size() # Store public data - _yaml.dump(_yaml.node_sanitize(publicdata), os.path.join(metadir, 'public.yaml')) + with tempfile.NamedTemporaryFile(dir=self._tmpdir) as tmp: + _yaml.dump(_yaml.node_sanitize(publicdata), tmp.name) + public_data_digest = self._cas.add_object(path=tmp.name, link_directly=True) + artifact.public_data.CopyFrom(public_data_digest) + size += public_data_digest.size_bytes + + # store build dependencies + for e in element.dependencies(Scope.BUILD): + new_build = artifact.build_deps.add() + new_build.element_name = e.name + new_build.cache_key = e._get_cache_key() + new_build.was_workspaced = bool(e._get_workspace()) + + # Store log file + log_filename = context.get_log_filename() + if log_filename: + digest = self._cas.add_object(path=log_filename) + element._build_log_path = self._cas.objpath(digest) + log = artifact.logs.add() + log.name = os.path.basename(log_filename) + log.digest.CopyFrom(digest) + size += log.digest.size_bytes + + # Store build tree + if sandbox_build_dir: + buildtreevdir = CasBasedDirectory(cas_cache=self._cas) + buildtreevdir.import_files(sandbox_build_dir) + artifact.buildtree.CopyFrom(buildtreevdir._get_digest()) + size += buildtreevdir.get_size() - # Store result - build_result_dict = {"success": buildresult[0], "description": buildresult[1]} - if buildresult[2] is not None: - build_result_dict["detail"] = buildresult[2] - _yaml.dump(build_result_dict, os.path.join(metadir, 'build-result.yaml')) - - # Store keys.yaml - _yaml.dump(_yaml.node_sanitize({ - 'strong': self._cache_key, - 'weak': self._weak_cache_key, - }), os.path.join(metadir, 'keys.yaml')) - - # Store dependencies.yaml - _yaml.dump(_yaml.node_sanitize({ - e.name: e._get_cache_key() for e in element.dependencies(Scope.BUILD) - }), os.path.join(metadir, 'dependencies.yaml')) - - # Store workspaced.yaml - _yaml.dump(_yaml.node_sanitize({ - 'workspaced': bool(element._get_workspace()) - }), os.path.join(metadir, 'workspaced.yaml')) - - # Store workspaced-dependencies.yaml - _yaml.dump(_yaml.node_sanitize({ - 'workspaced-dependencies': [ - e.name for e in element.dependencies(Scope.BUILD) - if e._get_workspace() - ] - }), os.path.join(metadir, 'workspaced-dependencies.yaml')) - - metavdir.import_files(metadir) - logsvdir.import_files(logsdir) - - artifact_size = assemblevdir.get_size() + os.makedirs(os.path.dirname(os.path.join( + self._artifactdir, element.get_artifact_name())), exist_ok=True) keys = utils._deduplicate([self._cache_key, self._weak_cache_key]) - self._artifacts.commit(element, assemblevdir, keys) + for key in keys: + path = os.path.join(self._artifactdir, element.get_artifact_name(key=key)) + with open(path, mode='w+b') as f: + f.write(artifact.SerializeToString()) - return artifact_size + return size # cached_buildtree() # # Check if artifact is cached with expected buildtree. A - # buildtree will not be present if the res tof the partial artifact + # buildtree will not be present if the rest of the partial artifact # is not cached. 
# # Returns: @@ -193,14 +200,12 @@ class Artifact(): # def cached_buildtree(self): - element = self._element - - key = self.get_extract_key() - if not self._artifacts.contains_subdir_artifact(element, key, 'buildtree'): + buildtree_digest = self._get_field_digest("buildtree") + if buildtree_digest: + return self._cas.contains_directory(buildtree_digest, with_files=True) + else: return False - return True - # buildtree_exists() # # Check if artifact was created with a buildtree. This does not check @@ -211,8 +216,8 @@ class Artifact(): # def buildtree_exists(self): - artifact_vdir = self._get_directory() - return artifact_vdir._exists('buildtree') + artifact = self._get_proto() + return bool(str(artifact.buildtree)) # load_public_data(): # @@ -224,8 +229,8 @@ class Artifact(): def load_public_data(self): # Load the public data from the artifact - meta_vdir = self._get_subdirectory('meta') - meta_file = meta_vdir._objpath('public.yaml') + artifact = self._get_proto() + meta_file = self._cas.objpath(artifact.public_data) data = _yaml.load(meta_file, shortname='public.yaml') return data @@ -241,20 +246,10 @@ class Artifact(): # def load_build_result(self): - meta_vdir = self._get_subdirectory('meta') - - meta_file = meta_vdir._objpath('build-result.yaml') - if not os.path.exists(meta_file): - build_result = (True, "succeeded", None) - return build_result - - data = _yaml.load(meta_file, shortname='build-result.yaml') - - success = _yaml.node_get(data, bool, 'success') - description = _yaml.node_get(data, str, 'description', default_value=None) - detail = _yaml.node_get(data, str, 'detail', default_value=None) - - build_result = (success, description, detail) + artifact = self._get_proto() + build_result = (artifact.build_success, + artifact.build_error, + artifact.build_error_details) return build_result @@ -271,14 +266,11 @@ class Artifact(): if self._metadata_keys is not None: return self._metadata_keys - # Extract the metadata dir - meta_vdir = self._get_subdirectory('meta') + # Extract proto + artifact = self._get_proto() - # Parse the expensive yaml now and cache the result - meta_file = meta_vdir._objpath('keys.yaml') - meta = _yaml.load(meta_file, shortname='keys.yaml') - strong_key = _yaml.node_get(meta, str, 'strong') - weak_key = _yaml.node_get(meta, str, 'weak') + strong_key = artifact.strong_key + weak_key = artifact.weak_key self._metadata_keys = (strong_key, weak_key) @@ -296,14 +288,10 @@ class Artifact(): if self._metadata_dependencies is not None: return self._metadata_dependencies - # Extract the metadata dir - meta_vdir = self._get_subdirectory('meta') - - # Parse the expensive yaml now and cache the result - meta_file = meta_vdir._objpath('dependencies.yaml') - meta = _yaml.load(meta_file, shortname='dependencies.yaml') + # Extract proto + artifact = self._get_proto() - self._metadata_dependencies = meta + self._metadata_dependencies = {dep.element_name: dep.cache_key for dep in artifact.build_deps} return self._metadata_dependencies @@ -319,14 +307,10 @@ class Artifact(): if self._metadata_workspaced is not None: return self._metadata_workspaced - # Extract the metadata dir - meta_vdir = self._get_subdirectory('meta') - - # Parse the expensive yaml now and cache the result - meta_file = meta_vdir._objpath('workspaced.yaml') - meta = _yaml.load(meta_file, shortname='workspaced.yaml') + # Extract proto + artifact = self._get_proto() - self._metadata_workspaced = _yaml.node_get(meta, bool, 'workspaced') + self._metadata_workspaced = artifact.was_workspaced return 
self._metadata_workspaced @@ -342,15 +326,11 @@ class Artifact(): if self._metadata_workspaced_dependencies is not None: return self._metadata_workspaced_dependencies - # Extract the metadata dir - meta_vdir = self._get_subdirectory('meta') + # Extract proto + artifact = self._get_proto() - # Parse the expensive yaml now and cache the result - meta_file = meta_vdir._objpath('workspaced-dependencies.yaml') - meta = _yaml.load(meta_file, shortname='workspaced-dependencies.yaml') - - self._metadata_workspaced_dependencies = _yaml.node_sanitize(_yaml.node_get(meta, list, - 'workspaced-dependencies')) + self._metadata_workspaced_dependencies = [dep.element_name for dep in artifact.build_deps + if dep.was_workspaced] return self._metadata_workspaced_dependencies @@ -369,30 +349,21 @@ class Artifact(): def cached(self): context = self._context - try: - vdir = self._get_directory() - except ArtifactError: - # Either ref or top-level artifact directory missing - return False + artifact = self._get_proto() - # Check whether all metadata is available - metadigest = vdir._get_child_digest('meta') - if not self._artifacts.cas.contains_directory(metadigest, with_files=True): + if not artifact: return False - # Additional checks only relevant if artifact was created with 'files' subdirectory - if vdir._exists('files'): - # Determine whether directories are required - require_directories = context.require_artifact_directories - # Determine whether file contents are required as well - require_files = context.require_artifact_files or self._element._artifact_files_required() + # Determine whether directories are required + require_directories = context.require_artifact_directories + # Determine whether file contents are required as well + require_files = (context.require_artifact_files or + self._element._artifact_files_required()) - filesdigest = vdir._get_child_digest('files') - - # Check whether 'files' subdirectory is available, with or without file contents - if (require_directories and - not self._artifacts.cas.contains_directory(filesdigest, with_files=require_files)): - return False + # Check whether 'files' subdirectory is available, with or without file contents + if (require_directories and str(artifact.files) and + not self._cas.contains_directory(artifact.files, with_files=require_files)): + return False return True @@ -408,46 +379,50 @@ class Artifact(): if not self._element._cached(): return False - log_vdir = self._get_subdirectory('logs') + artifact = self._get_proto() - logsdigest = log_vdir._get_digest() - return self._artifacts.cas.contains_directory(logsdigest, with_files=True) + for logfile in artifact.logs: + if not self._cas.contains(logfile.digest.hash): + return False - # _get_directory(): - # - # Get a virtual directory for the artifact contents - # - # Args: - # key (str): The key for the artifact to extract, - # or None for the default key + return True + + # _get_proto() # # Returns: - # (Directory): The virtual directory object + # (Artifact): Artifact proto # - def _get_directory(self, key=None): + def _get_proto(self): + # Check if we've already cached the proto object + if self._proto is not None: + return self._proto - element = self._element + key = self.get_extract_key() - if key is None: - key = self.get_extract_key() + proto_path = os.path.join(self._artifactdir, + self._element.get_artifact_name(key=key)) + artifact = ArtifactProto() + try: + with open(proto_path, mode='r+b') as f: + artifact.ParseFromString(f.read()) + except FileNotFoundError: + return None - return 
self._artifacts.get_artifact_directory(element, key) + os.utime(proto_path) + # Cache the proto object + self._proto = artifact - # _get_subdirectory(): - # - # Get a virtual directory for the artifact subdir contents - # - # Args: - # subdir (str): The specific artifact subdir - # key (str): The key for the artifact to extract, - # or None for the default key + return self._proto + + # _get_artifact_field() # # Returns: - # (Directory): The virtual subdirectory object + # (Digest): Digest of field specified # - def _get_subdirectory(self, subdir, key=None): - - artifact_vdir = self._get_directory(key) - sub_vdir = artifact_vdir.descend(subdir) + def _get_field_digest(self, field): + artifact_proto = self._get_proto() + digest = getattr(artifact_proto, field) + if not str(digest): + return None - return sub_vdir + return digest diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index 5b0ccacc4..091b44dda 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -17,13 +17,18 @@ # Authors: # Tristan Maat <tristan.maat@codethink.co.uk> +import os +import grpc + from ._basecache import BaseCache from .types import _KeyStrength -from ._exceptions import ArtifactError, CASCacheError, CASError +from ._exceptions import ArtifactError, CASError, CASCacheError +from ._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc from ._cas import CASRemoteSpec from .storage._casbaseddirectory import CasBasedDirectory -from .storage.directory import VirtualDirectoryError +from ._artifact import Artifact +from . import utils # An ArtifactCacheSpec holds the user configuration for a single remote @@ -55,8 +60,15 @@ class ArtifactCache(BaseCache): self._required_elements = set() # The elements required for this session - self.casquota.add_ref_callbacks(self.required_artifacts) - self.casquota.add_remove_callbacks((lambda x: not x.startswith('@'), self.remove)) + # create artifact directory + self.artifactdir = context.artifactdir + os.makedirs(self.artifactdir, exist_ok=True) + + self.casquota.add_remove_callbacks(self.unrequired_artifacts, self.remove) + self.casquota.add_list_refs_callback(self.list_artifacts) + + self.cas.add_reachable_directories_callback(self._reachable_directories) + self.cas.add_reachable_digests_callback(self._reachable_digests) # mark_required_elements(): # @@ -92,13 +104,34 @@ class ArtifactCache(BaseCache): weak_key = element._get_cache_key(strength=_KeyStrength.WEAK) for key in (strong_key, weak_key): if key: - try: - ref = element.get_artifact_name(key) + ref = element.get_artifact_name(key) - self.cas.update_mtime(ref) - except CASError: + try: + self.update_mtime(ref) + except ArtifactError: pass + def update_mtime(self, ref): + try: + os.utime(os.path.join(self.artifactdir, ref)) + except FileNotFoundError as e: + raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e + + # unrequired_artifacts() + # + # Returns iterator over artifacts that are not required in the build plan + # + # Returns: + # (iter): Iterator over tuples of (float, str) where float is the time + # and str is the artifact ref + # + def unrequired_artifacts(self): + required_artifacts = set(map(lambda x: x.get_artifact_name(), + self._required_elements)) + for (mtime, artifact) in self._list_refs_mtimes(self.artifactdir): + if artifact not in required_artifacts: + yield (mtime, artifact) + def required_artifacts(self): # Build a set of the cache keys which are required # based on the required elements at cleanup time @@ -147,24 +180,7 @@ 
class ArtifactCache(BaseCache): def contains(self, element, key): ref = element.get_artifact_name(key) - return self.cas.contains(ref) - - # contains_subdir_artifact(): - # - # Check whether an artifact element contains a digest for a subdir - # which is populated in the cache, i.e non dangling. - # - # Args: - # element (Element): The Element to check - # key (str): The cache key to use - # subdir (str): The subdir to check - # with_files (bool): Whether to check files as well - # - # Returns: True if the subdir exists & is populated in the cache, False otherwise - # - def contains_subdir_artifact(self, element, key, subdir, *, with_files=True): - ref = element.get_artifact_name(key) - return self.cas.contains_subdir_artifact(ref, subdir, with_files=with_files) + return os.path.exists(os.path.join(self.artifactdir, ref)) # list_artifacts(): # @@ -177,9 +193,7 @@ class ArtifactCache(BaseCache): # ([str]) - A list of artifact names as generated in LRU order # def list_artifacts(self, *, glob=None): - return list(filter( - lambda x: not x.startswith('@'), - self.cas.list_refs(glob=glob))) + return [ref for _, ref in sorted(list(self._list_refs_mtimes(self.artifactdir, glob_expr=glob)))] # remove(): # @@ -196,7 +210,10 @@ class ArtifactCache(BaseCache): # (int): The amount of space recovered in the cache, in bytes # def remove(self, ref, *, defer_prune=False): - return self.cas.remove(ref, defer_prune=defer_prune) + try: + return self.cas.remove(ref, basedir=self.artifactdir, defer_prune=defer_prune) + except CASCacheError as e: + raise ArtifactError("{}".format(e)) from e # prune(): # @@ -205,63 +222,31 @@ class ArtifactCache(BaseCache): def prune(self): return self.cas.prune() - # get_artifact_directory(): - # - # Get virtual directory for cached artifact of the specified Element. - # - # Assumes artifact has previously been fetched or committed. - # - # Args: - # element (Element): The Element to extract - # key (str): The cache key to use - # - # Raises: - # ArtifactError: In cases there was an OSError, or if the artifact - # did not exist. - # - # Returns: virtual directory object - # - def get_artifact_directory(self, element, key): - ref = element.get_artifact_name(key) - try: - digest = self.cas.resolve_ref(ref, update_mtime=True) - return CasBasedDirectory(self.cas, digest=digest) - except (CASCacheError, VirtualDirectoryError) as e: - raise ArtifactError('Directory not in local cache: {}'.format(e)) from e - - # commit(): - # - # Commit built artifact to cache. - # - # Args: - # element (Element): The Element commit an artifact for - # content (Directory): The element's content directory - # keys (list): The cache keys to use - # - def commit(self, element, content, keys): - refs = [element.get_artifact_name(key) for key in keys] - - tree = content._get_digest() - - for ref in refs: - self.cas.set_ref(ref, tree) - # diff(): # # Return a list of files that have been added or modified between - # the artifacts described by key_a and key_b. + # the artifacts described by key_a and key_b. 
This expects the + # provided keys to be strong cache keys # # Args: # element (Element): The element whose artifacts to compare - # key_a (str): The first artifact key - # key_b (str): The second artifact key - # subdir (str): A subdirectory to limit the comparison to + # key_a (str): The first artifact strong key + # key_b (str): The second artifact strong key # - def diff(self, element, key_a, key_b, *, subdir=None): - ref_a = element.get_artifact_name(key_a) - ref_b = element.get_artifact_name(key_b) + def diff(self, element, key_a, key_b): + context = self.context + artifact_a = Artifact(element, context, strong_key=key_a) + artifact_b = Artifact(element, context, strong_key=key_b) + digest_a = artifact_a._get_proto().files + digest_b = artifact_b._get_proto().files - return self.cas.diff(ref_a, ref_b, subdir=subdir) + added = [] + removed = [] + modified = [] + + self.cas.diff_trees(digest_a, digest_b, added=added, removed=removed, modified=modified) + + return modified, removed, added # push(): # @@ -269,7 +254,7 @@ class ArtifactCache(BaseCache): # # Args: # element (Element): The Element whose artifact is to be pushed - # keys (list): The cache keys to use + # artifact (Artifact): The artifact being pushed # # Returns: # (bool): True if any remote was updated, False if no pushes were required @@ -277,9 +262,7 @@ class ArtifactCache(BaseCache): # Raises: # (ArtifactError): if there was an error # - def push(self, element, keys): - refs = [element.get_artifact_name(key) for key in list(keys)] - + def push(self, element, artifact): project = element._get_project() push_remotes = [r for r in self._remotes[project] if r.spec.push] @@ -291,7 +274,7 @@ class ArtifactCache(BaseCache): display_key = element._get_brief_display_key() element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) - if self.cas.push(refs, remote): + if self._push_artifact(element, artifact, remote): element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) pushed = True else: @@ -308,24 +291,21 @@ class ArtifactCache(BaseCache): # Args: # element (Element): The Element whose artifact is to be fetched # key (str): The cache key to use - # progress (callable): The progress callback, if any - # subdir (str): The optional specific subdir to pull - # excluded_subdirs (list): The optional list of subdirs to not pull + # pull_buildtrees (bool): Whether to pull buildtrees or not # # Returns: # (bool): True if pull was successful, False if artifact was not available # - def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None): - ref = element.get_artifact_name(key) + def pull(self, element, key, *, pull_buildtrees=False): display_key = key[:self.context.log_key_length] - project = element._get_project() for remote in self._remotes[project]: + remote.init() try: element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) - if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs): + if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees): element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) # no need to pull from additional remotes return True @@ -399,7 +379,9 @@ class ArtifactCache(BaseCache): oldref = element.get_artifact_name(oldkey) newref = element.get_artifact_name(newkey) - self.cas.link_ref(oldref, newref) + if not os.path.exists(os.path.join(self.artifactdir, newref)): + os.link(os.path.join(self.artifactdir, oldref), + 
os.path.join(self.artifactdir, newref)) # get_artifact_logs(): # @@ -463,3 +445,173 @@ class ArtifactCache(BaseCache): remote_missing_blobs_set.update(remote_missing_blobs) return list(remote_missing_blobs_set) + + ################################################ + # Local Private Methods # + ################################################ + + # _reachable_directories() + # + # Returns: + # (iter): Iterator over directories digests available from artifacts. + # + def _reachable_directories(self): + for root, _, files in os.walk(self.artifactdir): + for artifact_file in files: + artifact = artifact_pb2.Artifact() + with open(os.path.join(root, artifact_file), 'r+b') as f: + artifact.ParseFromString(f.read()) + + if str(artifact.files): + yield artifact.files + + if str(artifact.buildtree): + yield artifact.buildtree + + # _reachable_digests() + # + # Returns: + # (iter): Iterator over single file digests in artifacts + # + def _reachable_digests(self): + for root, _, files in os.walk(self.artifactdir): + for artifact_file in files: + artifact = artifact_pb2.Artifact() + with open(os.path.join(root, artifact_file), 'r+b') as f: + artifact.ParseFromString(f.read()) + + if str(artifact.public_data): + yield artifact.public_data + + for log_file in artifact.logs: + yield log_file.digest + + # _push_artifact() + # + # Pushes relevant directories and then artifact proto to remote. + # + # Args: + # element (Element): The element + # artifact (Artifact): The related artifact being pushed + # remote (CASRemote): Remote to push to + # + # Returns: + # (bool): whether the push was successful + # + def _push_artifact(self, element, artifact, remote): + + artifact_proto = artifact._get_proto() + + keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key])) + + # Check whether the artifact is on the server + present = False + for key in keys: + get_artifact = artifact_pb2.GetArtifactRequest() + get_artifact.cache_key = element.get_artifact_name(key) + try: + artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel) + artifact_service.GetArtifact(get_artifact) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise ArtifactError("Error checking artifact cache: {}" + .format(e.details())) + else: + present = True + if present: + return False + + try: + self.cas._send_directory(remote, artifact_proto.files) + + if str(artifact_proto.buildtree): + try: + self.cas._send_directory(remote, artifact_proto.buildtree) + except FileNotFoundError: + pass + + digests = [] + if str(artifact_proto.public_data): + digests.append(artifact_proto.public_data) + + for log_file in artifact_proto.logs: + digests.append(log_file.digest) + + self.cas.send_blobs(remote, digests) + + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: + raise ArtifactError("Failed to push artifact blobs: {}".format(e.details())) + return False + + # finally need to send the artifact proto + for key in keys: + update_artifact = artifact_pb2.UpdateArtifactRequest() + update_artifact.cache_key = element.get_artifact_name(key) + update_artifact.artifact.CopyFrom(artifact_proto) + + try: + artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel) + artifact_service.UpdateArtifact(update_artifact) + except grpc.RpcError as e: + raise ArtifactError("Failed to push artifact: {}".format(e.details())) + + return True + + # _pull_artifact() + # + # Args: + # element (Element): element to pull + # key (str): specific key of element to pull + # 
remote (CASRemote): remote to pull from + # pull_buildtree (bool): whether to pull buildtrees or not + # + # Returns: + # (bool): whether the pull was successful + # + def _pull_artifact(self, element, key, remote, pull_buildtrees=False): + + def __pull_digest(digest): + self.cas._fetch_directory(remote, digest) + required_blobs = self.cas.required_blobs_for_directory(digest) + missing_blobs = self.cas.local_missing_blobs(required_blobs) + if missing_blobs: + self.cas.fetch_blobs(remote, missing_blobs) + + request = artifact_pb2.GetArtifactRequest() + request.cache_key = element.get_artifact_name(key=key) + try: + artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel) + artifact = artifact_service.GetArtifact(request) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise ArtifactError("Failed to pull artifact: {}".format(e.details())) + return False + + try: + if str(artifact.files): + __pull_digest(artifact.files) + + if pull_buildtrees and str(artifact.buildtree): + __pull_digest(artifact.buildtree) + + digests = [] + if str(artifact.public_data): + digests.append(artifact.public_data) + + for log_digest in artifact.logs: + digests.append(log_digest.digest) + + self.cas.fetch_blobs(remote, digests) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise ArtifactError("Failed to pull artifact: {}".format(e.details())) + return False + + # Write the artifact proto to cache + artifact_path = os.path.join(self.artifactdir, request.cache_key) + os.makedirs(os.path.dirname(artifact_path), exist_ok=True) + with open(artifact_path, 'w+b') as f: + f.write(artifact.SerializeToString()) + + return True diff --git a/buildstream/_basecache.py b/buildstream/_basecache.py index af3fe9bb7..68654b2a0 100644 --- a/buildstream/_basecache.py +++ b/buildstream/_basecache.py @@ -17,6 +17,8 @@ # Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> # import multiprocessing +import os +from fnmatch import fnmatch from . import utils from . import _yaml @@ -274,3 +276,32 @@ class BaseCache(): with self.context.timed_activity("Initializing remote caches", silent_nested=True): self.initialize_remotes(on_failure=remote_failed) + + # _list_refs_mtimes() + # + # List refs in a directory, given a base path. 
Also returns the + # associated mtimes + # + # Args: + # base_path (str): Base path to traverse over + # glob_expr (str|None): Optional glob expression to match against files + # + # Returns: + # (iter (mtime, filename)]): iterator of tuples of mtime and refs + # + def _list_refs_mtimes(self, base_path, *, glob_expr=None): + path = base_path + if glob_expr is not None: + globdir = os.path.dirname(glob_expr) + if not any(c in "*?[" for c in globdir): + # path prefix contains no globbing characters so + # append the glob to optimise the os.walk() + path = os.path.join(base_path, globdir) + + for root, _, files in os.walk(path): + for filename in files: + ref_path = os.path.join(root, filename) + relative_path = os.path.relpath(ref_path, base_path) # Relative to refs head + if not glob_expr or fnmatch(relative_path, glob_expr): + # Obtain the mtime (the time a file was last modified) + yield (os.path.getmtime(ref_path), relative_path) diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py index 5f67dc0c1..ad8013d18 100644 --- a/buildstream/_cas/cascache.py +++ b/buildstream/_cas/cascache.py @@ -24,7 +24,6 @@ import stat import errno import uuid import contextlib -from fnmatch import fnmatch import grpc @@ -89,6 +88,9 @@ class CASCache(): os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True) os.makedirs(self.tmpdir, exist_ok=True) + self.__reachable_directory_callbacks = [] + self.__reachable_digest_callbacks = [] + # preflight(): # # Preflight check. @@ -114,28 +116,6 @@ class CASCache(): # This assumes that the repository doesn't have any dangling pointers return os.path.exists(refpath) - # contains_subdir_artifact(): - # - # Check whether the specified artifact element tree has a digest for a subdir - # which is populated in the cache, i.e non dangling. 
- # - # Args: - # ref (str): The ref to check - # subdir (str): The subdir to check - # with_files (bool): Whether to check files as well - # - # Returns: True if the subdir exists & is populated in the cache, False otherwise - # - def contains_subdir_artifact(self, ref, subdir, *, with_files=True): - tree = self.resolve_ref(ref) - - try: - subdirdigest = self._get_subdir(tree, subdir) - - return self.contains_directory(subdirdigest, with_files=with_files) - except (CASCacheError, FileNotFoundError): - return False - # contains_directory(): # # Check whether the specified directory and subdirecotires are in the cache, @@ -230,19 +210,15 @@ class CASCache(): # ref_b (str): The second ref # subdir (str): A subdirectory to limit the comparison to # - def diff(self, ref_a, ref_b, *, subdir=None): + def diff(self, ref_a, ref_b): tree_a = self.resolve_ref(ref_a) tree_b = self.resolve_ref(ref_b) - if subdir: - tree_a = self._get_subdir(tree_a, subdir) - tree_b = self._get_subdir(tree_b, subdir) - added = [] removed = [] modified = [] - self._diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified) + self.diff_trees(tree_a, tree_b, added=added, removed=removed, modified=modified) return modified, removed, added @@ -253,14 +229,11 @@ class CASCache(): # Args: # ref (str): The ref to pull # remote (CASRemote): The remote repository to pull from - # progress (callable): The progress callback, if any - # subdir (str): The optional specific subdir to pull - # excluded_subdirs (list): The optional list of subdirs to not pull # # Returns: # (bool): True if pull was successful, False if ref was not available # - def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None): + def pull(self, ref, remote): try: remote.init() @@ -274,7 +247,7 @@ class CASCache(): self._fetch_directory(remote, tree) # Fetch files, excluded_subdirs determined in pullqueue - required_blobs = self.required_blobs_for_directory(tree, excluded_subdirs=excluded_subdirs) + required_blobs = self.required_blobs_for_directory(tree) missing_blobs = self.local_missing_blobs(required_blobs) if missing_blobs: self.fetch_blobs(remote, missing_blobs) @@ -502,44 +475,6 @@ class CASCache(): except FileNotFoundError as e: raise CASCacheError("Attempt to access unavailable ref: {}".format(e)) from e - # list_refs(): - # - # List refs in Least Recently Modified (LRM) order. - # - # Args: - # glob (str) - An optional glob expression to be used to list refs satisfying the glob - # - # Returns: - # (list) - A list of refs in LRM order - # - def list_refs(self, *, glob=None): - # string of: /path/to/repo/refs/heads - ref_heads = os.path.join(self.casdir, 'refs', 'heads') - path = ref_heads - - if glob is not None: - globdir = os.path.dirname(glob) - if not any(c in "*?[" for c in globdir): - # path prefix contains no globbing characters so - # append the glob to optimise the os.walk() - path = os.path.join(ref_heads, globdir) - - refs = [] - mtimes = [] - - for root, _, files in os.walk(path): - for filename in files: - ref_path = os.path.join(root, filename) - relative_path = os.path.relpath(ref_path, ref_heads) # Relative to refs head - if not glob or fnmatch(relative_path, glob): - refs.append(relative_path) - # Obtain the mtime (the time a file was last modified) - mtimes.append(os.path.getmtime(ref_path)) - - # NOTE: Sorted will sort from earliest to latest, thus the - # first ref of this list will be the file modified earliest. 
- return [ref for _, ref in sorted(zip(mtimes, refs))] - # list_objects(): # # List cached objects in Least Recently Modified (LRM) order. @@ -581,6 +516,8 @@ class CASCache(): # # Args: # ref (str): A symbolic ref + # basedir (str): Path of base directory the ref is in, defaults to + # CAS refs heads # defer_prune (bool): Whether to defer pruning to the caller. NOTE: # The space won't be freed until you manually # call prune. @@ -589,10 +526,12 @@ class CASCache(): # (int|None) The amount of space pruned from the repository in # Bytes, or None if defer_prune is True # - def remove(self, ref, *, defer_prune=False): + def remove(self, ref, *, basedir=None, defer_prune=False): + if basedir is None: + basedir = os.path.join(self.casdir, 'refs', 'heads') # Remove cache ref - self._remove_ref(ref) + self._remove_ref(ref, basedir) if not defer_prune: pruned = self.prune() @@ -600,6 +539,14 @@ class CASCache(): return None + # adds callback of iterator over reachable directory digests + def add_reachable_directories_callback(self, callback): + self.__reachable_directory_callbacks.append(callback) + + # adds callbacks of iterator over reachable file digests + def add_reachable_digests_callback(self, callback): + self.__reachable_digest_callbacks.append(callback) + # prune(): # # Prune unreachable objects from the repo. @@ -619,6 +566,16 @@ class CASCache(): tree = self.resolve_ref(ref) self._reachable_refs_dir(reachable, tree) + # check callback directory digests that are reachable + for digest_callback in self.__reachable_directory_callbacks: + for digest in digest_callback(): + self._reachable_refs_dir(reachable, digest) + + # check callback file digests that are reachable + for digest_callback in self.__reachable_digest_callbacks: + for digest in digest_callback(): + reachable.add(digest.hash) + # Prune unreachable objects for root, _, files in os.walk(os.path.join(self.casdir, 'objects')): for filename in files: @@ -717,6 +674,59 @@ class CASCache(): if dirnode.name not in excluded_subdirs: yield from self.required_blobs_for_directory(dirnode.digest) + def diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): + dir_a = remote_execution_pb2.Directory() + dir_b = remote_execution_pb2.Directory() + + if tree_a: + with open(self.objpath(tree_a), 'rb') as f: + dir_a.ParseFromString(f.read()) + if tree_b: + with open(self.objpath(tree_b), 'rb') as f: + dir_b.ParseFromString(f.read()) + + a = 0 + b = 0 + while a < len(dir_a.files) or b < len(dir_b.files): + if b < len(dir_b.files) and (a >= len(dir_a.files) or + dir_a.files[a].name > dir_b.files[b].name): + added.append(os.path.join(path, dir_b.files[b].name)) + b += 1 + elif a < len(dir_a.files) and (b >= len(dir_b.files) or + dir_b.files[b].name > dir_a.files[a].name): + removed.append(os.path.join(path, dir_a.files[a].name)) + a += 1 + else: + # File exists in both directories + if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash: + modified.append(os.path.join(path, dir_a.files[a].name)) + a += 1 + b += 1 + + a = 0 + b = 0 + while a < len(dir_a.directories) or b < len(dir_b.directories): + if b < len(dir_b.directories) and (a >= len(dir_a.directories) or + dir_a.directories[a].name > dir_b.directories[b].name): + self.diff_trees(None, dir_b.directories[b].digest, + added=added, removed=removed, modified=modified, + path=os.path.join(path, dir_b.directories[b].name)) + b += 1 + elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or + dir_b.directories[b].name > dir_a.directories[a].name): + 
self.diff_trees(dir_a.directories[a].digest, None, + added=added, removed=removed, modified=modified, + path=os.path.join(path, dir_a.directories[a].name)) + a += 1 + else: + # Subdirectory exists in both directories + if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash: + self.diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest, + added=added, removed=removed, modified=modified, + path=os.path.join(path, dir_a.directories[a].name)) + a += 1 + b += 1 + ################################################ # Local Private Methods # ################################################ @@ -733,22 +743,24 @@ class CASCache(): # # Args: # ref (str): The ref to remove + # basedir (str): Path of base directory the ref is in # # Raises: # (CASCacheError): If the ref didnt exist, or a system error # occurred while removing it # - def _remove_ref(self, ref): + def _remove_ref(self, ref, basedir): # Remove the ref itself - refpath = self._refpath(ref) + refpath = os.path.join(basedir, ref) + try: os.unlink(refpath) except FileNotFoundError as e: raise CASCacheError("Could not find ref '{}'".format(ref)) from e # Now remove any leading directories - basedir = os.path.join(self.casdir, 'refs', 'heads') + components = list(os.path.split(ref)) while components: components.pop() @@ -831,59 +843,6 @@ class CASCache(): raise CASCacheError("Subdirectory {} not found".format(name)) - def _diff_trees(self, tree_a, tree_b, *, added, removed, modified, path=""): - dir_a = remote_execution_pb2.Directory() - dir_b = remote_execution_pb2.Directory() - - if tree_a: - with open(self.objpath(tree_a), 'rb') as f: - dir_a.ParseFromString(f.read()) - if tree_b: - with open(self.objpath(tree_b), 'rb') as f: - dir_b.ParseFromString(f.read()) - - a = 0 - b = 0 - while a < len(dir_a.files) or b < len(dir_b.files): - if b < len(dir_b.files) and (a >= len(dir_a.files) or - dir_a.files[a].name > dir_b.files[b].name): - added.append(os.path.join(path, dir_b.files[b].name)) - b += 1 - elif a < len(dir_a.files) and (b >= len(dir_b.files) or - dir_b.files[b].name > dir_a.files[a].name): - removed.append(os.path.join(path, dir_a.files[a].name)) - a += 1 - else: - # File exists in both directories - if dir_a.files[a].digest.hash != dir_b.files[b].digest.hash: - modified.append(os.path.join(path, dir_a.files[a].name)) - a += 1 - b += 1 - - a = 0 - b = 0 - while a < len(dir_a.directories) or b < len(dir_b.directories): - if b < len(dir_b.directories) and (a >= len(dir_a.directories) or - dir_a.directories[a].name > dir_b.directories[b].name): - self._diff_trees(None, dir_b.directories[b].digest, - added=added, removed=removed, modified=modified, - path=os.path.join(path, dir_b.directories[b].name)) - b += 1 - elif a < len(dir_a.directories) and (b >= len(dir_b.directories) or - dir_b.directories[b].name > dir_a.directories[a].name): - self._diff_trees(dir_a.directories[a].digest, None, - added=added, removed=removed, modified=modified, - path=os.path.join(path, dir_a.directories[a].name)) - a += 1 - else: - # Subdirectory exists in both directories - if dir_a.directories[a].digest.hash != dir_b.directories[b].digest.hash: - self._diff_trees(dir_a.directories[a].digest, dir_b.directories[b].digest, - added=added, removed=removed, modified=modified, - path=os.path.join(path, dir_a.directories[a].name)) - a += 1 - b += 1 - def _reachable_refs_dir(self, reachable, tree, update_mtime=False, check_exists=False): if tree.hash in reachable: return @@ -1148,8 +1107,8 @@ class CASQuota: self._message = context.message 
- self._ref_callbacks = [] # Call backs to get required refs - self._remove_callbacks = [] # Call backs to remove refs + self._remove_callbacks = [] # Callbacks to remove unrequired refs and their remove method + self._list_refs_callbacks = [] # Callbacks to all refs self._calculate_cache_quota() @@ -1227,6 +1186,21 @@ class CASQuota: return False + # add_remove_callbacks() + # + # This adds tuples of iterators over unrequired objects (currently + # artifacts and source refs), and a callback to remove them. + # + # Args: + # callback (iter(unrequired), remove): tuple of iterator and remove + # method associated. + # + def add_remove_callbacks(self, list_unrequired, remove_method): + self._remove_callbacks.append((list_unrequired, remove_method)) + + def add_list_refs_callback(self, list_callback): + self._list_refs_callbacks.append(list_callback) + ################################################ # Local Private Methods # ################################################ @@ -1383,28 +1357,25 @@ class CASQuota: removed_ref_count = 0 space_saved = 0 - # get required refs - refs = self.cas.list_refs() - required_refs = set( - required - for callback in self._ref_callbacks - for required in callback() - ) + total_refs = 0 + for refs in self._list_refs_callbacks: + total_refs += len(list(refs())) # Start off with an announcement with as much info as possible volume_size, volume_avail = self._get_cache_volume_size() self._message(Message( None, MessageType.STATUS, "Starting cache cleanup", - detail=("Elements required by the current build plan: {}\n" + + detail=("Elements required by the current build plan:\n" + "{}\n" + "User specified quota: {} ({})\n" + "Cache usage: {}\n" + "Cache volume: {} total, {} available") - .format(len(required_refs), - context.config_cache_quota, - utils._pretty_size(self._cache_quota, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2), - utils._pretty_size(volume_size, dec_places=2), - utils._pretty_size(volume_avail, dec_places=2)))) + .format( + total_refs, + context.config_cache_quota, + utils._pretty_size(self._cache_quota, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + utils._pretty_size(volume_size, dec_places=2), + utils._pretty_size(volume_avail, dec_places=2)))) # Do a real computation of the cache size once, just in case self.compute_cache_size() @@ -1412,67 +1383,63 @@ class CASQuota: self._message(Message(None, MessageType.STATUS, "Cache usage recomputed: {}".format(usage))) - while self.get_cache_size() >= self._cache_lower_threshold: - try: - to_remove = refs.pop(0) - except IndexError: - # If too many artifacts are required, and we therefore - # can't remove them, we have to abort the build. - # - # FIXME: Asking the user what to do may be neater - # - default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], - 'buildstream.conf') - detail = ("Aborted after removing {} refs and saving {} disk space.\n" - "The remaining {} in the cache is required by the {} references in your build plan\n\n" - "There is not enough space to complete the build.\n" - "Please increase the cache-quota in {} and/or make more disk space." - .format(removed_ref_count, - utils._pretty_size(space_saved, dec_places=2), - utils._pretty_size(self.get_cache_size(), dec_places=2), - len(required_refs), - (context.config_origin or default_conf))) - - if self.full(): - raise CASCacheError("Cache too full. 
Aborting.", - detail=detail, - reason="cache-too-full") - else: - break - - key = to_remove.rpartition('/')[2] - if key not in required_refs: - - # Remove the actual artifact, if it's not required. - size = 0 - removed_ref = False - for (pred, remove) in self._remove_callbacks: - if pred(to_remove): - size = remove(to_remove) - removed_ref = True - break - - if not removed_ref: - continue + # Collect digests and their remove method + all_unrequired_refs = [] + for (unrequired_refs, remove) in self._remove_callbacks: + for (mtime, ref) in unrequired_refs(): + all_unrequired_refs.append((mtime, ref, remove)) - removed_ref_count += 1 - space_saved += size + # Pair refs and their remove method sorted in time order + all_unrequired_refs = [(ref, remove) for (_, ref, remove) in sorted(all_unrequired_refs)] - self._message(Message( - None, MessageType.STATUS, - "Freed {: <7} {}".format( - utils._pretty_size(size, dec_places=2), - to_remove))) + # Go through unrequired refs and remove them, oldest first + made_space = False + for (ref, remove) in all_unrequired_refs: + size = remove(ref) + removed_ref_count += 1 + space_saved += size - self.set_cache_size(self._cache_size - size) + self._message(Message( + None, MessageType.STATUS, + "Freed {: <7} {}".format( + utils._pretty_size(size, dec_places=2), + ref))) + + self.set_cache_size(self._cache_size - size) + + # User callback + # + # Currently this process is fairly slow, but we should + # think about throttling this progress() callback if this + # becomes too intense. + if progress: + progress() + + if self.get_cache_size() < self._cache_lower_threshold: + made_space = True + break - # User callback - # - # Currently this process is fairly slow, but we should - # think about throttling this progress() callback if this - # becomes too intense. - if progress: - progress() + if not made_space and self.full(): + # If too many artifacts are required, and we therefore + # can't remove them, we have to abort the build. + # + # FIXME: Asking the user what to do may be neater + # + default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], + 'buildstream.conf') + detail = ("Aborted after removing {} refs and saving {} disk space.\n" + "The remaining {} in the cache is required by the {} references in your build plan\n\n" + "There is not enough space to complete the build.\n" + "Please increase the cache-quota in {} and/or make more disk space." + .format(removed_ref_count, + utils._pretty_size(space_saved, dec_places=2), + utils._pretty_size(self.get_cache_size(), dec_places=2), + total_refs, + (context.config_origin or default_conf))) + + raise CASCacheError("Cache too full. Aborting.", + detail=detail, + reason="cache-too-full") # Informational message about the side effects of the cleanup self._message(Message( @@ -1485,22 +1452,6 @@ class CASQuota: return self.get_cache_size() - # add_ref_callbacks() - # - # Args: - # callback (Iterator): function that gives list of required refs - def add_ref_callbacks(self, callback): - self._ref_callbacks.append(callback) - - # add_remove_callbacks() - # - # Args: - # callback (predicate, callback): The predicate says whether this is the - # correct type to remove given a ref and the callback does actual - # removing. 
- def add_remove_callbacks(self, callback): - self._remove_callbacks.append(callback) - def _grouper(iterable, n): while True: diff --git a/buildstream/_cas/casserver.py b/buildstream/_cas/casserver.py index f88db717a..c08a4d577 100644 --- a/buildstream/_cas/casserver.py +++ b/buildstream/_cas/casserver.py @@ -428,11 +428,25 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): with open(artifact_path, 'rb') as f: artifact.ParseFromString(f.read()) - files_digest = artifact.files - # Now update mtimes of files present. try: - self.cas.update_tree_mtime(files_digest) + + if str(artifact.files): + self.cas.update_tree_mtime(artifact.files) + + if str(artifact.buildtree): + # buildtrees might not be there + try: + self.cas.update_tree_mtime(artifact.buildtree) + except FileNotFoundError: + pass + + if str(artifact.public_data): + os.utime(self.cas.objpath(artifact.public_data)) + + for log_file in artifact.logs: + os.utime(self.cas.objpath(log_file.digest)) + except FileNotFoundError: os.unlink(artifact_path) context.abort(grpc.StatusCode.NOT_FOUND, @@ -451,9 +465,6 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): # Unset protocol buffers don't evaluated to False but do return empty # strings, hence str() - if str(artifact.buildtree): - self._check_directory("buildtree", artifact.buildtree, context) - if str(artifact.public_data): self._check_file("public data", artifact.public_data, context) diff --git a/buildstream/_context.py b/buildstream/_context.py index fffeea17e..151ea636a 100644 --- a/buildstream/_context.py +++ b/buildstream/_context.py @@ -75,6 +75,9 @@ class Context(): # The directory for CAS self.casdir = None + # The directory for artifact protos + self.artifactdir = None + # The directory for temporary files self.tmpdir = None @@ -230,6 +233,7 @@ class Context(): self.tmpdir = os.path.join(self.cachedir, 'tmp') self.casdir = os.path.join(self.cachedir, 'cas') self.builddir = os.path.join(self.cachedir, 'build') + self.artifactdir = os.path.join(self.cachedir, 'artifacts', 'refs') # Move old artifact cas to cas if it exists and create symlink old_casdir = os.path.join(self.cachedir, 'artifacts', 'cas') diff --git a/buildstream/_sourcecache.py b/buildstream/_sourcecache.py index d00015128..1d3342a75 100644 --- a/buildstream/_sourcecache.py +++ b/buildstream/_sourcecache.py @@ -17,6 +17,8 @@ # Authors: # Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> # +import os + from ._cas import CASRemoteSpec from .storage._casbaseddirectory import CasBasedDirectory from ._basecache import BaseCache @@ -53,8 +55,8 @@ class SourceCache(BaseCache): self._required_sources = set() - self.casquota.add_ref_callbacks(self.required_sources) - self.casquota.add_remove_callbacks((lambda x: x.startswith('@sources/'), self.cas.remove)) + self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove) + self.casquota.add_list_refs_callback(self.list_sources) # mark_required_sources() # @@ -81,14 +83,43 @@ class SourceCache(BaseCache): # required_sources() # - # Yields the keys of all sources marked as required + # Yields the keys of all sources marked as required by the current build + # plan # # Returns: - # iterable (str): iterable over the source keys + # iterable (str): iterable over the required source refs # def required_sources(self): for source in self._required_sources: - yield source._key + yield source._get_source_name() + + # unrequired_sources() + # + # Yields the refs of all sources not required by the current build plan + # + 
# Returns: + # iter (str): iterable over unrequired source keys + # + def unrequired_sources(self): + required_source_names = set(map( + lambda x: x._get_source_name(), self._required_sources)) + for (mtime, source) in self._list_refs_mtimes( + os.path.join(self.cas.casdir, 'refs', 'heads'), + glob_expr="@sources/*"): + if source not in required_source_names: + yield (mtime, source) + + # list_sources() + # + # Get list of all sources in the `cas/refs/heads/@sources/` folder + # + # Returns: + # ([str]): iterable over all source refs + # + def list_sources(self): + return [ref for _, ref in self._list_refs_mtimes( + os.path.join(self.cas.casdir, 'refs', 'heads'), + glob_expr="@sources/*")] # contains() # @@ -159,7 +190,7 @@ class SourceCache(BaseCache): # # Returns: # (bool): True if pull successful, False if not - def pull(self, source, *, progress=None): + def pull(self, source): ref = source._get_source_name() project = source._get_project() @@ -170,7 +201,7 @@ class SourceCache(BaseCache): try: source.status("Pulling source {} <- {}".format(display_key, remote.spec.url)) - if self.cas.pull(ref, remote, progress=progress): + if self.cas.pull(ref, remote): source.info("Pulled source {} <- {}".format(display_key, remote.spec.url)) # no need to pull from additional remotes return True diff --git a/buildstream/_stream.py b/buildstream/_stream.py index d4f26e443..2343c553c 100644 --- a/buildstream/_stream.py +++ b/buildstream/_stream.py @@ -32,7 +32,7 @@ from contextlib import contextmanager, suppress from fnmatch import fnmatch from ._artifactelement import verify_artifact_ref -from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, CASCacheError +from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError from ._message import Message, MessageType from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue @@ -587,7 +587,7 @@ class Stream(): for ref in remove_refs: try: self._artifacts.remove(ref, defer_prune=True) - except CASCacheError as e: + except ArtifactError as e: self._message(MessageType.WARN, str(e)) continue diff --git a/buildstream/element.py b/buildstream/element.py index 7f68af262..ec69d85e9 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -198,7 +198,7 @@ class Element(Plugin): if not self.__is_junction: project.ensure_fully_loaded() - self.normal_name = os.path.splitext(self.name.replace(os.sep, '-'))[0] + self.normal_name = _get_normal_name(self.name) """A normalized element name This is the original element without path separators or @@ -620,15 +620,7 @@ class Element(Plugin): assert key is not None - valid_chars = string.digits + string.ascii_letters + '-._' - element_name = ''.join([ - x if x in valid_chars else '_' - for x in self.normal_name - ]) - - # Note that project names are not allowed to contain slashes. Element names containing - # a '/' will have this replaced with a '-' upon Element object instantiation. 
- return '{0}/{1}/{2}'.format(project.name, element_name, key) + return _compose_artifact_name(project.name, self.normal_name, key) def stage_artifact(self, sandbox, *, path=None, include=None, exclude=None, orphans=True, update_mtimes=None): """Stage this element's output artifact in the sandbox @@ -749,6 +741,7 @@ class Element(Plugin): # if self.__artifacts.contains(self, workspace.last_successful): last_successful = Artifact(self, context, strong_key=workspace.last_successful) + # Get a dict of dependency strong keys old_dep_keys = last_successful.get_metadata_dependencies() else: # Last successful build is no longer in the artifact cache, @@ -773,12 +766,12 @@ class Element(Plugin): if dep.name in old_dep_keys: key_new = dep._get_cache_key() - key_old = _yaml.node_get(old_dep_keys, str, dep.name) + key_old = old_dep_keys[dep.name] # We only need to worry about modified and added # files, since removed files will be picked up by # build systems anyway. - to_update, _, added = self.__artifacts.diff(dep, key_old, key_new, subdir='files') + to_update, _, added = self.__artifacts.diff(dep, key_old, key_new) workspace.add_running_files(dep.name, to_update + added) to_update.extend(workspace.running_files[dep.name]) @@ -1888,11 +1881,12 @@ class Element(Plugin): # Check whether the pull has been invoked with a specific subdir requested # in user context, as to complete a partial artifact - subdir, _ = self.__pull_directories() + pull_buildtrees = self._get_context().pull_buildtrees - if self.__strong_cached and subdir: + if self.__strong_cached and pull_buildtrees: # If we've specified a subdir, check if the subdir is cached locally - if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir): + # or if it's possible to get + if self._cached_buildtree() or not self._buildtree_exists(): return False elif self.__strong_cached: return False @@ -1925,18 +1919,15 @@ class Element(Plugin): def _pull(self): context = self._get_context() - def progress(percent, message): - self.status(message) - # Get optional specific subdir to pull and optional list to not pull # based off of user context - subdir, excluded_subdirs = self.__pull_directories() + pull_buildtrees = context.pull_buildtrees # Attempt to pull artifact without knowing whether it's available - pulled = self.__pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs) + pulled = self.__pull_strong(pull_buildtrees=pull_buildtrees) if not pulled and not self._cached() and not context.get_strict(): - pulled = self.__pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs) + pulled = self.__pull_weak(pull_buildtrees=pull_buildtrees) if not pulled: return False @@ -1998,8 +1989,8 @@ class Element(Plugin): self.warn("Not pushing tainted artifact.") return False - # Push all keys used for local commit - pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit()) + # Push all keys used for local commit via the Artifact member + pushed = self.__artifacts.push(self, self.__artifact) if not pushed: return False @@ -2861,17 +2852,6 @@ class Element(Plugin): self.__build_result = self.__artifact.load_build_result() - def __get_cache_keys_for_commit(self): - keys = [] - - # tag with strong cache key based on dependency versions used for the build - keys.append(self._get_cache_key(strength=_KeyStrength.STRONG)) - - # also store under weak cache key - keys.append(self._get_cache_key(strength=_KeyStrength.WEAK)) - - return utils._deduplicate(keys) - # __pull_strong(): # # 
Attempt pulling given element from configured artifact caches with @@ -2885,11 +2865,10 @@ class Element(Plugin): # Returns: # (bool): Whether or not the pull was successful # - def __pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None): + def __pull_strong(self, *, pull_buildtrees): weak_key = self._get_cache_key(strength=_KeyStrength.WEAK) key = self.__strict_cache_key - if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir, - excluded_subdirs=excluded_subdirs): + if not self.__artifacts.pull(self, key, pull_buildtrees=pull_buildtrees): return False # update weak ref by pointing it to this newly fetched artifact @@ -2903,17 +2882,16 @@ class Element(Plugin): # the weak cache key # # Args: - # progress (callable): The progress callback, if any # subdir (str): The optional specific subdir to pull # excluded_subdirs (list): The optional list of subdirs to not pull # # Returns: # (bool): Whether or not the pull was successful # - def __pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None): + def __pull_weak(self, *, pull_buildtrees): weak_key = self._get_cache_key(strength=_KeyStrength.WEAK) - if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir, - excluded_subdirs=excluded_subdirs): + if not self.__artifacts.pull(self, weak_key, + pull_buildtrees=pull_buildtrees): return False # extract strong cache key from this newly fetched artifact @@ -2925,37 +2903,6 @@ class Element(Plugin): return True - # __pull_directories(): - # - # Which directories to include or exclude given the current - # context - # - # Returns: - # subdir (str): The optional specific subdir to include, based - # on user context - # excluded_subdirs (list): The optional list of subdirs to not - # pull, referenced against subdir value - # - def __pull_directories(self): - context = self._get_context() - - # Current default exclusions on pull - excluded_subdirs = ["buildtree"] - subdir = '' - - # If buildtrees are to be pulled, remove the value from exclusion list - # and set specific subdir - if context.pull_buildtrees: - subdir = "buildtree" - excluded_subdirs.remove(subdir) - - # If file contents are not required for this element, don't pull them. - # The directories themselves will always be pulled. - if not context.require_artifact_files and not self._artifact_files_required(): - excluded_subdirs.append("files") - - return (subdir, excluded_subdirs) - # __cache_sources(): # # Caches the sources into the local CAS @@ -3017,3 +2964,42 @@ def _overlap_error_detail(f, forbidden_overlap_elements, elements): " above ".join(reversed(elements)))) else: return "" + + +# _get_normal_name(): +# +# Get the element name without path separators or +# the extension. +# +# Args: +# element_name (str): The element's name +# +# Returns: +# (str): The normalised element name +# +def _get_normal_name(element_name): + return os.path.splitext(element_name.replace(os.sep, '-'))[0] + + +# _compose_artifact_name(): +# +# Compose the completely resolved 'artifact_name' as a filepath +# +# Args: +# project_name (str): The project's name +# normal_name (str): The element's normalised name +# cache_key (str): The relevant cache key +# +# Returns: +# (str): The constructed artifact name path +# +def _compose_artifact_name(project_name, normal_name, cache_key): + valid_chars = string.digits + string.ascii_letters + '-._' + normal_name = ''.join([ + x if x in valid_chars else '_' + for x in normal_name + ]) + + # Note that project names are not allowed to contain slashes. 
Element names containing + # a '/' will have this replaced with a '-' upon Element object instantiation. + return '{0}/{1}/{2}'.format(project_name, normal_name, cache_key) diff --git a/buildstream/testing/runcli.py b/buildstream/testing/runcli.py index 934c31236..8b3185143 100644 --- a/buildstream/testing/runcli.py +++ b/buildstream/testing/runcli.py @@ -53,9 +53,11 @@ from _pytest.capture import MultiCapture, FDCapture, FDCaptureBinary from buildstream._frontend import cli as bst_cli from buildstream import _yaml from buildstream._cas import CASCache +from buildstream.element import _get_normal_name, _compose_artifact_name # Special private exception accessor, for test case purposes from buildstream._exceptions import BstError, get_last_exception, get_last_task_error +from buildstream._protos.buildstream.v2 import artifact_pb2 # Wrapper for the click.testing result @@ -495,6 +497,17 @@ class Cli(): result.assert_success() return result.output.splitlines() + # Fetch an element's complete artifact name, cache_key will be generated + # if not given. + # + def get_artifact_name(self, project, project_name, element_name, cache_key=None): + if not cache_key: + cache_key = self.get_element_key(project, element_name) + + # Replace path separator and chop off the .bst suffix for normal name + normal_name = _get_normal_name(element_name) + return _compose_artifact_name(project_name, normal_name, cache_key) + class CliIntegration(Cli): @@ -636,7 +649,8 @@ class TestArtifact(): # def remove_artifact_from_cache(self, cache_dir, element_name): - cache_dir = os.path.join(cache_dir, 'cas', 'refs', 'heads') + cache_dir = os.path.join(cache_dir, 'artifacts', 'refs') + normal_name = element_name.replace(os.sep, '-') cache_dir = os.path.splitext(os.path.join(cache_dir, 'test', normal_name))[0] shutil.rmtree(cache_dir) @@ -655,13 +669,13 @@ class TestArtifact(): # def is_cached(self, cache_dir, element, element_key): - cas = CASCache(str(cache_dir)) + # cas = CASCache(str(cache_dir)) artifact_ref = element.get_artifact_name(element_key) - return cas.contains(artifact_ref) + return os.path.exists(os.path.join(cache_dir, 'artifacts', 'refs', artifact_ref)) # get_digest(): # - # Get the digest for a given element's artifact + # Get the digest for a given element's artifact files # # Args: # cache_dir (str): Specific cache dir to check @@ -673,10 +687,12 @@ class TestArtifact(): # def get_digest(self, cache_dir, element, element_key): - cas = CASCache(str(cache_dir)) artifact_ref = element.get_artifact_name(element_key) - digest = cas.resolve_ref(artifact_ref) - return digest + artifact_dir = os.path.join(cache_dir, 'artifacts', 'refs') + artifact_proto = artifact_pb2.Artifact() + with open(os.path.join(artifact_dir, artifact_ref), 'rb') as f: + artifact_proto.ParseFromString(f.read()) + return artifact_proto.files # extract_buildtree(): # @@ -691,9 +707,19 @@ class TestArtifact(): # (str): path to extracted buildtree directory, does not guarantee # existence. 
     @contextmanager
-    def extract_buildtree(self, tmpdir, digest):
-        with self._extract_subdirectory(tmpdir, digest, 'buildtree') as extract:
-            yield extract
+    def extract_buildtree(self, cache_dir, tmpdir, ref):
+        artifact = artifact_pb2.Artifact()
+        try:
+            with open(os.path.join(cache_dir, 'artifacts', 'refs', ref), 'rb') as f:
+                artifact.ParseFromString(f.read())
+        except FileNotFoundError:
+            yield None
+        else:
+            if str(artifact.buildtree):
+                with self._extract_subdirectory(tmpdir, artifact.buildtree) as f:
+                    yield f
+            else:
+                yield None

     # _extract_subdirectory():
     #
@@ -709,12 +735,12 @@
     #     (str): path to extracted subdir directory, does not guarantee
     #            existence.
     @contextmanager
-    def _extract_subdirectory(self, tmpdir, digest, subdir):
+    def _extract_subdirectory(self, tmpdir, digest):
         with tempfile.TemporaryDirectory() as extractdir:
             try:
                 cas = CASCache(str(tmpdir))
                 cas.checkout(extractdir, digest)
-                yield os.path.join(extractdir, subdir)
+                yield extractdir
             except FileNotFoundError:
                 yield None
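The SourceCache hunks near the top of this diff enumerate source refs, together with their mtimes, under cas/refs/heads using the glob "@sources/*". The internals of _list_refs_mtimes() are not part of this diff, so the following is only a rough, self-contained sketch of that idea using the standard library; the directory layout is taken from the hunks above, while the function names and signatures here are illustrative only.

import glob
import os


# Rough stand-in for BaseCache._list_refs_mtimes(): walk a refs directory
# and yield (mtime, relative_ref) pairs for refs matching a glob pattern.
def list_refs_mtimes(refs_dir, glob_expr="*"):
    pattern = os.path.join(refs_dir, glob_expr)
    for path in glob.iglob(pattern):
        if os.path.isfile(path):
            ref = os.path.relpath(path, refs_dir)
            yield (os.path.getmtime(path), ref)


# Mirroring unrequired_sources(): any ref under '@sources/' that is not in
# the required set is a candidate for expiry, yielded oldest first.
def unrequired_sources(refs_dir, required_refs):
    entries = list_refs_mtimes(refs_dir, glob_expr="@sources/*")
    for mtime, ref in sorted(entries):
        if ref not in required_refs:
            yield (mtime, ref)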
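In element.py, the pull path no longer computes a subdir/excluded_subdirs pair; the only per-user tunable forwarded to ArtifactCache.pull() is the pull_buildtrees boolean. The function below is a condensed, illustrative sketch of that control flow, not the Element implementation itself; 'artifacts', 'strict_key', 'weak_key' and 'strict_mode' are stand-ins injected by the caller.

# Condensed sketch of the reworked strong/weak pull sequence.
def pull_artifact(artifacts, element, strict_key, weak_key, *,
                  pull_buildtrees, strict_mode):
    # Try the strong (strict) cache key first.
    if artifacts.pull(element, strict_key, pull_buildtrees=pull_buildtrees):
        return True

    # In non-strict mode an artifact found under the weak key is an
    # acceptable fallback.
    if not strict_mode:
        return artifacts.pull(element, weak_key, pull_buildtrees=pull_buildtrees)

    return False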
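The _get_normal_name() and _compose_artifact_name() helpers move to module scope so that runcli.py can build artifact refs without an Element instance. The snippet below restates them standalone and adds a usage comment; the example project name, element name and cache key are made up.

import os
import string


def _get_normal_name(element_name):
    # Strip the extension and replace path separators,
    # e.g. 'core/hello.bst' -> 'core-hello'
    return os.path.splitext(element_name.replace(os.sep, '-'))[0]


def _compose_artifact_name(project_name, normal_name, cache_key):
    # Artifact refs only allow a restricted character set; anything else is
    # mapped to '_'. Project names cannot contain slashes, and any '/' in
    # the element name was already replaced with '-' above.
    valid_chars = string.digits + string.ascii_letters + '-._'
    normal_name = ''.join(
        x if x in valid_chars else '_' for x in normal_name
    )
    return '{0}/{1}/{2}'.format(project_name, normal_name, cache_key)


# Example (hypothetical names and key):
#   _compose_artifact_name('test', _get_normal_name('core/hello.bst'), 'abc123')
#   -> 'test/core-hello/abc123'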
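After the rework, a ref under <cachedir>/artifacts/refs/ is no longer a CAS ref but a serialized Artifact protobuf, which is why the test helpers above parse it directly instead of asking a CASCache. A minimal sketch of reading one back, assuming the on-disk layout shown in the runcli.py hunks (the path argument is a placeholder):

import os

from buildstream._protos.buildstream.v2 import artifact_pb2


def load_artifact_proto(cache_dir, artifact_ref):
    # e.g. artifact_ref == 'test/core-hello/abc123' as composed above
    path = os.path.join(cache_dir, 'artifacts', 'refs', artifact_ref)
    artifact = artifact_pb2.Artifact()
    with open(path, 'rb') as f:
        artifact.ParseFromString(f.read())
    return artifact


# The proto carries CAS digests for the artifact contents; the test helpers
# use 'files' for staging checks and 'buildtree' (which may be unset) for
# buildtree extraction:
#   artifact = load_artifact_proto('/path/to/cachedir', ref)
#   files_digest = artifact.files
#   has_buildtree = bool(str(artifact.buildtree))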
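To materialise one of those digests on disk, the test helper instantiates a CASCache and checks the digest out into a temporary directory. Below is a small sketch following the _extract_subdirectory() pattern above; the CAS root path is a placeholder, and the helper name is not part of the diff.

import tempfile
from contextlib import contextmanager

from buildstream._cas import CASCache


@contextmanager
def checkout_digest(cas_root, digest):
    # Extract the directory tree behind 'digest' into a throwaway location;
    # the caller gets the path only for the duration of the with-block.
    with tempfile.TemporaryDirectory() as extractdir:
        try:
            cas = CASCache(str(cas_root))
            cas.checkout(extractdir, digest)
            yield extractdir
        except FileNotFoundError:
            # Nothing is cached under this digest
            yield None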