summaryrefslogtreecommitdiff
path: root/src/buildstream/storage/_casbaseddirectory.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/storage/_casbaseddirectory.py')
-rw-r--r--src/buildstream/storage/_casbaseddirectory.py622
1 files changed, 622 insertions, 0 deletions
diff --git a/src/buildstream/storage/_casbaseddirectory.py b/src/buildstream/storage/_casbaseddirectory.py
new file mode 100644
index 000000000..2aff29b98
--- /dev/null
+++ b/src/buildstream/storage/_casbaseddirectory.py
@@ -0,0 +1,622 @@
+#
+# Copyright (C) 2018 Bloomberg LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Jim MacArthur <jim.macarthur@codethink.co.uk>
+
+"""
+CasBasedDirectory
+=========
+
+Implementation of the Directory class which backs onto a Merkle-tree based content
+addressable storage system.
+
+See also: :ref:`sandboxing`.
+"""
+
+import os
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .directory import Directory, VirtualDirectoryError, _FileType
+from ._filebaseddirectory import FileBasedDirectory
+from ..utils import FileListResult, _magic_timestamp
+
+
+class IndexEntry():
+ """ Directory entry used in CasBasedDirectory.index """
+ def __init__(self, name, entrytype, *, digest=None, target=None, is_executable=False,
+ buildstream_object=None, modified=False):
+ self.name = name
+ self.type = entrytype
+ self.digest = digest
+ self.target = target
+ self.is_executable = is_executable
+ self.buildstream_object = buildstream_object
+ self.modified = modified
+
+ def get_directory(self, parent):
+ if not self.buildstream_object:
+ self.buildstream_object = CasBasedDirectory(parent.cas_cache, digest=self.digest,
+ parent=parent, filename=self.name)
+ self.digest = None
+
+ return self.buildstream_object
+
+ def get_digest(self):
+ if self.digest:
+ return self.digest
+ else:
+ return self.buildstream_object._get_digest()
+
+
+class ResolutionException(VirtualDirectoryError):
+ """ Superclass of all exceptions that can be raised by
+ CasBasedDirectory._resolve. Should not be used outside this module. """
+
+
+class InfiniteSymlinkException(ResolutionException):
+ """ Raised when an infinite symlink loop is found. """
+
+
+class AbsoluteSymlinkException(ResolutionException):
+ """Raised if we try to follow an absolute symlink (i.e. one whose
+ target starts with the path separator) and we have disallowed
+ following such symlinks.
+ """
+
+
+class UnexpectedFileException(ResolutionException):
+ """Raised if we were found a file where a directory or symlink was
+ expected, for example we try to resolve a symlink pointing to
+ /a/b/c but /a/b is a file.
+ """
+ def __init__(self, message=""):
+ """Allow constructor with no arguments, since this can be raised in
+ places where there isn't sufficient information to write the
+ message.
+ """
+ super().__init__(message)
+
+
+# CasBasedDirectory intentionally doesn't call its superclass constuctor,
+# which is meant to be unimplemented.
+# pylint: disable=super-init-not-called
+
+class CasBasedDirectory(Directory):
+ """
+ CAS-based directories can have two names; one is a 'common name' which has no effect
+ on functionality, and the 'filename'. If a CasBasedDirectory has a parent, then 'filename'
+ must be the name of an entry in the parent directory's index which points to this object.
+ This is used to inform a parent directory that it must update the given hash for this
+ object when this object changes.
+
+ Typically a top-level CasBasedDirectory will have a common_name and no filename, and
+ subdirectories wil have a filename and no common_name. common_name can used to identify
+ CasBasedDirectory objects in a log file, since they have no unique position in a file
+ system.
+ """
+
+ # Two constants which define the separators used by the remote execution API.
+ _pb2_path_sep = "/"
+ _pb2_absolute_path_prefix = "/"
+
+ def __init__(self, cas_cache, *, digest=None, parent=None, common_name="untitled", filename=None):
+ self.filename = filename
+ self.common_name = common_name
+ self.cas_cache = cas_cache
+ self.__digest = digest
+ self.index = {}
+ self.parent = parent
+ if digest:
+ self._populate_index(digest)
+
+ def _populate_index(self, digest):
+ try:
+ pb2_directory = remote_execution_pb2.Directory()
+ with open(self.cas_cache.objpath(digest), 'rb') as f:
+ pb2_directory.ParseFromString(f.read())
+ except FileNotFoundError as e:
+ raise VirtualDirectoryError("Directory not found in local cache: {}".format(e)) from e
+
+ for entry in pb2_directory.directories:
+ self.index[entry.name] = IndexEntry(entry.name, _FileType.DIRECTORY,
+ digest=entry.digest)
+ for entry in pb2_directory.files:
+ self.index[entry.name] = IndexEntry(entry.name, _FileType.REGULAR_FILE,
+ digest=entry.digest,
+ is_executable=entry.is_executable)
+ for entry in pb2_directory.symlinks:
+ self.index[entry.name] = IndexEntry(entry.name, _FileType.SYMLINK,
+ target=entry.target)
+
+ def _find_self_in_parent(self):
+ assert self.parent is not None
+ parent = self.parent
+ for (k, v) in parent.index.items():
+ if v.buildstream_object == self:
+ return k
+ return None
+
+ def _add_directory(self, name):
+ assert name not in self.index
+
+ newdir = CasBasedDirectory(self.cas_cache, parent=self, filename=name)
+
+ self.index[name] = IndexEntry(name, _FileType.DIRECTORY, buildstream_object=newdir)
+
+ self.__invalidate_digest()
+
+ return newdir
+
+ def _add_file(self, basename, filename, modified=False):
+ entry = IndexEntry(filename, _FileType.REGULAR_FILE,
+ modified=modified or filename in self.index)
+ path = os.path.join(basename, filename)
+ entry.digest = self.cas_cache.add_object(path=path)
+ entry.is_executable = os.access(path, os.X_OK)
+ self.index[filename] = entry
+
+ self.__invalidate_digest()
+
+ def _copy_link_from_filesystem(self, basename, filename):
+ self._add_new_link_direct(filename, os.readlink(os.path.join(basename, filename)))
+
+ def _add_new_link_direct(self, name, target):
+ self.index[name] = IndexEntry(name, _FileType.SYMLINK, target=target, modified=name in self.index)
+
+ self.__invalidate_digest()
+
+ def delete_entry(self, name):
+ if name in self.index:
+ del self.index[name]
+
+ self.__invalidate_digest()
+
+ def descend(self, *paths, create=False):
+ """Descend one or more levels of directory hierarchy and return a new
+ Directory object for that directory.
+
+ Arguments:
+ * *paths (str): A list of strings which are all directory names.
+ * create (boolean): If this is true, the directories will be created if
+ they don't already exist.
+
+ Note: At the moment, creating a directory by descending does
+ not update this object in the CAS cache. However, performing
+ an import_files() into a subdirectory of any depth obtained by
+ descending from this object *will* cause this directory to be
+ updated and stored.
+
+ """
+
+ current_dir = self
+
+ for path in paths:
+ # Skip empty path segments
+ if not path:
+ continue
+
+ entry = current_dir.index.get(path)
+ if entry:
+ if entry.type == _FileType.DIRECTORY:
+ current_dir = entry.get_directory(current_dir)
+ else:
+ error = "Cannot descend into {}, which is a '{}' in the directory {}"
+ raise VirtualDirectoryError(error.format(path,
+ current_dir.index[path].type,
+ current_dir))
+ else:
+ if create:
+ current_dir = current_dir._add_directory(path)
+ else:
+ error = "'{}' not found in {}"
+ raise VirtualDirectoryError(error.format(path, str(current_dir)))
+
+ return current_dir
+
+ def _check_replacement(self, name, relative_pathname, fileListResult):
+ """ Checks whether 'name' exists, and if so, whether we can overwrite it.
+ If we can, add the name to 'overwritten_files' and delete the existing entry.
+ Returns 'True' if the import should go ahead.
+ fileListResult.overwritten and fileListResult.ignore are updated depending
+ on the result. """
+ existing_entry = self.index.get(name)
+ if existing_entry is None:
+ return True
+ elif existing_entry.type == _FileType.DIRECTORY:
+ # If 'name' maps to a DirectoryNode, then there must be an entry in index
+ # pointing to another Directory.
+ subdir = existing_entry.get_directory(self)
+ if subdir.is_empty():
+ self.delete_entry(name)
+ fileListResult.overwritten.append(relative_pathname)
+ return True
+ else:
+ # We can't overwrite a non-empty directory, so we just ignore it.
+ fileListResult.ignored.append(relative_pathname)
+ return False
+ else:
+ self.delete_entry(name)
+ fileListResult.overwritten.append(relative_pathname)
+ return True
+
+ def _import_files_from_directory(self, source_directory, filter_callback, *, path_prefix="", result):
+ """ Import files from a traditional directory. """
+
+ for direntry in os.scandir(source_directory):
+ # The destination filename, relative to the root where the import started
+ relative_pathname = os.path.join(path_prefix, direntry.name)
+
+ is_dir = direntry.is_dir(follow_symlinks=False)
+
+ if is_dir:
+ src_subdir = os.path.join(source_directory, direntry.name)
+
+ try:
+ create_subdir = direntry.name not in self.index
+ dest_subdir = self.descend(direntry.name, create=create_subdir)
+ except VirtualDirectoryError:
+ filetype = self.index[direntry.name].type
+ raise VirtualDirectoryError('Destination is a {}, not a directory: /{}'
+ .format(filetype, relative_pathname))
+
+ dest_subdir._import_files_from_directory(src_subdir, filter_callback,
+ path_prefix=relative_pathname, result=result)
+
+ if filter_callback and not filter_callback(relative_pathname):
+ if is_dir and create_subdir and dest_subdir.is_empty():
+ # Complete subdirectory has been filtered out, remove it
+ self.delete_entry(direntry.name)
+
+ # Entry filtered out, move to next
+ continue
+
+ if direntry.is_file(follow_symlinks=False):
+ if self._check_replacement(direntry.name, relative_pathname, result):
+ self._add_file(source_directory, direntry.name, modified=relative_pathname in result.overwritten)
+ result.files_written.append(relative_pathname)
+ elif direntry.is_symlink():
+ if self._check_replacement(direntry.name, relative_pathname, result):
+ self._copy_link_from_filesystem(source_directory, direntry.name)
+ result.files_written.append(relative_pathname)
+
+ def _partial_import_cas_into_cas(self, source_directory, filter_callback, *, path_prefix="", result):
+ """ Import files from a CAS-based directory. """
+
+ for name, entry in source_directory.index.items():
+ # The destination filename, relative to the root where the import started
+ relative_pathname = os.path.join(path_prefix, name)
+
+ is_dir = entry.type == _FileType.DIRECTORY
+
+ if is_dir:
+ create_subdir = name not in self.index
+
+ if create_subdir and not filter_callback:
+ # If subdirectory does not exist yet and there is no filter,
+ # we can import the whole source directory by digest instead
+ # of importing each directory entry individually.
+ subdir_digest = entry.get_digest()
+ dest_entry = IndexEntry(name, _FileType.DIRECTORY, digest=subdir_digest)
+ self.index[name] = dest_entry
+ self.__invalidate_digest()
+
+ # However, we still need to iterate over the directory entries
+ # to fill in `result.files_written`.
+
+ # Use source subdirectory object if it already exists,
+ # otherwise create object for destination subdirectory.
+ # This is based on the assumption that the destination
+ # subdirectory is more likely to be modified later on
+ # (e.g., by further import_files() calls).
+ if entry.buildstream_object:
+ subdir = entry.buildstream_object
+ else:
+ subdir = dest_entry.get_directory(self)
+
+ subdir.__add_files_to_result(path_prefix=relative_pathname, result=result)
+ else:
+ src_subdir = source_directory.descend(name)
+
+ try:
+ dest_subdir = self.descend(name, create=create_subdir)
+ except VirtualDirectoryError:
+ filetype = self.index[name].type
+ raise VirtualDirectoryError('Destination is a {}, not a directory: /{}'
+ .format(filetype, relative_pathname))
+
+ dest_subdir._partial_import_cas_into_cas(src_subdir, filter_callback,
+ path_prefix=relative_pathname, result=result)
+
+ if filter_callback and not filter_callback(relative_pathname):
+ if is_dir and create_subdir and dest_subdir.is_empty():
+ # Complete subdirectory has been filtered out, remove it
+ self.delete_entry(name)
+
+ # Entry filtered out, move to next
+ continue
+
+ if not is_dir:
+ if self._check_replacement(name, relative_pathname, result):
+ if entry.type == _FileType.REGULAR_FILE:
+ self.index[name] = IndexEntry(name, _FileType.REGULAR_FILE,
+ digest=entry.digest,
+ is_executable=entry.is_executable,
+ modified=True)
+ self.__invalidate_digest()
+ else:
+ assert entry.type == _FileType.SYMLINK
+ self._add_new_link_direct(name=name, target=entry.target)
+ result.files_written.append(relative_pathname)
+
+ def import_files(self, external_pathspec, *,
+ filter_callback=None,
+ report_written=True, update_mtime=False,
+ can_link=False):
+ """ See superclass Directory for arguments """
+
+ result = FileListResult()
+
+ if isinstance(external_pathspec, FileBasedDirectory):
+ source_directory = external_pathspec._get_underlying_directory()
+ self._import_files_from_directory(source_directory, filter_callback, result=result)
+ elif isinstance(external_pathspec, str):
+ source_directory = external_pathspec
+ self._import_files_from_directory(source_directory, filter_callback, result=result)
+ else:
+ assert isinstance(external_pathspec, CasBasedDirectory)
+ self._partial_import_cas_into_cas(external_pathspec, filter_callback, result=result)
+
+ # TODO: No notice is taken of report_written, update_mtime or can_link.
+ # Current behaviour is to fully populate the report, which is inefficient,
+ # but still correct.
+
+ return result
+
+ def set_deterministic_mtime(self):
+ """ Sets a static modification time for all regular files in this directory.
+ Since we don't store any modification time, we don't need to do anything.
+ """
+
+ def set_deterministic_user(self):
+ """ Sets all files in this directory to the current user's euid/egid.
+ We also don't store user data, so this can be ignored.
+ """
+
+ def export_files(self, to_directory, *, can_link=False, can_destroy=False):
+ """Copies everything from this into to_directory, which must be the name
+ of a traditional filesystem directory.
+
+ Arguments:
+
+ to_directory (string): a path outside this directory object
+ where the contents will be copied to.
+
+ can_link (bool): Whether we can create hard links in to_directory
+ instead of copying.
+
+ can_destroy (bool): Whether we can destroy elements in this
+ directory to export them (e.g. by renaming them as the
+ target).
+
+ """
+
+ self.cas_cache.checkout(to_directory, self._get_digest(), can_link=can_link)
+
+ def export_to_tar(self, tarfile, destination_dir, mtime=_magic_timestamp):
+ raise NotImplementedError()
+
+ def mark_changed(self):
+ """ It should not be possible to externally modify a CAS-based
+ directory at the moment."""
+ raise NotImplementedError()
+
+ def is_empty(self):
+ """ Return true if this directory has no files, subdirectories or links in it.
+ """
+ return len(self.index) == 0
+
+ def _mark_directory_unmodified(self):
+ # Marks all entries in this directory and all child directories as unmodified.
+ for i in self.index.values():
+ i.modified = False
+ if i.type == _FileType.DIRECTORY and i.buildstream_object:
+ i.buildstream_object._mark_directory_unmodified()
+
+ def _mark_entry_unmodified(self, name):
+ # Marks an entry as unmodified. If the entry is a directory, it will
+ # recursively mark all its tree as unmodified.
+ self.index[name].modified = False
+ if self.index[name].buildstream_object:
+ self.index[name].buildstream_object._mark_directory_unmodified()
+
+ def mark_unmodified(self):
+ """ Marks all files in this directory (recursively) as unmodified.
+ If we have a parent, we mark our own entry as unmodified in that parent's
+ index.
+ """
+ if self.parent:
+ self.parent._mark_entry_unmodified(self._find_self_in_parent())
+ else:
+ self._mark_directory_unmodified()
+
+ def _lightweight_resolve_to_index(self, path):
+ """A lightweight function for transforming paths into IndexEntry
+ objects. This does not follow symlinks.
+
+ path: The string to resolve. This should be a series of path
+ components separated by the protocol buffer path separator
+ _pb2_path_sep.
+
+ Returns: the IndexEntry found, or None if any of the path components were not present.
+
+ """
+ directory = self
+ path_components = path.split(CasBasedDirectory._pb2_path_sep)
+ for component in path_components[:-1]:
+ if component not in directory.index:
+ return None
+ if directory.index[component].type == _FileType.DIRECTORY:
+ directory = directory.index[component].get_directory(self)
+ else:
+ return None
+ return directory.index.get(path_components[-1], None)
+
+ def list_modified_paths(self):
+ """Provide a list of relative paths which have been modified since the
+ last call to mark_unmodified.
+
+ Return value: List(str) - list of modified paths
+ """
+
+ for p in self.list_relative_paths():
+ i = self._lightweight_resolve_to_index(p)
+ if i and i.modified:
+ yield p
+
+ def list_relative_paths(self, relpath=""):
+ """Provide a list of all relative paths.
+
+ Return value: List(str) - list of all paths
+ """
+
+ file_list = list(filter(lambda i: i[1].type != _FileType.DIRECTORY,
+ self.index.items()))
+ directory_list = filter(lambda i: i[1].type == _FileType.DIRECTORY,
+ self.index.items())
+
+ if relpath != "":
+ yield relpath
+
+ for (k, v) in sorted(file_list):
+ yield os.path.join(relpath, k)
+
+ for (k, v) in sorted(directory_list):
+ subdir = v.get_directory(self)
+ yield from subdir.list_relative_paths(relpath=os.path.join(relpath, k))
+
+ def get_size(self):
+ digest = self._get_digest()
+ total = digest.size_bytes
+ for i in self.index.values():
+ if i.type == _FileType.DIRECTORY:
+ subdir = i.get_directory(self)
+ total += subdir.get_size()
+ elif i.type == _FileType.REGULAR_FILE:
+ total += i.digest.size_bytes
+ # Symlink nodes are encoded as part of the directory serialization.
+ return total
+
+ def _get_identifier(self):
+ path = ""
+ if self.parent:
+ path = self.parent._get_identifier()
+ if self.filename:
+ path += "/" + self.filename
+ else:
+ path += "/" + self.common_name
+ return path
+
+ def __str__(self):
+ return "[CAS:{}]".format(self._get_identifier())
+
+ def _get_underlying_directory(self):
+ """ There is no underlying directory for a CAS-backed directory, so
+ throw an exception. """
+ raise VirtualDirectoryError("_get_underlying_directory was called on a CAS-backed directory," +
+ " which has no underlying directory.")
+
+ # _get_digest():
+ #
+ # Return the Digest for this directory.
+ #
+ # Returns:
+ # (Digest): The Digest protobuf object for the Directory protobuf
+ #
+ def _get_digest(self):
+ if not self.__digest:
+ # Create updated Directory proto
+ pb2_directory = remote_execution_pb2.Directory()
+
+ for name, entry in sorted(self.index.items()):
+ if entry.type == _FileType.DIRECTORY:
+ dirnode = pb2_directory.directories.add()
+ dirnode.name = name
+
+ # Update digests for subdirectories in DirectoryNodes.
+ # No need to call entry.get_directory().
+ # If it hasn't been instantiated, digest must be up-to-date.
+ subdir = entry.buildstream_object
+ if subdir:
+ dirnode.digest.CopyFrom(subdir._get_digest())
+ else:
+ dirnode.digest.CopyFrom(entry.digest)
+ elif entry.type == _FileType.REGULAR_FILE:
+ filenode = pb2_directory.files.add()
+ filenode.name = name
+ filenode.digest.CopyFrom(entry.digest)
+ filenode.is_executable = entry.is_executable
+ elif entry.type == _FileType.SYMLINK:
+ symlinknode = pb2_directory.symlinks.add()
+ symlinknode.name = name
+ symlinknode.target = entry.target
+
+ self.__digest = self.cas_cache.add_object(buffer=pb2_directory.SerializeToString())
+
+ return self.__digest
+
+ def _get_child_digest(self, *path):
+ subdir = self.descend(*path[:-1])
+ entry = subdir.index[path[-1]]
+ if entry.type == _FileType.DIRECTORY:
+ subdir = entry.buildstream_object
+ if subdir:
+ return subdir._get_digest()
+ else:
+ return entry.digest
+ elif entry.type == _FileType.REGULAR_FILE:
+ return entry.digest
+ else:
+ raise VirtualDirectoryError("Directory entry has no digest: {}".format(os.path.join(*path)))
+
+ def _objpath(self, *path):
+ subdir = self.descend(*path[:-1])
+ entry = subdir.index[path[-1]]
+ return self.cas_cache.objpath(entry.digest)
+
+ def _exists(self, *path):
+ try:
+ subdir = self.descend(*path[:-1])
+ return path[-1] in subdir.index
+ except VirtualDirectoryError:
+ return False
+
+ def __invalidate_digest(self):
+ if self.__digest:
+ self.__digest = None
+ if self.parent:
+ self.parent.__invalidate_digest()
+
+ def __add_files_to_result(self, *, path_prefix="", result):
+ for name, entry in self.index.items():
+ # The destination filename, relative to the root where the import started
+ relative_pathname = os.path.join(path_prefix, name)
+
+ if entry.type == _FileType.DIRECTORY:
+ subdir = self.descend(name)
+ subdir.__add_files_to_result(path_prefix=relative_pathname, result=result)
+ else:
+ result.files_written.append(relative_pathname)