#
#  Copyright (C) 2018 Bloomberg LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see http://www.gnu.org/licenses/.
#
#  Authors:
#        Jim MacArthur

"""
CasBasedDirectory
=========

Implementation of the Directory class which backs onto a Merkle-tree based content
addressable storage system.

See also: :ref:`sandboxing`.
"""

from collections import OrderedDict

import os
import tempfile
import stat

from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._exceptions import BstError
from .directory import Directory, VirtualDirectoryError
from ._filebaseddirectory import FileBasedDirectory
from ..utils import FileListResult, safe_copy, list_relative_paths, _relative_symlink_target
from .._artifactcache.cascache import CASCache


class IndexEntry:
    """One slot in a directory's name -> object index.

    Every slot carries the protocol buffer node describing the entry and a
    'modified' flag.  Directory entries additionally carry the Directory
    object that wraps the subdirectory, because both representations are
    needed for them; for files and symlinks only ``pb_object`` is used.

    Arguments:
      pb_object: The remote_execution_pb2 node for this entry.
      buildstream_object: The Directory object for a directory entry,
                          otherwise None.
      modified (bool): Initial value of the 'modified' flag.
    """

    def __init__(self, pb_object, buildstream_object=None, modified=False):
        # 'pb' is short for 'protocol buffer'.
        self.pb_object = pb_object
        self.modified = modified
        self.buildstream_object = buildstream_object


