diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-03-01 05:56:37 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-03-01 05:56:37 +0000 |
commit | fc2b42d63ac4ec1bc7727a1f7705613305060572 (patch) | |
tree | 7909066a6023a34be825db63184b52b1a6b1dd17 | |
parent | 348f04e7a80338de1b9cc911e48263d8cd31be65 (diff) | |
parent | f3a63faf7ea88ed95831aa26fe7e9cfa7960d7e3 (diff) | |
download | buildstream-fc2b42d63ac4ec1bc7727a1f7705613305060572.tar.gz |
Merge branch 'juerg/lazy-directory-digest' into 'master'
_casbaseddirectory.py: Calculate directory digest lazily
See merge request BuildStream/buildstream!1188
-rw-r--r-- | buildstream/_artifactcache.py | 7 | ||||
-rw-r--r-- | buildstream/_cas/cascache.py | 3 | ||||
-rw-r--r-- | buildstream/element.py | 2 | ||||
-rw-r--r-- | buildstream/sandbox/_sandboxremote.py | 10 | ||||
-rw-r--r-- | buildstream/sandbox/sandbox.py | 2 | ||||
-rw-r--r-- | buildstream/storage/_casbaseddirectory.py | 225 | ||||
-rw-r--r-- | buildstream/storage/_filebaseddirectory.py | 7 | ||||
-rw-r--r-- | tests/artifactcache/push.py | 5 | ||||
-rw-r--r-- | tests/internals/storage_vdir_import.py | 7 |
9 files changed, 117 insertions(+), 151 deletions(-)
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py index b317296ec..9986d689f 100644 --- a/buildstream/_artifactcache.py +++ b/buildstream/_artifactcache.py @@ -441,7 +441,7 @@ class ArtifactCache(): def get_artifact_directory(self, element, key): ref = element.get_artifact_name(key) digest = self.cas.resolve_ref(ref, update_mtime=True) - return CasBasedDirectory(self.cas, digest) + return CasBasedDirectory(self.cas, digest=digest) # commit(): # @@ -636,9 +636,6 @@ class ArtifactCache(): raise ArtifactError("push_directory was called, but no remote artifact " + "servers are configured as push remotes.") - if directory.ref is None: - return - for remote in push_remotes: self.cas.push_directory(remote, directory) @@ -697,7 +694,7 @@ class ArtifactCache(): def get_artifact_logs(self, ref): descend = ["logs"] cache_id = self.cas.resolve_ref(ref, update_mtime=True) - vdir = CasBasedDirectory(self.cas, cache_id).descend(descend) + vdir = CasBasedDirectory(self.cas, digest=cache_id).descend(descend) return vdir ################################################ diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py index 08860bacc..ffc9c2b93 100644 --- a/buildstream/_cas/cascache.py +++ b/buildstream/_cas/cascache.py @@ -360,7 +360,8 @@ class CASCache(): def push_directory(self, remote, directory): remote.init() - self._send_directory(remote, directory.ref) + digest = directory._get_digest() + self._send_directory(remote, digest) # objpath(): # diff --git a/buildstream/element.py b/buildstream/element.py index fff95d0f5..9d1333721 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1691,7 +1691,7 @@ class Element(Plugin): context = self._get_context() - assemblevdir = CasBasedDirectory(cas_cache=context.artifactcache.cas, ref=None) + assemblevdir = CasBasedDirectory(cas_cache=context.artifactcache.cas) logsvdir = assemblevdir.descend("logs", create=True) metavdir = assemblevdir.descend("meta", create=True) 
buildtreevdir = assemblevdir.descend("buildtree", create=True) diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py index e97b37abd..81702743a 100644 --- a/buildstream/sandbox/_sandboxremote.py +++ b/buildstream/sandbox/_sandboxremote.py @@ -299,7 +299,7 @@ class SandboxRemote(Sandbox): # to replace the sandbox's virtual directory with that. Creating a new virtual directory object # from another hash will be interesting, though... - new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest) + new_dir = CasBasedDirectory(context.artifactcache.cas, digest=dir_digest) self._set_virtual_directory(new_dir) def _run(self, command, flags, *, cwd, env): @@ -308,13 +308,11 @@ class SandboxRemote(Sandbox): cascache = self._get_context().get_cascache() if isinstance(upload_vdir, FileBasedDirectory): # Make a new temporary directory to put source in - upload_vdir = CasBasedDirectory(cascache, ref=None) + upload_vdir = CasBasedDirectory(cascache) upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory()) - upload_vdir.recalculate_hash() - # Generate action_digest first - input_root_digest = upload_vdir.ref + input_root_digest = upload_vdir._get_digest() command_proto = self._create_command(command, cwd, env) command_digest = utils._message_digest(command_proto.SerializeToString()) action = remote_execution_pb2.Action(command_digest=command_digest, @@ -346,7 +344,7 @@ class SandboxRemote(Sandbox): except grpc.RpcError as e: raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e - if not casremote.verify_digest_on_remote(upload_vdir.ref): + if not casremote.verify_digest_on_remote(upload_vdir._get_digest()): raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.") # Push command and action diff --git a/buildstream/sandbox/sandbox.py b/buildstream/sandbox/sandbox.py index 2159c0fef..ca4652599 100644 --- a/buildstream/sandbox/sandbox.py +++ 
b/buildstream/sandbox/sandbox.py @@ -191,7 +191,7 @@ class Sandbox(): """ if self._vdir is None or self._never_cache_vdirs: if 'BST_CAS_DIRECTORIES' in os.environ: - self._vdir = CasBasedDirectory(self.__context.artifactcache.cas, ref=None) + self._vdir = CasBasedDirectory(self.__context.artifactcache.cas) else: self._vdir = FileBasedDirectory(self._root) return self._vdir diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py index bd72e7c1c..43497eaf7 100644 --- a/buildstream/storage/_casbaseddirectory.py +++ b/buildstream/storage/_casbaseddirectory.py @@ -36,20 +36,22 @@ from ..utils import FileListResult, _magic_timestamp class IndexEntry(): - """ Used in our index of names to objects to store the 'modified' flag - for directory entries. Because we need both the remote_execution_pb2 object - and our own Directory object for directory entries, we store both. For files - and symlinks, only pb_object is used. """ - def __init__(self, pb_object, entrytype, buildstream_object=None, modified=False): - self.pb_object = pb_object # Short for 'protocol buffer object') + """ Directory entry used in CasBasedDirectory.index """ + def __init__(self, name, entrytype, *, digest=None, target=None, is_executable=False, + buildstream_object=None, modified=False): + self.name = name self.type = entrytype + self.digest = digest + self.target = target + self.is_executable = is_executable self.buildstream_object = buildstream_object self.modified = modified def get_directory(self, parent): if not self.buildstream_object: - self.buildstream_object = CasBasedDirectory(parent.cas_cache, ref=self.pb_object.digest, - parent=parent, filename=self.pb_object.name) + self.buildstream_object = CasBasedDirectory(parent.cas_cache, digest=self.digest, + parent=parent, filename=self.name) + self.digest = None return self.buildstream_object @@ -105,70 +107,31 @@ class CasBasedDirectory(Directory): _pb2_path_sep = "/" _pb2_absolute_path_prefix = "/" - def 
__init__(self, cas_cache, ref=None, parent=None, common_name="untitled", filename=None): + def __init__(self, cas_cache, *, digest=None, parent=None, common_name="untitled", filename=None): self.filename = filename self.common_name = common_name - self.pb2_directory = remote_execution_pb2.Directory() self.cas_cache = cas_cache - if ref: - with open(self.cas_cache.objpath(ref), 'rb') as f: - self.pb2_directory.ParseFromString(f.read()) - - self.ref = ref + self.__digest = digest self.index = {} self.parent = parent - self._directory_read = False - self._populate_index() - - def _populate_index(self): - if self._directory_read: - return - for entry in self.pb2_directory.directories: - self.index[entry.name] = IndexEntry(entry, _FileType.DIRECTORY) - for entry in self.pb2_directory.files: - self.index[entry.name] = IndexEntry(entry, _FileType.REGULAR_FILE) - for entry in self.pb2_directory.symlinks: - self.index[entry.name] = IndexEntry(entry, _FileType.SYMLINK) - self._directory_read = True - - def _recalculate_recursing_up(self, caller=None): - """Recalcuate the hash for this directory and store the results in - the cache. If this directory has a parent, tell it to - recalculate (since changing this directory changes an entry in - the parent). - - """ - if caller: - old_dir = self._find_pb2_entry(caller.filename) - self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString()) - self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString()) - if self.parent: - self.parent._recalculate_recursing_up(self) - - def _recalculate_recursing_down(self, parent=None): - """Recalcuate the hash for this directory and any - subdirectories. Hashes for subdirectories should be calculated - and stored after a significant operation (e.g. an - import_files() call) but not after adding each file, as that - is extremely wasteful. 
- - """ - for entry in self.pb2_directory.directories: - subdir = self.index[entry.name].buildstream_object - if subdir: - subdir._recalculate_recursing_down(entry) - - if parent: - self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString()) - else: - self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString()) - # We don't need to do anything more than that; files were already added ealier, and symlinks are - # part of the directory structure. - - def _find_pb2_entry(self, name): - if name in self.index: - return self.index[name].pb_object - return None + if digest: + self._populate_index(digest) + + def _populate_index(self, digest): + pb2_directory = remote_execution_pb2.Directory() + with open(self.cas_cache.objpath(digest), 'rb') as f: + pb2_directory.ParseFromString(f.read()) + + for entry in pb2_directory.directories: + self.index[entry.name] = IndexEntry(entry.name, _FileType.DIRECTORY, + digest=entry.digest) + for entry in pb2_directory.files: + self.index[entry.name] = IndexEntry(entry.name, _FileType.REGULAR_FILE, + digest=entry.digest, + is_executable=entry.is_executable) + for entry in pb2_directory.symlinks: + self.index[entry.name] = IndexEntry(entry.name, _FileType.SYMLINK, + target=entry.target) def _find_self_in_parent(self): assert self.parent is not None @@ -182,46 +145,36 @@ class CasBasedDirectory(Directory): assert name not in self.index newdir = CasBasedDirectory(self.cas_cache, parent=self, filename=name) - dirnode = self.pb2_directory.directories.add() - dirnode.name = name - # Calculate the hash for an empty directory - new_directory = remote_execution_pb2.Directory() - self.cas_cache.add_object(digest=dirnode.digest, buffer=new_directory.SerializeToString()) - self.index[name] = IndexEntry(dirnode, _FileType.DIRECTORY, buildstream_object=newdir) + self.index[name] = IndexEntry(name, _FileType.DIRECTORY, buildstream_object=newdir) + + self.__invalidate_digest() + return 
newdir def _add_file(self, basename, filename, modified=False): - filenode = self.pb2_directory.files.add() - filenode.name = filename - self.cas_cache.add_object(digest=filenode.digest, path=os.path.join(basename, filename)) - is_executable = os.access(os.path.join(basename, filename), os.X_OK) - filenode.is_executable = is_executable - self.index[filename] = IndexEntry(filenode, _FileType.REGULAR_FILE, - modified=modified or filename in self.index) + entry = IndexEntry(filename, _FileType.REGULAR_FILE, + modified=modified or filename in self.index) + entry.digest = self.cas_cache.add_object(path=os.path.join(basename, filename)) + entry.is_executable = os.access(os.path.join(basename, filename), os.X_OK) + self.index[filename] = entry + + self.__invalidate_digest() def _copy_link_from_filesystem(self, basename, filename): self._add_new_link_direct(filename, os.readlink(os.path.join(basename, filename))) def _add_new_link_direct(self, name, target): - entry = self.index.get(name) - if entry: - symlinknode = entry.pb_object - else: - symlinknode = self.pb2_directory.symlinks.add() - symlinknode.name = name - # A symlink node has no digest. - symlinknode.target = target - self.index[name] = IndexEntry(symlinknode, _FileType.SYMLINK, modified=(entry is not None)) + self.index[name] = IndexEntry(name, _FileType.SYMLINK, target=target, modified=name in self.index) + + self.__invalidate_digest() def delete_entry(self, name): - for collection in [self.pb2_directory.files, self.pb2_directory.symlinks, self.pb2_directory.directories]: - for thing in collection: - if thing.name == name: - collection.remove(thing) if name in self.index: del self.index[name] + self.__invalidate_digest() + def descend(self, subdirectory_spec, create=False): """Descend one or more levels of directory hierarchy and return a new Directory object for that directory. 
@@ -262,17 +215,15 @@ class CasBasedDirectory(Directory): else: error = "Cannot descend into {}, which is a '{}' in the directory {}" raise VirtualDirectoryError(error.format(subdirectory_spec[0], - type(self.index[subdirectory_spec[0]].pb_object).__name__, + self.index[subdirectory_spec[0]].type, self)) else: if create: newdir = self._add_directory(subdirectory_spec[0]) return newdir.descend(subdirectory_spec[1:], create) else: - error = "No entry called '{}' found in {}. There are directories called {}." - directory_list = ",".join([entry.name for entry in self.pb2_directory.directories]) - raise VirtualDirectoryError(error.format(subdirectory_spec[0], str(self), - directory_list)) + error = "'{}' not found in {}" + raise VirtualDirectoryError(error.format(subdirectory_spec[0], str(self))) return None def _check_replacement(self, name, path_prefix, fileListResult): @@ -305,7 +256,7 @@ class CasBasedDirectory(Directory): def _import_files_from_directory(self, source_directory, filter_callback, *, path_prefix="", result): """ Import files from a traditional directory. """ - for direntry in sorted(os.scandir(source_directory), key=lambda e: e.name): + for direntry in os.scandir(source_directory): # The destination filename, relative to the root where the import started relative_pathname = os.path.join(path_prefix, direntry.name) @@ -345,7 +296,7 @@ class CasBasedDirectory(Directory): def _partial_import_cas_into_cas(self, source_directory, filter_callback, *, path_prefix="", result): """ Import files from a CAS-based directory. 
""" - for name, entry in sorted(source_directory.index.items()): + for name, entry in source_directory.index.items(): # The destination filename, relative to the root where the import started relative_pathname = os.path.join(path_prefix, name) @@ -375,14 +326,15 @@ class CasBasedDirectory(Directory): if not is_dir: if self._check_replacement(name, path_prefix, result): - item = entry.pb_object if entry.type == _FileType.REGULAR_FILE: - filenode = self.pb2_directory.files.add(digest=item.digest, name=name, - is_executable=item.is_executable) - self.index[name] = IndexEntry(filenode, _FileType.REGULAR_FILE, modified=True) + self.index[name] = IndexEntry(name, _FileType.REGULAR_FILE, + digest=entry.digest, + is_executable=entry.is_executable, + modified=True) + self.__invalidate_digest() else: assert entry.type == _FileType.SYMLINK - self._add_new_link_direct(name=name, target=item.target) + self._add_new_link_direct(name=name, target=entry.target) result.files_written.append(relative_pathname) def import_files(self, external_pathspec, *, @@ -407,14 +359,6 @@ class CasBasedDirectory(Directory): # Current behaviour is to fully populate the report, which is inefficient, # but still correct. - # We need to recalculate and store the hashes of all directories both - # up and down the tree; we have changed our directory by importing files - # which changes our hash and all our parents' hashes of us. The trees - # lower down need to be stored in the CAS as they are not automatically - # added during construction. - self._recalculate_recursing_down() - if self.parent: - self.parent._recalculate_recursing_up(self) return result def set_deterministic_mtime(self): @@ -539,23 +483,15 @@ class CasBasedDirectory(Directory): subdir = v.get_directory(self) yield from subdir.list_relative_paths(relpath=os.path.join(relpath, k)) - def recalculate_hash(self): - """ Recalcuates the hash for this directory and store the results in - the cache. 
If this directory has a parent, tell it to - recalculate (since changing this directory changes an entry in - the parent). Hashes for subdirectories also get recalculated. - """ - self._recalculate_recursing_up() - self._recalculate_recursing_down() - def get_size(self): - total = len(self.pb2_directory.SerializeToString()) + digest = self._get_digest() + total = digest.size_bytes for i in self.index.values(): if i.type == _FileType.DIRECTORY: subdir = i.get_directory(self) total += subdir.get_size() elif i.type == _FileType.REGULAR_FILE: - src_name = self.cas_cache.objpath(i.pb_object.digest) + src_name = self.cas_cache.objpath(i.digest) filesize = os.stat(src_name).st_size total += filesize # Symlink nodes are encoded as part of the directory serialization. @@ -588,14 +524,41 @@ class CasBasedDirectory(Directory): # (Digest): The Digest protobuf object for the Directory protobuf # def _get_digest(self): - if not self.ref: - self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString()) - return self.ref + if not self.__digest: + # Create updated Directory proto + pb2_directory = remote_execution_pb2.Directory() + + for name, entry in sorted(self.index.items()): + if entry.type == _FileType.DIRECTORY: + dirnode = pb2_directory.directories.add() + dirnode.name = name + + # Update digests for subdirectories in DirectoryNodes. + # No need to call entry.get_directory(). + # If it hasn't been instantiated, digest must be up-to-date. 
+ subdir = entry.buildstream_object + if subdir: + dirnode.digest.CopyFrom(subdir._get_digest()) + else: + dirnode.digest.CopyFrom(entry.digest) + elif entry.type == _FileType.REGULAR_FILE: + filenode = pb2_directory.files.add() + filenode.name = name + filenode.digest.CopyFrom(entry.digest) + filenode.is_executable = entry.is_executable + elif entry.type == _FileType.SYMLINK: + symlinknode = pb2_directory.symlinks.add() + symlinknode.name = name + symlinknode.target = entry.target + + self.__digest = self.cas_cache.add_object(buffer=pb2_directory.SerializeToString()) + + return self.__digest def _objpath(self, path): subdir = self.descend(path[:-1]) entry = subdir.index[path[-1]] - return self.cas_cache.objpath(entry.pb_object.digest) + return self.cas_cache.objpath(entry.digest) def _exists(self, path): try: @@ -603,3 +566,9 @@ class CasBasedDirectory(Directory): return path[-1] in subdir.index except VirtualDirectoryError: return False + + def __invalidate_digest(self): + if self.__digest: + self.__digest = None + if self.parent: + self.parent.__invalidate_digest() diff --git a/buildstream/storage/_filebaseddirectory.py b/buildstream/storage/_filebaseddirectory.py index 61827f19c..4b0fd917b 100644 --- a/buildstream/storage/_filebaseddirectory.py +++ b/buildstream/storage/_filebaseddirectory.py @@ -264,14 +264,13 @@ class FileBasedDirectory(Directory): result.ignored.append(relative_pathname) continue - item = entry.pb_object if entry.type == _FileType.REGULAR_FILE: - src_path = source_directory.cas_cache.objpath(item.digest) + src_path = source_directory.cas_cache.objpath(entry.digest) actionfunc(src_path, dest_path, result=result) - if item.is_executable: + if entry.is_executable: os.chmod(dest_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) else: assert entry.type == _FileType.SYMLINK - os.symlink(item.target, dest_path) + os.symlink(entry.target, dest_path) 
result.files_written.append(relative_pathname) diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index 0a39f5344..a7b8bdbd9 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -233,12 +233,13 @@ def _test_push_directory(user_config_file, project_dir, artifact_digest, queue): if cas.has_push_remotes(): # Create a CasBasedDirectory from local CAS cache content - directory = CasBasedDirectory(context.artifactcache.cas, ref=artifact_digest) + directory = CasBasedDirectory(context.artifactcache.cas, digest=artifact_digest) # Push the CasBasedDirectory object cas.push_directory(project, directory) - queue.put(directory.ref.hash) + digest = directory._get_digest() + queue.put(digest.hash) else: queue.put("No remote configured") diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py index ee346ea58..268bfb21f 100644 --- a/tests/internals/storage_vdir_import.py +++ b/tests/internals/storage_vdir_import.py @@ -113,7 +113,8 @@ def file_contents_are(path, contents): def create_new_casdir(root_number, cas_cache, tmpdir): d = CasBasedDirectory(cas_cache) d.import_files(os.path.join(tmpdir, "content", "root{}".format(root_number))) - assert d.ref.hash != empty_hash_ref + digest = d._get_digest() + assert digest.hash != empty_hash_ref return d @@ -175,7 +176,7 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents= duplicate_cas = create_new_casdir(original, cas_cache, tmpdir) - assert duplicate_cas.ref.hash == d.ref.hash + assert duplicate_cas._get_digest().hash == d._get_digest().hash d2 = create_new_casdir(overlay, cas_cache, tmpdir) d.import_files(d2) @@ -213,7 +214,7 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents= duplicate_cas.import_files(roundtrip_dir) - assert duplicate_cas.ref.hash == d.ref.hash + assert duplicate_cas._get_digest().hash == d._get_digest().hash # It's possible to parameterize on both original and overlay 
values, |