summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan van Berkom <tristan@codethink.co.uk>2020-09-23 16:44:36 +0900
committerTristan van Berkom <tristan@codethink.co.uk>2020-09-23 19:33:48 +0900
commite07012eadd3d67351dc568076a4f57853ad58339 (patch)
treee594796b4ae00fd244ee7ea486d1a4cf2d7a34bb
parentea753a4ee9dc12d37ce86d50f4c01ef9fcb2d874 (diff)
downloadbuildstream-tristan/remove-bst-key-requires-stage.tar.gz
source.py: Remove BST_KEY_REQUIRES_STAGEtristan/remove-bst-key-requires-stage
Refactored this to remove unneeded complexity in the code base, as described here: https://lists.apache.org/thread.html/r4b9517742433f07c79379ba5b67932cfe997c1e64965a9f1a2b613fc%40%3Cdev.buildstream.apache.org%3E Changes: * source.py: Added private Source._cache_directory() context manager We also move the assertion about nodes which are safe to write to a bit lower in Source._set_ref(), as this was unnecessarily early. When tracking a workspace, the ref will be none and will turn out to be none afterwards, it is not a problem that a workspace's node is a synthetic one, as tracking will never affect it. * local plugin: Implement get_unique_key() and stage() using the new context manager in order to optimize staging and cache key calculations here. * workspace plugin: Implement get_unique_key() and stage() using the new context manager in order to optimize staging and cache key calculations here. * trackqueue.py: No special casing with Source._is_trackable()
-rw-r--r--src/buildstream/_scheduler/queues/trackqueue.py8
-rw-r--r--src/buildstream/plugins/sources/local.py57
-rw-r--r--src/buildstream/plugins/sources/workspace.py53
-rw-r--r--src/buildstream/source.py112
4 files changed, 135 insertions, 95 deletions
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index d9c31ace1..e35d5d52b 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -39,14 +39,6 @@ class TrackQueue(Queue):
return TrackQueue._track_element
def status(self, element):
- # We can skip elements entirely if they have no trackable
- # sources.
- if not any(source._is_trackable() for source in element.sources()):
-
- # But we still have to mark them as tracked
- element._tracking_done()
- return QueueStatus.SKIP
-
return QueueStatus.READY
def done(self, _, element, result, status):
diff --git a/src/buildstream/plugins/sources/local.py b/src/buildstream/plugins/sources/local.py
index ffcae4993..15962b4eb 100644
--- a/src/buildstream/plugins/sources/local.py
+++ b/src/buildstream/plugins/sources/local.py
@@ -37,19 +37,16 @@ details on common configuration options for sources.
"""
import os
-from buildstream.storage.directory import Directory
-from buildstream import Source, SourceError
+from buildstream import Source, SourceError, Directory
class LocalSource(Source):
# pylint: disable=attribute-defined-outside-init
BST_MIN_VERSION = "2.0"
-
BST_STAGE_VIRTUAL_DIRECTORY = True
- BST_KEY_REQUIRES_STAGE = True
- __unique_key = None
+ __digest = None
def configure(self, node):
node.validate_keys(["path", *Source.COMMON_CONFIG_KEYS])
@@ -65,6 +62,21 @@ class LocalSource(Source):
def is_cached(self):
return True
+ def get_unique_key(self):
+ #
+ # As a core plugin, we use some private API to optimize file hashing.
+ #
+ # * Use Source._cache_directory() to prepare a Directory
+ # * Do the regular staging activity into the Directory
+ # * Use the hash of the cached digest as the unique key
+ #
+ if not self.__digest:
+ with self._cache_directory() as directory:
+ self.__do_stage(directory)
+ self.__digest = directory._get_digest()
+
+ return self.__digest.hash
+
# We dont have a ref, we're a local file...
def load_ref(self, node):
pass
@@ -80,8 +92,38 @@ class LocalSource(Source):
pass # pragma: nocover
def stage(self, directory):
- # directory should always be a Directory object
+ #
+ # We've already prepared the CAS while resolving the cache key which
+ # will happen before staging.
+ #
+ # Now just retrieve the previously cached content to stage.
+ #
assert isinstance(directory, Directory)
+ assert self.__digest is not None
+ with self._cache_directory(digest=self.__digest) as cached_directory:
+ directory.import_files(cached_directory)
+
+ def init_workspace(self, directory):
+ #
+ # FIXME: We should be able to stage the workspace from the content
+ # cached in CAS instead of reimporting from the filesystem
+ # to the new workspace directory with this special case, but
+ # for some reason the writable bits are getting lost on regular
+ # files through the transition.
+ #
+ self.__do_stage(directory)
+
+ # As a core element, we speed up some scenarios when this is used for
+ # a junction, by providing the local path to this content directly.
+ #
+ def _get_local_path(self):
+ return self.fullpath
+
+ # Staging is implemented internally, we preemptively put it in the CAS
+ # as a side effect of resolving the cache key, at stage time we just
+ # do an internal CAS stage.
+ #
+ def __do_stage(self, directory):
with self.timed_activity("Staging local files into CAS"):
if os.path.isdir(self.fullpath) and not os.path.islink(self.fullpath):
result = directory.import_files(self.fullpath)
@@ -93,9 +135,6 @@ class LocalSource(Source):
"Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail"
)
- def _get_local_path(self):
- return self.fullpath
-
# Plugin entry point
def setup():
diff --git a/src/buildstream/plugins/sources/workspace.py b/src/buildstream/plugins/sources/workspace.py
index 44d0889b3..df24abb91 100644
--- a/src/buildstream/plugins/sources/workspace.py
+++ b/src/buildstream/plugins/sources/workspace.py
@@ -37,30 +37,21 @@ workspace. The node constructed would be specified as follows:
import os
-from buildstream.storage.directory import Directory
-from buildstream import Source, SourceError
+from buildstream import Source, SourceError, Directory, MappingNode
from buildstream.types import SourceRef
-from buildstream.node import MappingNode
class WorkspaceSource(Source):
# pylint: disable=attribute-defined-outside-init
BST_MIN_VERSION = "2.0"
-
BST_STAGE_VIRTUAL_DIRECTORY = True
- BST_KEY_REQUIRES_STAGE = True
- # Cached unique key
- __unique_key = None
# the digest of the Directory following the import of the workspace
__digest = None
# the cache key of the last workspace build
__last_build = None
- def track(self) -> SourceRef: # pylint: disable=arguments-differ
- return None
-
def configure(self, node: MappingNode) -> None:
node.validate_keys(["path", "last_build", "kind"])
self.path = node.get_str("path")
@@ -75,6 +66,21 @@ class WorkspaceSource(Source):
def is_resolved(self):
return os.path.exists(self._get_local_path())
+ def get_unique_key(self):
+ #
+ # As a core plugin, we use some private API to optimize file hashing.
+ #
+ # * Use Source._cache_directory() to prepare a Directory
+ # * Do the regular staging activity into the Directory
+ # * Use the hash of the cached digest as the unique key
+ #
+ if not self.__digest:
+ with self._cache_directory() as directory:
+ self.__do_stage(directory)
+ self.__digest = directory._get_digest()
+
+ return self.__digest.hash
+
def get_ref(self) -> None:
return None
@@ -93,7 +99,29 @@ class WorkspaceSource(Source):
def fetch(self) -> None: # pylint: disable=arguments-differ
pass # pragma: nocover
- def stage(self, directory: Directory) -> None:
+ def stage(self, directory):
+ #
+ # We've already prepared the CAS while resolving the cache key which
+ # will happen before staging.
+ #
+ # Now just retrieve the previously cached content to stage.
+ #
+ assert isinstance(directory, Directory)
+ assert self.__digest is not None
+ with self._cache_directory(digest=self.__digest) as cached_directory:
+ directory.import_files(cached_directory)
+
+ # As a core element, we speed up some scenarios when this is used for
+ # a junction, by providing the local path to this content directly.
+ #
+ def _get_local_path(self) -> str:
+ return self.path
+
+ # Staging is implemented internally, we preemptively put it in the CAS
+ # as a side effect of resolving the cache key, at stage time we just
+ # do an internal CAS stage.
+ #
+ def __do_stage(self, directory: Directory) -> None:
assert isinstance(directory, Directory)
with self.timed_activity("Staging local files"):
result = directory.import_files(self.path, properties=["mtime"])
@@ -103,9 +131,6 @@ class WorkspaceSource(Source):
"Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail"
)
- def _get_local_path(self) -> str:
- return self.path
-
# Plugin entry point
def setup() -> WorkspaceSource:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index 245c3ca99..ce1cf8434 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -312,11 +312,6 @@ class Source(Plugin):
to.
"""
- BST_KEY_REQUIRES_STAGE = False
- """Whether the source will require staging in order to efficiently generate
- a unique key.
- """
-
def __init__(
self,
context: "Context",
@@ -364,7 +359,6 @@ class Source(Plugin):
self.__mirror_directory = None # type: Optional[str]
self._configure(self.__config)
- self.__digest = None
self.__is_cached = None
@@ -723,23 +717,6 @@ class Source(Plugin):
#############################################################
# Private Methods used in BuildStream #
#############################################################
- # Stage files at the localpath into the cascache
- #
- # Returns:
- # the hash of the cas directory
- def _stage_into_cas(self) -> str:
- # FIXME: this should not be called for sources with digests already set
- # since they will already have been staged into the cache. However,
- # _get_unique_key is sometimes called outside of _generate_key
- if self.__digest is None:
- cas_dir = CasBasedDirectory(self._get_context().get_cascache())
- self.stage(cas_dir)
- digest = cas_dir._get_digest()
- self.__digest = digest
- else:
- # XXX: an assignment to please mypy
- digest = self.__digest
- return digest.hash
# Wrapper around preflight() method
#
@@ -813,14 +790,8 @@ class Source(Plugin):
# 'directory' option
#
def _stage(self, directory):
- if self.BST_KEY_REQUIRES_STAGE:
- # _get_unique_key should be called before _stage
- assert self.__digest is not None
- cas_dir = CasBasedDirectory(self._get_context().get_cascache(), digest=self.__digest)
- directory.import_files(cas_dir)
- else:
- self.validate_cache()
- self.stage(directory)
+ self.validate_cache()
+ self.stage(directory)
# Wrapper for init_workspace()
def _init_workspace(self, directory):
@@ -835,10 +806,7 @@ class Source(Plugin):
# Wrapper for get_unique_key() api
#
def _get_unique_key(self):
- if self.BST_KEY_REQUIRES_STAGE:
- return self._stage_into_cas()
- else:
- return self.get_unique_key()
+ return self.get_unique_key()
# _project_refs():
#
@@ -954,20 +922,6 @@ class Source(Plugin):
if project is toplevel and not node:
node = provenance._node
- # Ensure the node is not from a junction
- if not toplevel.ref_storage == ProjectRefStorage.PROJECT_REFS and provenance._project is not toplevel:
- if provenance._project is project:
- self.warn("{}: Not persisting new reference in junctioned project".format(self))
- elif provenance._project is None:
- assert provenance._filename == ""
- assert provenance._shortname == ""
- raise SourceError("{}: Error saving source reference to synthetic node.".format(self))
- else:
- raise SourceError(
- "{}: Cannot track source in a fragment from a junction".format(provenance._shortname),
- reason="tracking-junction-fragment",
- )
-
#
# Step 2 - Set the ref in memory, and determine changed state
#
@@ -988,6 +942,21 @@ class Source(Plugin):
# tracking in the future. For now, this is quite safe.
return False
+ # Ensure the node is not from a junction
+ if not toplevel.ref_storage == ProjectRefStorage.PROJECT_REFS and provenance._project is not toplevel:
+ if provenance._project is project:
+ self.warn("{}: Not persisting new reference in junctioned project".format(self))
+ elif provenance._project is None:
+ assert provenance._filename == ""
+ assert provenance._shortname == ""
+
+ raise SourceError("{}: Error saving source reference to synthetic node.".format(self))
+ else:
+ raise SourceError(
+ "{}: Cannot track source in a fragment from a junction".format(provenance._shortname),
+ reason="tracking-junction-fragment",
+ )
+
actions = {}
for k, v in clean.items():
if k not in to_modify:
@@ -1081,11 +1050,6 @@ class Source(Plugin):
# previous_sources_dir (str): directory where previous sources are staged
#
def _track(self, previous_sources_dir: str = None) -> SourceRef:
- if self.BST_KEY_REQUIRES_STAGE:
- # ensure that these sources have a key after tracking
- self._generate_key()
- return None
-
if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK:
new_ref = self.__do_track(previous_sources_dir=previous_sources_dir)
else:
@@ -1107,16 +1071,6 @@ class Source(Plugin):
return new_ref
- # _is_trackable()
- #
- # Returns:
- # (bool): Whether this source is trackable
- #
- def _is_trackable(self) -> bool:
- """Report whether this source can be tracked."""
- # sources that require staging to generate keys cannot be tracked
- return not self.BST_KEY_REQUIRES_STAGE
-
# _requires_previous_sources()
#
# If a plugin requires access to previous sources at track or fetch time,
@@ -1162,6 +1116,36 @@ class Source(Plugin):
def _element_name(self):
return self.__element_name
+ # _cache_directory()
+ #
+ # A context manager to cache and retrieve content.
+ #
+ # If the digest is not specified, then a new directory is prepared, the
+ # content of which can later be addressed by accessing it's digest,
+ # using the private API Directory._get_digest().
+ #
+ # The hash of the Digest of the cached directory is suitable for use as a
+ # cache key, and the Digest object can be reused later on to do the
+ # staging operation.
+ #
+ # This context manager was added specifically to optimize cases where
+ # we have project or host local data to stage into CAS, such as local
+ # sources and workspaces.
+ #
+ # Args:
+ # digest: A Digest of previously cached content.
+ #
+ # Yields:
+ # (Directory): A handle on the cached content directory
+ #
+ @contextmanager
+ def _cache_directory(self, digest=None):
+ context = self._get_context()
+ cache = context.get_cascache()
+ cas_dir = CasBasedDirectory(cache, digest=digest)
+
+ yield cas_dir
+
#############################################################
# Local Private Methods #
#############################################################