# CasBasedDirectory intentionally doesn't call its superclass constructor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called
class CasBasedDirectory(Directory):
    """
    CAS-based directories can have two names; one is a 'common name' which has no effect
    on functionality, and the 'filename'. If a CasBasedDirectory has a parent, then 'filename'
    must be the name of an entry in the parent directory's index which points to this object.
    This is used to inform a parent directory that it must update the given hash for this
    object when this object changes.

    Typically a top-level CasBasedDirectory will have a common_name and no filename, and
    subdirectories will have a filename and no common_name. common_name can be used to identify
    CasBasedDirectory objects in a log file, since they have no unique position in a file
    system.
    """

    # Two constants which define the separators used by the remote execution API.
    _pb2_path_sep = "/"
    _pb2_absolute_path_prefix = "/"

    def __init__(self, context, ref=None, parent=None, common_name="untitled", filename=None):
        """Create a directory object, optionally loading it from the CAS.

        Arguments:
          context: Object providing 'artifactdir'; also passed to CASCache.
          ref: Digest of an existing serialized directory to load, or None
               to start with an empty directory.
          parent (CasBasedDirectory): The directory containing this one, if any.
          common_name (str): Label used in the string form of a directory
                             which has no filename.
          filename (str): Name of this directory's entry in the parent's index.
        """
        self.context = context
        self.cas_directory = os.path.join(context.artifactdir, 'cas')
        self.filename = filename
        self.common_name = common_name
        self.pb2_directory = remote_execution_pb2.Directory()
        self.cas_cache = CASCache(context)
        if ref:
            with open(self.cas_cache.objpath(ref), 'rb') as f:
                self.pb2_directory.ParseFromString(f.read())

        self.ref = ref
        self.index = OrderedDict()
        self.parent = parent
        self._directory_read = False
        self._populate_index()

    def _populate_index(self):
        """Build self.index from the protocol buffer directory, once.

        A CasBasedDirectory is constructed for every subdirectory entry, so
        the whole tree below this directory is loaded recursively.
        """
        if self._directory_read:
            return
        for entry in self.pb2_directory.directories:
            subdirectory = CasBasedDirectory(self.context, ref=entry.digest,
                                             parent=self, filename=entry.name)
            self.index[entry.name] = IndexEntry(entry, buildstream_object=subdirectory)
        for entry in self.pb2_directory.files:
            self.index[entry.name] = IndexEntry(entry)
        for entry in self.pb2_directory.symlinks:
            self.index[entry.name] = IndexEntry(entry)
        self._directory_read = True

    def _recalculate_recursing_up(self, caller=None):
        """Recalculate the hash for this directory and store the results in
        the cache.  If this directory has a parent, tell it to recalculate
        (since changing this directory changes an entry in the parent).

        Arguments:
          caller (CasBasedDirectory): The child directory which changed and
                                      triggered this recalculation, if any.
        """
        if caller:
            # Store the changed child and refresh the digest recorded for it
            # in our own directory *before* hashing ourselves; otherwise the
            # hash stored for this directory would describe the stale child.
            old_dir = self._find_pb2_entry(caller.filename)
            self.cas_cache.add_object(digest=old_dir.digest,
                                      buffer=caller.pb2_directory.SerializeToString())
        self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
        if self.parent:
            self.parent._recalculate_recursing_up(self)

    def _recalculate_recursing_down(self, parent=None):
        """Recalculate the hash for this directory and any subdirectories.

        Hashes for subdirectories should be calculated and stored after a
        significant operation (e.g. an import_files() call) but not after
        adding each file, as that is extremely wasteful.

        Arguments:
          parent: The DirectoryNode describing this directory inside its
                  parent's protocol buffer directory; its digest is passed
                  to the cache so that it is refreshed.  None for the
                  directory on which the recalculation was started.
        """
        # Children first, so that the digests embedded in this directory are
        # current by the time it is serialized itself.
        for entry in self.pb2_directory.directories:
            self.index[entry.name].buildstream_object._recalculate_recursing_down(entry)

        if parent:
            self.ref = self.cas_cache.add_object(digest=parent.digest,
                                                 buffer=self.pb2_directory.SerializeToString())
        else:
            self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
        # We don't need to do anything more than that; files were already added earlier,
        # and symlinks are part of the directory structure.

    def _find_pb2_entry(self, name):
        """Return the protocol buffer node indexed under 'name', or None."""
        entry = self.index.get(name)
        if entry is None:
            return None
        return entry.pb_object

    def _find_self_in_parent(self):
        """Return the name under which the parent indexes this directory,
        or None if the parent's index does not contain it."""
        assert self.parent is not None
        for (k, v) in self.parent.index.items():
            # Identity, not equality: we are looking for this very object.
            if v.buildstream_object is self:
                return k
        return None

    def _remove_pb2_nodes(self, name):
        """Remove every file, symlink and directory node called 'name' from
        the protocol buffer directory.  The index is left untouched."""
        for collection in (self.pb2_directory.files,
                           self.pb2_directory.symlinks,
                           self.pb2_directory.directories):
            # Walk backwards so that deleting does not shift the positions
            # which are still to be visited.
            for position in reversed(range(len(collection))):
                if collection[position].name == name:
                    del collection[position]

    def _add_directory(self, name):
        """Return the subdirectory called 'name', creating it if necessary.

        Raises:
          VirtualDirectoryError: if 'name' exists but is not a directory.
        """
        if name in self.index:
            newdir = self.index[name].buildstream_object
            if not isinstance(newdir, CasBasedDirectory):
                # TODO: This may not be an actual error; it may actually overwrite it
                raise VirtualDirectoryError("New directory {} in {} would overwrite existing non-directory of type {}"
                                            .format(name, str(self), type(newdir)))
            dirnode = self._find_pb2_entry(name)
            dirnode.name = name
            # The directory already exists and may have content, so the
            # digest recorded for it must not be replaced by the hash of an
            # empty directory.
        else:
            newdir = CasBasedDirectory(self.context, parent=self, filename=name)
            dirnode = self.pb2_directory.directories.add()
            dirnode.name = name

            # Calculate the hash for an empty directory
            new_directory = remote_execution_pb2.Directory()
            self.cas_cache.add_object(digest=dirnode.digest, buffer=new_directory.SerializeToString())

        self.index[name] = IndexEntry(dirnode, buildstream_object=newdir)
        return newdir

    def _add_new_file(self, basename, filename):
        """Add the file basename/filename to the cache and to this directory.

        An existing entry with the same name is replaced, and the new index
        entry is then flagged as modified.
        """
        source_path = os.path.join(basename, filename)
        replaced = filename in self.index
        if replaced:
            # Drop the stale node(s) first so that the directory never holds
            # two nodes with the same name.
            self._remove_pb2_nodes(filename)

        filenode = self.pb2_directory.files.add()
        filenode.name = filename
        self.cas_cache.add_object(digest=filenode.digest, path=source_path)
        filenode.is_executable = os.access(source_path, os.X_OK)
        self.index[filename] = IndexEntry(filenode, modified=replaced)

    def _add_new_link(self, basename, filename):
        """Record the symlink basename/filename in this directory.

        An existing symlink node with the same name is reused; an existing
        entry of any other kind is replaced.  In both cases the new index
        entry is flagged as modified.
        """
        existing_entry = self._find_pb2_entry(filename)
        if isinstance(existing_entry, remote_execution_pb2.SymlinkNode):
            symlinknode = existing_entry
        else:
            if existing_entry is not None:
                # A file or directory node has no 'target' to update; remove
                # it so that the name is not described twice.
                self._remove_pb2_nodes(filename)
            symlinknode = self.pb2_directory.symlinks.add()
            symlinknode.name = filename

        # A symlink node has no digest.
        symlinknode.target = os.readlink(os.path.join(basename, filename))
        self.index[filename] = IndexEntry(symlinknode, modified=(existing_entry is not None))

    def delete_entry(self, name):
        """Remove 'name' from this directory and from its index.

        Nothing happens if there is no entry with that name.
        """
        # The protocol buffer collections hold node messages, not strings,
        # so entries have to be matched on their 'name' field.
        self._remove_pb2_nodes(name)
        self.index.pop(name, None)

    def descend(self, subdirectory_spec, create=False):
        """Descend one or more levels of directory hierarchy and return a new
        Directory object for that directory.

        Arguments:
        * subdirectory_spec (list of strings): A list of strings which are all directory
          names.
        * create (boolean): If this is true, the directories will be created if
          they don't already exist.

        Raises:
          VirtualDirectoryError: if a component names a non-directory, or does
                                 not exist and 'create' is false.

        Note: At the moment, creating a directory by descending does
        not update this object in the CAS cache. However, performing
        an import_files() into a subdirectory of any depth obtained by
        descending from this object *will* cause this directory to be
        updated and stored.

        """

        # It's very common to send a directory name instead of a list and this causes
        # bizarre errors, so check for it here
        if not isinstance(subdirectory_spec, list):
            subdirectory_spec = [subdirectory_spec]

        # Because of the way split works, it's common to get a list which begins with
        # an empty string. Skip these, working on a copy so that the list
        # owned by the caller is never modified.
        first = 0
        while first < len(subdirectory_spec) and subdirectory_spec[first] == "":
            first += 1
        remaining = subdirectory_spec[first:]

        # Descending into [] returns the same directory.
        if not remaining:
            return self

        head = remaining[0]
        tail = remaining[1:]

        if head in self.index:
            entry = self.index[head].buildstream_object
            if isinstance(entry, CasBasedDirectory):
                return entry.descend(tail, create)
            error = "Cannot descend into {}, which is a '{}' in the directory {}"
            raise VirtualDirectoryError(error.format(head, type(entry).__name__, self))

        if create:
            newdir = self._add_directory(head)
            return newdir.descend(tail, create)

        error = "No entry called '{}' found in {}. There are directories called {}."
        directory_list = ",".join([entry.name for entry in self.pb2_directory.directories])
        raise VirtualDirectoryError(error.format(head, str(self), directory_list))

    def find_root(self):
        """ Finds the root of this directory tree by following 'parent' until there is
        no parent. """
        node = self
        while node.parent:
            node = node.parent
        return node

    def _resolve_symlink_or_directory(self, name):
        """Used only by _import_files_from_directory. Tries to resolve a
        directory name or symlink name. 'name' must be an entry in this
        directory. It must be a single symlink or directory name, not a path
        separated by path separators. If it's an existing directory name, it
        just returns the Directory object for that. If it's a symlink, it will
        attempt to find the target of the symlink and return that as a
        Directory object.

        If a symlink target doesn't exist, it will attempt to create it
        as a directory as long as it's within this directory tree.

        Raises:
          VirtualDirectoryError: if 'name' is neither a directory nor a symlink.
        """
        index_entry = self.index[name]
        if isinstance(index_entry.buildstream_object, Directory):
            return index_entry.buildstream_object

        symlink = index_entry.pb_object
        if not isinstance(symlink, remote_execution_pb2.SymlinkNode):
            # Same wording as descend() uses for the same situation.
            error = "Cannot descend into {}, which is a '{}' in the directory {}"
            raise VirtualDirectoryError(error.format(name, type(symlink).__name__, self))

        # OK then, it's a symlink
        if symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix):
            directory = self.find_root()
        else:
            directory = self

        for c in symlink.target.split(CasBasedDirectory._pb2_path_sep):
            if c == "..":
                # As in _resolve(): '..' from the root is valid under POSIX
                # and just yields the root again.
                if directory.parent is not None:
                    directory = directory.parent
            elif c in ("", "."):
                # Empty components (leading, trailing or doubled separators)
                # and '.' refer to the current directory and must not be
                # created as entries.
                continue
            else:
                directory = directory.descend(c, create=True)
        return directory

    def _resolve(self, name, absolute_symlinks_resolve=True):
        """ Resolves any name to an object. If the name points to a symlink in
        this directory, it returns the thing it points to,
        recursively. Returns a CasBasedDirectory, FileNode or
        None. Never creates a directory or otherwise alters the
        directory.

        Arguments:
          name (str): A single entry name in this directory.
          absolute_symlinks_resolve (bool): If false, absolute symlinks are
                                            not followed and resolve to None.

        Raises:
          VirtualDirectoryError: if a non-final component of a symlink target
                                 is not a directory.
        """
        # NOTE: there is no protection against symlink cycles; a link which
        # points at itself recurses until the interpreter gives up.

        # First check if it's a normal object and return that
        if name not in self.index:
            return None
        index_entry = self.index[name]
        if isinstance(index_entry.buildstream_object, Directory):
            return index_entry.buildstream_object
        if isinstance(index_entry.pb_object, remote_execution_pb2.FileNode):
            return index_entry.pb_object

        assert isinstance(index_entry.pb_object, remote_execution_pb2.SymlinkNode)
        symlink = index_entry.pb_object
        components = symlink.target.split(CasBasedDirectory._pb2_path_sep)

        if symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix):
            if not absolute_symlinks_resolve:
                return None
            directory = self.find_root()
            # Discard the first empty element
            components.pop(0)
        else:
            directory = self

        while components:
            c = components.pop(0)
            if c == "..":
                # If directory.parent *is* None, this is an attempt to access
                # '..' from the root, which is valid under POSIX; it just
                # returns the root.
                if directory.parent is not None:
                    directory = directory.parent
                continue

            if c not in directory.index:
                # Nonexistent component: the link is broken.
                return None

            f = directory._resolve(c, absolute_symlinks_resolve)
            # Ultimately f must now be a file or directory
            if isinstance(f, CasBasedDirectory):
                directory = f
            elif components:
                # This is a file or None (i.e. broken symlink), but there are
                # components left to resolve; we cannot look inside a file.
                raise VirtualDirectoryError("Reached a file called {} while trying to resolve a symlink; cannot proceed"
                                            .format(c))
            else:
                return f

        # We ran out of path elements and ended up in a directory
        return directory

    def _check_replacement(self, name, path_prefix, fileListResult):
        """ Checks whether 'name' exists, and if so, whether we can overwrite it.
        Returns 'True' if the import should go ahead.
        fileListResult.overwritten and fileListResult.ignored are updated depending
        on the result.

        An existing file or symlink is only reported as overwritten here; the
        entry itself is replaced when the new file or link is added. An
        existing empty directory is deleted straight away. A non-empty
        directory cannot be overwritten and is reported as ignored."""
        existing_entry = self._find_pb2_entry(name)
        relative_pathname = os.path.join(path_prefix, name)
        if existing_entry is None:
            return True

        if isinstance(existing_entry, (remote_execution_pb2.FileNode, remote_execution_pb2.SymlinkNode)):
            fileListResult.overwritten.append(relative_pathname)
            return True

        if isinstance(existing_entry, remote_execution_pb2.DirectoryNode):
            # If 'name' maps to a DirectoryNode, then there must be an entry in index
            # pointing to another Directory.
            if self.index[name].buildstream_object.is_empty():
                self.delete_entry(name)
                fileListResult.overwritten.append(relative_pathname)
                return True
            # We can't overwrite a non-empty directory, so we just ignore it.
            fileListResult.ignored.append(relative_pathname)
            return False

        assert False, ("Entry '{}' is not a recognised file/link/directory and not None; it is {}"
                       .format(name, type(existing_entry)))
        return False  # In case asserts are disabled

    def _import_directory_recursively(self, directory_name, source_directory, remaining_path, path_prefix):
        """ _import_directory_recursively and _import_files_from_directory will be called alternately
        as a directory tree is descended.

        Arguments:
          directory_name (str): First component of the path being imported.
          source_directory (str): Filesystem directory containing directory_name.
          remaining_path (list of str): Path components below directory_name.
          path_prefix (str): Path of this directory relative to where the
                             import started.

        Returns the FileListResult of the import into the subdirectory.
        """
        if directory_name in self.index:
            subdir = self._resolve_symlink_or_directory(directory_name)
        else:
            subdir = self._add_directory(directory_name)
        new_path_prefix = os.path.join(path_prefix, directory_name)
        return subdir._import_files_from_directory(os.path.join(source_directory, directory_name),
                                                   [os.path.sep.join(remaining_path)],
                                                   path_prefix=new_path_prefix)

    def _import_files_from_directory(self, source_directory, files, path_prefix=""):
        """ Imports files from a traditional directory.

        Arguments:
          source_directory (str): Filesystem directory to import from.
          files (iterable of str): Paths relative to source_directory.
          path_prefix (str): Path of this directory relative to where the
                             import started; used for reporting only.

        Returns a FileListResult describing what was written, overwritten
        and ignored.
        """
        result = FileListResult()
        for entry in sorted(files):
            split_path = entry.split(os.path.sep)
            # The actual file on the FS we're importing
            import_file = os.path.join(source_directory, entry)
            # The destination filename, relative to the root where the import started
            relative_pathname = os.path.join(path_prefix, entry)
            if len(split_path) > 1:
                directory_name = split_path[0]
                # Hand this off to the importer for that subdir. This will only do one file -
                # a better way would be to hand off all the files in this subdir at once.
                subdir_result = self._import_directory_recursively(directory_name, source_directory,
                                                                   split_path[1:], path_prefix)
                result.combine(subdir_result)
            elif os.path.islink(import_file):
                if self._check_replacement(entry, path_prefix, result):
                    self._add_new_link(source_directory, entry)
                    result.files_written.append(relative_pathname)
            elif os.path.isdir(import_file):
                # A plain directory which already exists isn't a problem; just ignore it.
                if entry not in self.index:
                    self._add_directory(entry)
            elif os.path.isfile(import_file):
                if self._check_replacement(entry, path_prefix, result):
                    self._add_new_file(source_directory, entry)
                    result.files_written.append(relative_pathname)
        return result

    def import_files(self, external_pathspec, *, files=None,
                     report_written=True, update_utimes=False,
                     can_link=False):
        """Imports some or all files from external_path into this directory.

        Keyword arguments: external_pathspec: Either a string
        containing a pathname, or a Directory object, to use as the
        source.

        files (list of strings): A list of all the files relative to
        the external_pathspec to copy. If 'None' is supplied, all
        files are copied.

        report_written (bool): Return the full list of files
        written. Defaults to true. If false, only a list of
        overwritten files is returned.

        update_utimes (bool): Currently ignored, since CAS does not store utimes.

        can_link (bool): Ignored, since hard links do not have any meaning within CAS.

        Returns a FileListResult describing the import.
        """
        # TODO: No notice is taken of report_written, update_utimes or can_link.
        # Current behaviour is to fully populate the report, which is inefficient,
        # but still correct.
        if isinstance(external_pathspec, CasBasedDirectory):
            # TODO: This transfers from one CAS to another via the
            # filesystem, which is very inefficient. Alter this so it
            # transfers refs across directly.
            with tempfile.TemporaryDirectory(prefix="roundtrip") as tmpdir:
                external_pathspec.export_files(tmpdir)
                if files is None:
                    files = list_relative_paths(tmpdir)
                # The import has to finish before the temporary directory
                # is removed.
                result = self._import_files_from_directory(tmpdir, files=files)
        else:
            if isinstance(external_pathspec, FileBasedDirectory):
                source_directory = external_pathspec._get_underlying_directory()
            else:
                source_directory = external_pathspec
            if files is None:
                files = list_relative_paths(source_directory)
            result = self._import_files_from_directory(source_directory, files=files)

        # We need to recalculate and store the hashes of all directories both
        # up and down the tree; we have changed our directory by importing files
        # which changes our hash and all our parents' hashes of us. The trees
        # lower down need to be stored in the CAS as they are not automatically
        # added during construction. This applies whatever the source was.
        self._recalculate_recursing_down()
        if self.parent:
            self.parent._recalculate_recursing_up(self)

        return result

    def set_deterministic_mtime(self):
        """ Sets a static modification time for all regular files in this directory.
        Since we don't store any modification time, we don't need to do anything.
        """
        pass

    def set_deterministic_user(self):
        """ Sets all files in this directory to the current user's euid/egid.
        We also don't store user data, so this can be ignored.
        """
        pass

    def export_files(self, to_directory, *, can_link=False, can_destroy=False):
        """Copies everything from this into to_directory, which must be the name
        of a traditional filesystem directory.

        Arguments:

        to_directory (string): a path outside this directory object
        where the contents will be copied to.

        can_link (bool): Whether we can create hard links in to_directory
        instead of copying. Not used by this implementation.

        can_destroy (bool): Whether we can destroy elements in this
        directory to export them (e.g. by renaming them as the
        target). Not used by this implementation.

        Raises:
          VirtualDirectoryError: if the directory and its index disagree.
          BstError: if a symlink cannot be created because its name is
                    already taken in to_directory.
        """

        os.makedirs(to_directory, exist_ok=True)

        for entry in self.pb2_directory.directories:
            if entry.name not in self.index:
                raise VirtualDirectoryError("CasDir {} contained {} in directories but not in the index"
                                            .format(str(self), entry.name))
            if not self._directory_read:
                raise VirtualDirectoryError("CasDir {} has not been indexed yet".format(str(self)))
            dest_dir = os.path.join(to_directory, entry.name)
            os.makedirs(dest_dir, exist_ok=True)
            target = self.descend([entry.name])
            target.export_files(dest_dir)

        executable_mode = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                           stat.S_IRGRP | stat.S_IXGRP |
                           stat.S_IROTH | stat.S_IXOTH)
        for entry in self.pb2_directory.files:
            # Extract the entry to a single file
            dest_name = os.path.join(to_directory, entry.name)
            src_name = self.cas_cache.objpath(entry.digest)
            safe_copy(src_name, dest_name)
            if entry.is_executable:
                os.chmod(dest_name, executable_mode)

        for entry in self.pb2_directory.symlinks:
            src_name = os.path.join(to_directory, entry.name)
            target_name = entry.target
            try:
                os.symlink(target_name, src_name)
            except FileExistsError as e:
                raise BstError(("Cannot create a symlink named {} pointing to {}." +
                                " The original error was: {}").
                               format(src_name, entry.target, e)) from e

    def export_to_tar(self, tarfile, destination_dir, mtime=0):
        """ Not implemented for CAS-based directories. """
        raise NotImplementedError()

    def mark_changed(self):
        """ It should not be possible to externally modify a CAS-based
        directory at the moment."""
        raise NotImplementedError()

    def is_empty(self):
        """ Return true if this directory has no files, subdirectories or links in it.
        """
        return len(self.index) == 0

    def _mark_directory_unmodified(self):
        # Marks all entries in this directory and all child directories as unmodified.
        for i in self.index.values():
            i.modified = False
            if isinstance(i.buildstream_object, CasBasedDirectory):
                i.buildstream_object._mark_directory_unmodified()

    def _mark_entry_unmodified(self, name):
        # Marks an entry as unmodified. If the entry is a directory, it will
        # recursively mark all its tree as unmodified.
        entry = self.index[name]
        entry.modified = False
        if entry.buildstream_object:
            entry.buildstream_object._mark_directory_unmodified()

    def mark_unmodified(self):
        """ Marks all files in this directory (recursively) as unmodified.
        If we have a parent, we mark our own entry as unmodified in that parent's
        index.
        """
        if self.parent:
            self.parent._mark_entry_unmodified(self._find_self_in_parent())
        else:
            self._mark_directory_unmodified()

    def list_modified_paths(self):
        """Provide a list of relative paths which have been modified since the
        last call to mark_unmodified.

        Only file entries are reported; subdirectories are searched
        recursively.

        Return value: List(str) - list of modified paths
        """

        filelist = []
        for (k, v) in self.index.items():
            if isinstance(v.buildstream_object, CasBasedDirectory):
                filelist.extend([k + os.path.sep + x for x in v.buildstream_object.list_modified_paths()])
            elif isinstance(v.pb_object, remote_execution_pb2.FileNode) and v.modified:
                filelist.append(k)
        return filelist

    def _contains_only_directories(self):
        """ Return true if every entry of this directory is a directory
        (which includes the case of no entries at all). """
        return all(isinstance(v.buildstream_object, CasBasedDirectory)
                   for v in self.index.values())

    def list_relative_paths(self, relpath=""):
        """Provide a list of all relative paths.

        NOTE: This list is not in the same order as utils.list_relative_paths.

        Symlinks which resolve to directories come first, then files (which
        include all other symlinks), then the contents of subdirectories.
        A directory other than the starting one which has no files is
        reported by its own path.

        Arguments:
          relpath (str): Prefix for every path produced; used when recursing.

        Return value: List(str) - list of all paths
        """
        symlink_names = []
        file_names = []
        subdirectories = []
        for (k, v) in self.index.items():
            if isinstance(v.pb_object, remote_execution_pb2.SymlinkNode):
                symlink_names.append(k)
            if isinstance(v.pb_object, remote_execution_pb2.FileNode):
                file_names.append(k)
            if isinstance(v.buildstream_object, CasBasedDirectory):
                subdirectories.append((k, v.buildstream_object))

        symlinks_to_directories = []
        for k in sorted(symlink_names):
            target = self._resolve(k, absolute_symlinks_resolve=True)
            if isinstance(target, CasBasedDirectory):
                symlinks_to_directories.append(k)
            else:
                # Broken symlinks are also considered files!
                file_names.append(k)

        for k in sorted(symlinks_to_directories):
            yield os.path.join(relpath, k)

        if not file_names and relpath != "":
            yield relpath
        else:
            for k in sorted(file_names):
                yield os.path.join(relpath, k)

        # Sort on the name only; the directory objects are not orderable.
        for (k, subdirectory) in sorted(subdirectories, key=lambda item: item[0]):
            yield from subdirectory.list_relative_paths(relpath=os.path.join(relpath, k))

    def recalculate_hash(self):
        """ Recalculates the hash for this directory and store the results in
        the cache. If this directory has a parent, tell it to
        recalculate (since changing this directory changes an entry in
        the parent). Hashes for subdirectories also get recalculated.
        """
        # Subdirectories first: the hash of this directory, and therefore the
        # hashes of all its parents, depend on theirs.
        self._recalculate_recursing_down()
        self._recalculate_recursing_up()

    def _get_identifier(self):
        """ Return a path-like label built from the filename (or, where there
        is none, the common name) of this directory and all its parents. """
        path = ""
        if self.parent:
            path = self.parent._get_identifier()
        if self.filename:
            path += "/" + self.filename
        else:
            path += "/" + self.common_name
        return path

    def __str__(self):
        return "[CAS:{}]".format(self._get_identifier())

    def _get_underlying_directory(self):
        """ There is no underlying directory for a CAS-backed directory, so
        throw an exception. """
        raise VirtualDirectoryError("_get_underlying_directory was called on a CAS-backed directory," +
                                    " which has no underlying directory.")