diff options
author | Tristan van Berkom <tristan@codethink.co.uk> | 2020-09-23 16:44:36 +0900 |
---|---|---|
committer | Tristan van Berkom <tristan@codethink.co.uk> | 2020-09-24 18:09:58 +0900 |
commit | b583317c7954d29fce60728486537e945e0d9c28 (patch) | |
tree | 03e6792ee6357e2fbebcb994d4a529f4b54b85bd | |
parent | ea753a4ee9dc12d37ce86d50f4c01ef9fcb2d874 (diff) | |
download | buildstream-b583317c7954d29fce60728486537e945e0d9c28.tar.gz |
source.py: Remove BST_KEY_REQUIRES_STAGE
Refactored this to remove unneeded complexity in the code base,
as described here:
https://lists.apache.org/thread.html/r4b9517742433f07c79379ba5b67932cfe997c1e64965a9f1a2b613fc%40%3Cdev.buildstream.apache.org%3E
Changes:
* source.py: Added private Source._cache_directory() context manager
We also move the assertion about nodes which are safe to write to
a bit lower in Source._set_ref(), as this was unnecessarily early.
When tracking a workspace, the ref will be none and will turn out
to be none afterwards, it is not a problem that a workspace's node
is a synthetic one, as tracking will never affect it.
* local plugin: Implement get_unique_key() and stage() using
the new context manager in order to optimize staging and
cache key calculations here.
* workspace plugin: Implement get_unique_key() and stage() using
the new context manager in order to optimize staging and
cache key calculations here.
* trackqueue.py: No special casing with Source._is_trackable()
-rw-r--r-- | src/buildstream/_scheduler/queues/trackqueue.py | 5 | ||||
-rw-r--r-- | src/buildstream/plugins/sources/local.py | 57 | ||||
-rw-r--r-- | src/buildstream/plugins/sources/workspace.py | 53 | ||||
-rw-r--r-- | src/buildstream/source.py | 112 |
4 files changed, 137 insertions, 90 deletions
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index d9c31ace1..5b3f05b57 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -39,9 +39,8 @@ class TrackQueue(Queue): return TrackQueue._track_element def status(self, element): - # We can skip elements entirely if they have no trackable - # sources. - if not any(source._is_trackable() for source in element.sources()): + # We can skip elements without any sources + if not any(element.sources()): # But we still have to mark them as tracked element._tracking_done() diff --git a/src/buildstream/plugins/sources/local.py b/src/buildstream/plugins/sources/local.py index ffcae4993..15962b4eb 100644 --- a/src/buildstream/plugins/sources/local.py +++ b/src/buildstream/plugins/sources/local.py @@ -37,19 +37,16 @@ details on common configuration options for sources. """ import os -from buildstream.storage.directory import Directory -from buildstream import Source, SourceError +from buildstream import Source, SourceError, Directory class LocalSource(Source): # pylint: disable=attribute-defined-outside-init BST_MIN_VERSION = "2.0" - BST_STAGE_VIRTUAL_DIRECTORY = True - BST_KEY_REQUIRES_STAGE = True - __unique_key = None + __digest = None def configure(self, node): node.validate_keys(["path", *Source.COMMON_CONFIG_KEYS]) @@ -65,6 +62,21 @@ class LocalSource(Source): def is_cached(self): return True + def get_unique_key(self): + # + # As a core plugin, we use some private API to optimize file hashing. + # + # * Use Source._cache_directory() to prepare a Directory + # * Do the regular staging activity into the Directory + # * Use the hash of the cached digest as the unique key + # + if not self.__digest: + with self._cache_directory() as directory: + self.__do_stage(directory) + self.__digest = directory._get_digest() + + return self.__digest.hash + # We dont have a ref, we're a local file... def load_ref(self, node): pass @@ -80,8 +92,38 @@ class LocalSource(Source): pass # pragma: nocover def stage(self, directory): - # directory should always be a Directory object + # + # We've already prepared the CAS while resolving the cache key which + # will happen before staging. + # + # Now just retrieve the previously cached content to stage. + # assert isinstance(directory, Directory) + assert self.__digest is not None + with self._cache_directory(digest=self.__digest) as cached_directory: + directory.import_files(cached_directory) + + def init_workspace(self, directory): + # + # FIXME: We should be able to stage the workspace from the content + # cached in CAS instead of reimporting from the filesystem + # to the new workspace directory with this special case, but + # for some reason the writable bits are getting lost on regular + # files through the transition. + # + self.__do_stage(directory) + + # As a core element, we speed up some scenarios when this is used for + # a junction, by providing the local path to this content directly. + # + def _get_local_path(self): + return self.fullpath + + # Staging is implemented internally, we preemptively put it in the CAS + # as a side effect of resolving the cache key, at stage time we just + # do an internal CAS stage. + # + def __do_stage(self, directory): with self.timed_activity("Staging local files into CAS"): if os.path.isdir(self.fullpath) and not os.path.islink(self.fullpath): result = directory.import_files(self.fullpath) @@ -93,9 +135,6 @@ class LocalSource(Source): "Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail" ) - def _get_local_path(self): - return self.fullpath - # Plugin entry point def setup(): diff --git a/src/buildstream/plugins/sources/workspace.py b/src/buildstream/plugins/sources/workspace.py index 44d0889b3..df24abb91 100644 --- a/src/buildstream/plugins/sources/workspace.py +++ b/src/buildstream/plugins/sources/workspace.py @@ -37,30 +37,21 @@ workspace. The node constructed would be specified as follows: import os -from buildstream.storage.directory import Directory -from buildstream import Source, SourceError +from buildstream import Source, SourceError, Directory, MappingNode from buildstream.types import SourceRef -from buildstream.node import MappingNode class WorkspaceSource(Source): # pylint: disable=attribute-defined-outside-init BST_MIN_VERSION = "2.0" - BST_STAGE_VIRTUAL_DIRECTORY = True - BST_KEY_REQUIRES_STAGE = True - # Cached unique key - __unique_key = None # the digest of the Directory following the import of the workspace __digest = None # the cache key of the last workspace build __last_build = None - def track(self) -> SourceRef: # pylint: disable=arguments-differ - return None - def configure(self, node: MappingNode) -> None: node.validate_keys(["path", "last_build", "kind"]) self.path = node.get_str("path") @@ -75,6 +66,21 @@ class WorkspaceSource(Source): def is_resolved(self): return os.path.exists(self._get_local_path()) + def get_unique_key(self): + # + # As a core plugin, we use some private API to optimize file hashing. + # + # * Use Source._cache_directory() to prepare a Directory + # * Do the regular staging activity into the Directory + # * Use the hash of the cached digest as the unique key + # + if not self.__digest: + with self._cache_directory() as directory: + self.__do_stage(directory) + self.__digest = directory._get_digest() + + return self.__digest.hash + def get_ref(self) -> None: return None @@ -93,7 +99,29 @@ class WorkspaceSource(Source): def fetch(self) -> None: # pylint: disable=arguments-differ pass # pragma: nocover - def stage(self, directory: Directory) -> None: + def stage(self, directory): + # + # We've already prepared the CAS while resolving the cache key which + # will happen before staging. + # + # Now just retrieve the previously cached content to stage. + # + assert isinstance(directory, Directory) + assert self.__digest is not None + with self._cache_directory(digest=self.__digest) as cached_directory: + directory.import_files(cached_directory) + + # As a core element, we speed up some scenarios when this is used for + # a junction, by providing the local path to this content directly. + # + def _get_local_path(self) -> str: + return self.path + + # Staging is implemented internally, we preemptively put it in the CAS + # as a side effect of resolving the cache key, at stage time we just + # do an internal CAS stage. + # + def __do_stage(self, directory: Directory) -> None: assert isinstance(directory, Directory) with self.timed_activity("Staging local files"): result = directory.import_files(self.path, properties=["mtime"]) @@ -103,9 +131,6 @@ class WorkspaceSource(Source): "Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail" ) - def _get_local_path(self) -> str: - return self.path - # Plugin entry point def setup() -> WorkspaceSource: diff --git a/src/buildstream/source.py b/src/buildstream/source.py index 245c3ca99..ce1cf8434 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -312,11 +312,6 @@ class Source(Plugin): to. """ - BST_KEY_REQUIRES_STAGE = False - """Whether the source will require staging in order to efficiently generate - a unique key. - """ - def __init__( self, context: "Context", @@ -364,7 +359,6 @@ class Source(Plugin): self.__mirror_directory = None # type: Optional[str] self._configure(self.__config) - self.__digest = None self.__is_cached = None @@ -723,23 +717,6 @@ class Source(Plugin): ############################################################# # Private Methods used in BuildStream # ############################################################# - # Stage files at the localpath into the cascache - # - # Returns: - # the hash of the cas directory - def _stage_into_cas(self) -> str: - # FIXME: this should not be called for sources with digests already set - # since they will already have been staged into the cache. However, - # _get_unique_key is sometimes called outside of _generate_key - if self.__digest is None: - cas_dir = CasBasedDirectory(self._get_context().get_cascache()) - self.stage(cas_dir) - digest = cas_dir._get_digest() - self.__digest = digest - else: - # XXX: an assignment to please mypy - digest = self.__digest - return digest.hash # Wrapper around preflight() method # @@ -813,14 +790,8 @@ class Source(Plugin): # 'directory' option # def _stage(self, directory): - if self.BST_KEY_REQUIRES_STAGE: - # _get_unique_key should be called before _stage - assert self.__digest is not None - cas_dir = CasBasedDirectory(self._get_context().get_cascache(), digest=self.__digest) - directory.import_files(cas_dir) - else: - self.validate_cache() - self.stage(directory) + self.validate_cache() + self.stage(directory) # Wrapper for init_workspace() def _init_workspace(self, directory): @@ -835,10 +806,7 @@ class Source(Plugin): # Wrapper for get_unique_key() api # def _get_unique_key(self): - if self.BST_KEY_REQUIRES_STAGE: - return self._stage_into_cas() - else: - return self.get_unique_key() + return self.get_unique_key() # _project_refs(): # @@ -954,20 +922,6 @@ class Source(Plugin): if project is toplevel and not node: node = provenance._node - # Ensure the node is not from a junction - if not toplevel.ref_storage == ProjectRefStorage.PROJECT_REFS and provenance._project is not toplevel: - if provenance._project is project: - self.warn("{}: Not persisting new reference in junctioned project".format(self)) - elif provenance._project is None: - assert provenance._filename == "" - assert provenance._shortname == "" - raise SourceError("{}: Error saving source reference to synthetic node.".format(self)) - else: - raise SourceError( - "{}: Cannot track source in a fragment from a junction".format(provenance._shortname), - reason="tracking-junction-fragment", - ) - # # Step 2 - Set the ref in memory, and determine changed state # @@ -988,6 +942,21 @@ class Source(Plugin): # tracking in the future. For now, this is quite safe. return False + # Ensure the node is not from a junction + if not toplevel.ref_storage == ProjectRefStorage.PROJECT_REFS and provenance._project is not toplevel: + if provenance._project is project: + self.warn("{}: Not persisting new reference in junctioned project".format(self)) + elif provenance._project is None: + assert provenance._filename == "" + assert provenance._shortname == "" + + raise SourceError("{}: Error saving source reference to synthetic node.".format(self)) + else: + raise SourceError( + "{}: Cannot track source in a fragment from a junction".format(provenance._shortname), + reason="tracking-junction-fragment", + ) + actions = {} for k, v in clean.items(): if k not in to_modify: @@ -1081,11 +1050,6 @@ class Source(Plugin): # previous_sources_dir (str): directory where previous sources are staged # def _track(self, previous_sources_dir: str = None) -> SourceRef: - if self.BST_KEY_REQUIRES_STAGE: - # ensure that these sources have a key after tracking - self._generate_key() - return None - if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK: new_ref = self.__do_track(previous_sources_dir=previous_sources_dir) else: @@ -1107,16 +1071,6 @@ class Source(Plugin): return new_ref - # _is_trackable() - # - # Returns: - # (bool): Whether this source is trackable - # - def _is_trackable(self) -> bool: - """Report whether this source can be tracked.""" - # sources that require staging to generate keys cannot be tracked - return not self.BST_KEY_REQUIRES_STAGE - # _requires_previous_sources() # # If a plugin requires access to previous sources at track or fetch time, @@ -1162,6 +1116,36 @@ class Source(Plugin): def _element_name(self): return self.__element_name + # _cache_directory() + # + # A context manager to cache and retrieve content. + # + # If the digest is not specified, then a new directory is prepared, the + # content of which can later be addressed by accessing it's digest, + # using the private API Directory._get_digest(). + # + # The hash of the Digest of the cached directory is suitable for use as a + # cache key, and the Digest object can be reused later on to do the + # staging operation. + # + # This context manager was added specifically to optimize cases where + # we have project or host local data to stage into CAS, such as local + # sources and workspaces. + # + # Args: + # digest: A Digest of previously cached content. + # + # Yields: + # (Directory): A handle on the cached content directory + # + @contextmanager + def _cache_directory(self, digest=None): + context = self._get_context() + cache = context.get_cascache() + cas_dir = CasBasedDirectory(cache, digest=digest) + + yield cas_dir + ############################################################# # Local Private Methods # ############################################################# |