summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-03-01 05:56:37 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-03-01 05:56:37 +0000
commitfc2b42d63ac4ec1bc7727a1f7705613305060572 (patch)
tree7909066a6023a34be825db63184b52b1a6b1dd17
parent348f04e7a80338de1b9cc911e48263d8cd31be65 (diff)
parentf3a63faf7ea88ed95831aa26fe7e9cfa7960d7e3 (diff)
downloadbuildstream-fc2b42d63ac4ec1bc7727a1f7705613305060572.tar.gz
Merge branch 'juerg/lazy-directory-digest' into 'master'
_casbaseddirectory.py: Calculate directory digest lazily See merge request BuildStream/buildstream!1188
-rw-r--r--buildstream/_artifactcache.py7
-rw-r--r--buildstream/_cas/cascache.py3
-rw-r--r--buildstream/element.py2
-rw-r--r--buildstream/sandbox/_sandboxremote.py10
-rw-r--r--buildstream/sandbox/sandbox.py2
-rw-r--r--buildstream/storage/_casbaseddirectory.py225
-rw-r--r--buildstream/storage/_filebaseddirectory.py7
-rw-r--r--tests/artifactcache/push.py5
-rw-r--r--tests/internals/storage_vdir_import.py7
9 files changed, 117 insertions, 151 deletions
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index b317296ec..9986d689f 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -441,7 +441,7 @@ class ArtifactCache():
def get_artifact_directory(self, element, key):
ref = element.get_artifact_name(key)
digest = self.cas.resolve_ref(ref, update_mtime=True)
- return CasBasedDirectory(self.cas, digest)
+ return CasBasedDirectory(self.cas, digest=digest)
# commit():
#
@@ -636,9 +636,6 @@ class ArtifactCache():
raise ArtifactError("push_directory was called, but no remote artifact " +
"servers are configured as push remotes.")
- if directory.ref is None:
- return
-
for remote in push_remotes:
self.cas.push_directory(remote, directory)
@@ -697,7 +694,7 @@ class ArtifactCache():
def get_artifact_logs(self, ref):
descend = ["logs"]
cache_id = self.cas.resolve_ref(ref, update_mtime=True)
- vdir = CasBasedDirectory(self.cas, cache_id).descend(descend)
+ vdir = CasBasedDirectory(self.cas, digest=cache_id).descend(descend)
return vdir
################################################
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index 08860bacc..ffc9c2b93 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -360,7 +360,8 @@ class CASCache():
def push_directory(self, remote, directory):
remote.init()
- self._send_directory(remote, directory.ref)
+ digest = directory._get_digest()
+ self._send_directory(remote, digest)
# objpath():
#
diff --git a/buildstream/element.py b/buildstream/element.py
index fff95d0f5..9d1333721 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1691,7 +1691,7 @@ class Element(Plugin):
context = self._get_context()
- assemblevdir = CasBasedDirectory(cas_cache=context.artifactcache.cas, ref=None)
+ assemblevdir = CasBasedDirectory(cas_cache=context.artifactcache.cas)
logsvdir = assemblevdir.descend("logs", create=True)
metavdir = assemblevdir.descend("meta", create=True)
buildtreevdir = assemblevdir.descend("buildtree", create=True)
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index e97b37abd..81702743a 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -299,7 +299,7 @@ class SandboxRemote(Sandbox):
# to replace the sandbox's virtual directory with that. Creating a new virtual directory object
# from another hash will be interesting, though...
- new_dir = CasBasedDirectory(context.artifactcache.cas, ref=dir_digest)
+ new_dir = CasBasedDirectory(context.artifactcache.cas, digest=dir_digest)
self._set_virtual_directory(new_dir)
def _run(self, command, flags, *, cwd, env):
@@ -308,13 +308,11 @@ class SandboxRemote(Sandbox):
cascache = self._get_context().get_cascache()
if isinstance(upload_vdir, FileBasedDirectory):
# Make a new temporary directory to put source in
- upload_vdir = CasBasedDirectory(cascache, ref=None)
+ upload_vdir = CasBasedDirectory(cascache)
upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())
- upload_vdir.recalculate_hash()
-
# Generate action_digest first
- input_root_digest = upload_vdir.ref
+ input_root_digest = upload_vdir._get_digest()
command_proto = self._create_command(command, cwd, env)
command_digest = utils._message_digest(command_proto.SerializeToString())
action = remote_execution_pb2.Action(command_digest=command_digest,
@@ -346,7 +344,7 @@ class SandboxRemote(Sandbox):
except grpc.RpcError as e:
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e
- if not casremote.verify_digest_on_remote(upload_vdir.ref):
+ if not casremote.verify_digest_on_remote(upload_vdir._get_digest()):
raise SandboxError("Failed to verify that source has been pushed to the remote artifact cache.")
# Push command and action
diff --git a/buildstream/sandbox/sandbox.py b/buildstream/sandbox/sandbox.py
index 2159c0fef..ca4652599 100644
--- a/buildstream/sandbox/sandbox.py
+++ b/buildstream/sandbox/sandbox.py
@@ -191,7 +191,7 @@ class Sandbox():
"""
if self._vdir is None or self._never_cache_vdirs:
if 'BST_CAS_DIRECTORIES' in os.environ:
- self._vdir = CasBasedDirectory(self.__context.artifactcache.cas, ref=None)
+ self._vdir = CasBasedDirectory(self.__context.artifactcache.cas)
else:
self._vdir = FileBasedDirectory(self._root)
return self._vdir
diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py
index bd72e7c1c..43497eaf7 100644
--- a/buildstream/storage/_casbaseddirectory.py
+++ b/buildstream/storage/_casbaseddirectory.py
@@ -36,20 +36,22 @@ from ..utils import FileListResult, _magic_timestamp
class IndexEntry():
- """ Used in our index of names to objects to store the 'modified' flag
- for directory entries. Because we need both the remote_execution_pb2 object
- and our own Directory object for directory entries, we store both. For files
- and symlinks, only pb_object is used. """
- def __init__(self, pb_object, entrytype, buildstream_object=None, modified=False):
- self.pb_object = pb_object # Short for 'protocol buffer object')
+ """ Directory entry used in CasBasedDirectory.index """
+ def __init__(self, name, entrytype, *, digest=None, target=None, is_executable=False,
+ buildstream_object=None, modified=False):
+ self.name = name
self.type = entrytype
+ self.digest = digest
+ self.target = target
+ self.is_executable = is_executable
self.buildstream_object = buildstream_object
self.modified = modified
def get_directory(self, parent):
if not self.buildstream_object:
- self.buildstream_object = CasBasedDirectory(parent.cas_cache, ref=self.pb_object.digest,
- parent=parent, filename=self.pb_object.name)
+ self.buildstream_object = CasBasedDirectory(parent.cas_cache, digest=self.digest,
+ parent=parent, filename=self.name)
+ self.digest = None
return self.buildstream_object
@@ -105,70 +107,31 @@ class CasBasedDirectory(Directory):
_pb2_path_sep = "/"
_pb2_absolute_path_prefix = "/"
- def __init__(self, cas_cache, ref=None, parent=None, common_name="untitled", filename=None):
+ def __init__(self, cas_cache, *, digest=None, parent=None, common_name="untitled", filename=None):
self.filename = filename
self.common_name = common_name
- self.pb2_directory = remote_execution_pb2.Directory()
self.cas_cache = cas_cache
- if ref:
- with open(self.cas_cache.objpath(ref), 'rb') as f:
- self.pb2_directory.ParseFromString(f.read())
-
- self.ref = ref
+ self.__digest = digest
self.index = {}
self.parent = parent
- self._directory_read = False
- self._populate_index()
-
- def _populate_index(self):
- if self._directory_read:
- return
- for entry in self.pb2_directory.directories:
- self.index[entry.name] = IndexEntry(entry, _FileType.DIRECTORY)
- for entry in self.pb2_directory.files:
- self.index[entry.name] = IndexEntry(entry, _FileType.REGULAR_FILE)
- for entry in self.pb2_directory.symlinks:
- self.index[entry.name] = IndexEntry(entry, _FileType.SYMLINK)
- self._directory_read = True
-
- def _recalculate_recursing_up(self, caller=None):
- """Recalcuate the hash for this directory and store the results in
- the cache. If this directory has a parent, tell it to
- recalculate (since changing this directory changes an entry in
- the parent).
-
- """
- if caller:
- old_dir = self._find_pb2_entry(caller.filename)
- self.cas_cache.add_object(digest=old_dir.digest, buffer=caller.pb2_directory.SerializeToString())
- self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
- if self.parent:
- self.parent._recalculate_recursing_up(self)
-
- def _recalculate_recursing_down(self, parent=None):
- """Recalcuate the hash for this directory and any
- subdirectories. Hashes for subdirectories should be calculated
- and stored after a significant operation (e.g. an
- import_files() call) but not after adding each file, as that
- is extremely wasteful.
-
- """
- for entry in self.pb2_directory.directories:
- subdir = self.index[entry.name].buildstream_object
- if subdir:
- subdir._recalculate_recursing_down(entry)
-
- if parent:
- self.ref = self.cas_cache.add_object(digest=parent.digest, buffer=self.pb2_directory.SerializeToString())
- else:
- self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
- # We don't need to do anything more than that; files were already added ealier, and symlinks are
- # part of the directory structure.
-
- def _find_pb2_entry(self, name):
- if name in self.index:
- return self.index[name].pb_object
- return None
+ if digest:
+ self._populate_index(digest)
+
+ def _populate_index(self, digest):
+ pb2_directory = remote_execution_pb2.Directory()
+ with open(self.cas_cache.objpath(digest), 'rb') as f:
+ pb2_directory.ParseFromString(f.read())
+
+ for entry in pb2_directory.directories:
+ self.index[entry.name] = IndexEntry(entry.name, _FileType.DIRECTORY,
+ digest=entry.digest)
+ for entry in pb2_directory.files:
+ self.index[entry.name] = IndexEntry(entry.name, _FileType.REGULAR_FILE,
+ digest=entry.digest,
+ is_executable=entry.is_executable)
+ for entry in pb2_directory.symlinks:
+ self.index[entry.name] = IndexEntry(entry.name, _FileType.SYMLINK,
+ target=entry.target)
def _find_self_in_parent(self):
assert self.parent is not None
@@ -182,46 +145,36 @@ class CasBasedDirectory(Directory):
assert name not in self.index
newdir = CasBasedDirectory(self.cas_cache, parent=self, filename=name)
- dirnode = self.pb2_directory.directories.add()
- dirnode.name = name
- # Calculate the hash for an empty directory
- new_directory = remote_execution_pb2.Directory()
- self.cas_cache.add_object(digest=dirnode.digest, buffer=new_directory.SerializeToString())
- self.index[name] = IndexEntry(dirnode, _FileType.DIRECTORY, buildstream_object=newdir)
+ self.index[name] = IndexEntry(name, _FileType.DIRECTORY, buildstream_object=newdir)
+
+ self.__invalidate_digest()
+
return newdir
def _add_file(self, basename, filename, modified=False):
- filenode = self.pb2_directory.files.add()
- filenode.name = filename
- self.cas_cache.add_object(digest=filenode.digest, path=os.path.join(basename, filename))
- is_executable = os.access(os.path.join(basename, filename), os.X_OK)
- filenode.is_executable = is_executable
- self.index[filename] = IndexEntry(filenode, _FileType.REGULAR_FILE,
- modified=modified or filename in self.index)
+ entry = IndexEntry(filename, _FileType.REGULAR_FILE,
+ modified=modified or filename in self.index)
+ entry.digest = self.cas_cache.add_object(path=os.path.join(basename, filename))
+ entry.is_executable = os.access(os.path.join(basename, filename), os.X_OK)
+ self.index[filename] = entry
+
+ self.__invalidate_digest()
def _copy_link_from_filesystem(self, basename, filename):
self._add_new_link_direct(filename, os.readlink(os.path.join(basename, filename)))
def _add_new_link_direct(self, name, target):
- entry = self.index.get(name)
- if entry:
- symlinknode = entry.pb_object
- else:
- symlinknode = self.pb2_directory.symlinks.add()
- symlinknode.name = name
- # A symlink node has no digest.
- symlinknode.target = target
- self.index[name] = IndexEntry(symlinknode, _FileType.SYMLINK, modified=(entry is not None))
+ self.index[name] = IndexEntry(name, _FileType.SYMLINK, target=target, modified=name in self.index)
+
+ self.__invalidate_digest()
def delete_entry(self, name):
- for collection in [self.pb2_directory.files, self.pb2_directory.symlinks, self.pb2_directory.directories]:
- for thing in collection:
- if thing.name == name:
- collection.remove(thing)
if name in self.index:
del self.index[name]
+ self.__invalidate_digest()
+
def descend(self, subdirectory_spec, create=False):
"""Descend one or more levels of directory hierarchy and return a new
Directory object for that directory.
@@ -262,17 +215,15 @@ class CasBasedDirectory(Directory):
else:
error = "Cannot descend into {}, which is a '{}' in the directory {}"
raise VirtualDirectoryError(error.format(subdirectory_spec[0],
- type(self.index[subdirectory_spec[0]].pb_object).__name__,
+ self.index[subdirectory_spec[0]].type,
self))
else:
if create:
newdir = self._add_directory(subdirectory_spec[0])
return newdir.descend(subdirectory_spec[1:], create)
else:
- error = "No entry called '{}' found in {}. There are directories called {}."
- directory_list = ",".join([entry.name for entry in self.pb2_directory.directories])
- raise VirtualDirectoryError(error.format(subdirectory_spec[0], str(self),
- directory_list))
+ error = "'{}' not found in {}"
+ raise VirtualDirectoryError(error.format(subdirectory_spec[0], str(self)))
return None
def _check_replacement(self, name, path_prefix, fileListResult):
@@ -305,7 +256,7 @@ class CasBasedDirectory(Directory):
def _import_files_from_directory(self, source_directory, filter_callback, *, path_prefix="", result):
""" Import files from a traditional directory. """
- for direntry in sorted(os.scandir(source_directory), key=lambda e: e.name):
+ for direntry in os.scandir(source_directory):
# The destination filename, relative to the root where the import started
relative_pathname = os.path.join(path_prefix, direntry.name)
@@ -345,7 +296,7 @@ class CasBasedDirectory(Directory):
def _partial_import_cas_into_cas(self, source_directory, filter_callback, *, path_prefix="", result):
""" Import files from a CAS-based directory. """
- for name, entry in sorted(source_directory.index.items()):
+ for name, entry in source_directory.index.items():
# The destination filename, relative to the root where the import started
relative_pathname = os.path.join(path_prefix, name)
@@ -375,14 +326,15 @@ class CasBasedDirectory(Directory):
if not is_dir:
if self._check_replacement(name, path_prefix, result):
- item = entry.pb_object
if entry.type == _FileType.REGULAR_FILE:
- filenode = self.pb2_directory.files.add(digest=item.digest, name=name,
- is_executable=item.is_executable)
- self.index[name] = IndexEntry(filenode, _FileType.REGULAR_FILE, modified=True)
+ self.index[name] = IndexEntry(name, _FileType.REGULAR_FILE,
+ digest=entry.digest,
+ is_executable=entry.is_executable,
+ modified=True)
+ self.__invalidate_digest()
else:
assert entry.type == _FileType.SYMLINK
- self._add_new_link_direct(name=name, target=item.target)
+ self._add_new_link_direct(name=name, target=entry.target)
result.files_written.append(relative_pathname)
def import_files(self, external_pathspec, *,
@@ -407,14 +359,6 @@ class CasBasedDirectory(Directory):
# Current behaviour is to fully populate the report, which is inefficient,
# but still correct.
- # We need to recalculate and store the hashes of all directories both
- # up and down the tree; we have changed our directory by importing files
- # which changes our hash and all our parents' hashes of us. The trees
- # lower down need to be stored in the CAS as they are not automatically
- # added during construction.
- self._recalculate_recursing_down()
- if self.parent:
- self.parent._recalculate_recursing_up(self)
return result
def set_deterministic_mtime(self):
@@ -539,23 +483,15 @@ class CasBasedDirectory(Directory):
subdir = v.get_directory(self)
yield from subdir.list_relative_paths(relpath=os.path.join(relpath, k))
- def recalculate_hash(self):
- """ Recalcuates the hash for this directory and store the results in
- the cache. If this directory has a parent, tell it to
- recalculate (since changing this directory changes an entry in
- the parent). Hashes for subdirectories also get recalculated.
- """
- self._recalculate_recursing_up()
- self._recalculate_recursing_down()
-
def get_size(self):
- total = len(self.pb2_directory.SerializeToString())
+ digest = self._get_digest()
+ total = digest.size_bytes
for i in self.index.values():
if i.type == _FileType.DIRECTORY:
subdir = i.get_directory(self)
total += subdir.get_size()
elif i.type == _FileType.REGULAR_FILE:
- src_name = self.cas_cache.objpath(i.pb_object.digest)
+ src_name = self.cas_cache.objpath(i.digest)
filesize = os.stat(src_name).st_size
total += filesize
# Symlink nodes are encoded as part of the directory serialization.
@@ -588,14 +524,41 @@ class CasBasedDirectory(Directory):
# (Digest): The Digest protobuf object for the Directory protobuf
#
def _get_digest(self):
- if not self.ref:
- self.ref = self.cas_cache.add_object(buffer=self.pb2_directory.SerializeToString())
- return self.ref
+ if not self.__digest:
+ # Create updated Directory proto
+ pb2_directory = remote_execution_pb2.Directory()
+
+ for name, entry in sorted(self.index.items()):
+ if entry.type == _FileType.DIRECTORY:
+ dirnode = pb2_directory.directories.add()
+ dirnode.name = name
+
+ # Update digests for subdirectories in DirectoryNodes.
+ # No need to call entry.get_directory().
+ # If it hasn't been instantiated, digest must be up-to-date.
+ subdir = entry.buildstream_object
+ if subdir:
+ dirnode.digest.CopyFrom(subdir._get_digest())
+ else:
+ dirnode.digest.CopyFrom(entry.digest)
+ elif entry.type == _FileType.REGULAR_FILE:
+ filenode = pb2_directory.files.add()
+ filenode.name = name
+ filenode.digest.CopyFrom(entry.digest)
+ filenode.is_executable = entry.is_executable
+ elif entry.type == _FileType.SYMLINK:
+ symlinknode = pb2_directory.symlinks.add()
+ symlinknode.name = name
+ symlinknode.target = entry.target
+
+ self.__digest = self.cas_cache.add_object(buffer=pb2_directory.SerializeToString())
+
+ return self.__digest
def _objpath(self, path):
subdir = self.descend(path[:-1])
entry = subdir.index[path[-1]]
- return self.cas_cache.objpath(entry.pb_object.digest)
+ return self.cas_cache.objpath(entry.digest)
def _exists(self, path):
try:
@@ -603,3 +566,9 @@ class CasBasedDirectory(Directory):
return path[-1] in subdir.index
except VirtualDirectoryError:
return False
+
+ def __invalidate_digest(self):
+ if self.__digest:
+ self.__digest = None
+ if self.parent:
+ self.parent.__invalidate_digest()
diff --git a/buildstream/storage/_filebaseddirectory.py b/buildstream/storage/_filebaseddirectory.py
index 61827f19c..4b0fd917b 100644
--- a/buildstream/storage/_filebaseddirectory.py
+++ b/buildstream/storage/_filebaseddirectory.py
@@ -264,14 +264,13 @@ class FileBasedDirectory(Directory):
result.ignored.append(relative_pathname)
continue
- item = entry.pb_object
if entry.type == _FileType.REGULAR_FILE:
- src_path = source_directory.cas_cache.objpath(item.digest)
+ src_path = source_directory.cas_cache.objpath(entry.digest)
actionfunc(src_path, dest_path, result=result)
- if item.is_executable:
+ if entry.is_executable:
os.chmod(dest_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
else:
assert entry.type == _FileType.SYMLINK
- os.symlink(item.target, dest_path)
+ os.symlink(entry.target, dest_path)
result.files_written.append(relative_pathname)
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 0a39f5344..a7b8bdbd9 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -233,12 +233,13 @@ def _test_push_directory(user_config_file, project_dir, artifact_digest, queue):
if cas.has_push_remotes():
# Create a CasBasedDirectory from local CAS cache content
- directory = CasBasedDirectory(context.artifactcache.cas, ref=artifact_digest)
+ directory = CasBasedDirectory(context.artifactcache.cas, digest=artifact_digest)
# Push the CasBasedDirectory object
cas.push_directory(project, directory)
- queue.put(directory.ref.hash)
+ digest = directory._get_digest()
+ queue.put(digest.hash)
else:
queue.put("No remote configured")
diff --git a/tests/internals/storage_vdir_import.py b/tests/internals/storage_vdir_import.py
index ee346ea58..268bfb21f 100644
--- a/tests/internals/storage_vdir_import.py
+++ b/tests/internals/storage_vdir_import.py
@@ -113,7 +113,8 @@ def file_contents_are(path, contents):
def create_new_casdir(root_number, cas_cache, tmpdir):
d = CasBasedDirectory(cas_cache)
d.import_files(os.path.join(tmpdir, "content", "root{}".format(root_number)))
- assert d.ref.hash != empty_hash_ref
+ digest = d._get_digest()
+ assert digest.hash != empty_hash_ref
return d
@@ -175,7 +176,7 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents=
duplicate_cas = create_new_casdir(original, cas_cache, tmpdir)
- assert duplicate_cas.ref.hash == d.ref.hash
+ assert duplicate_cas._get_digest().hash == d._get_digest().hash
d2 = create_new_casdir(overlay, cas_cache, tmpdir)
d.import_files(d2)
@@ -213,7 +214,7 @@ def _import_test(tmpdir, original, overlay, generator_function, verify_contents=
duplicate_cas.import_files(roundtrip_dir)
- assert duplicate_cas.ref.hash == d.ref.hash
+ assert duplicate_cas._get_digest().hash == d._get_digest().hash
# It's possible to parameterize on both original and overlay values,