diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-09-24 11:11:45 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-09-24 11:11:45 +0000 |
commit | 9fba7a1fb27da5fac968fc3eeedc2017b282f344 (patch) | |
tree | 56c598df2dc302af17ccc7d120c6e50cc208a1a9 | |
parent | 34d69a5bee001c813842594d966b52a16cbe187d (diff) | |
parent | b46095dec2468d6f41b06459d8acf2e9f77917b1 (diff) | |
download | buildstream-9fba7a1fb27da5fac968fc3eeedc2017b282f344.tar.gz |
Merge branch 'tristan/remove-bst-key-requires-stage' into 'master'
source.py: Remove BST_KEY_REQUIRES_STAGE
See merge request BuildStream/buildstream!2072
-rw-r--r-- | src/buildstream/__init__.py | 1 | ||||
-rw-r--r-- | src/buildstream/_elementsources.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/trackqueue.py | 5 | ||||
-rw-r--r-- | src/buildstream/plugins/sources/local.py | 57 | ||||
-rw-r--r-- | src/buildstream/plugins/sources/workspace.py | 53 | ||||
-rw-r--r-- | src/buildstream/source.py | 112 | ||||
-rw-r--r-- | tests/frontend/track.py | 79 |
7 files changed, 224 insertions, 90 deletions
diff --git a/src/buildstream/__init__.py b/src/buildstream/__init__.py index c6722023f..7e7cb1514 100644 --- a/src/buildstream/__init__.py +++ b/src/buildstream/__init__.py @@ -30,6 +30,7 @@ if "_BST_COMPLETION" not in os.environ: from .utils import UtilError, ProgramNotFoundError from .sandbox import Sandbox, SandboxFlags, SandboxCommandError + from .storage import Directory from .types import CoreWarnings, OverlapAction from .node import MappingNode, Node, ProvenanceInformation, ScalarNode, SequenceNode from .plugin import Plugin diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py index c030591f8..eed847e75 100644 --- a/src/buildstream/_elementsources.py +++ b/src/buildstream/_elementsources.py @@ -20,6 +20,7 @@ from contextlib import contextmanager from typing import TYPE_CHECKING, Iterator from . import _cachekey, utils +from ._exceptions import SkipJob from ._context import Context from ._protos.buildstream.v2 import source_pb2 from .plugin import Plugin @@ -109,6 +110,12 @@ class ElementSources: ) source.warn("Updated reference will be ignored as source has open workspace", detail=detail) + # Sources which do not implement track() will return None, produce + # a SKIP message in the UI if all sources produce None + # + if all(ref is None for _, ref in refs): + raise SkipJob("Element sources are not trackable") + return refs # stage_and_cache(): diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index d9c31ace1..5b3f05b57 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -39,9 +39,8 @@ class TrackQueue(Queue): return TrackQueue._track_element def status(self, element): - # We can skip elements entirely if they have no trackable - # sources. - if not any(source._is_trackable() for source in element.sources()): + # We can skip elements without any sources + if not any(element.sources()): # But we still have to mark them as tracked element._tracking_done() diff --git a/src/buildstream/plugins/sources/local.py b/src/buildstream/plugins/sources/local.py index ffcae4993..15962b4eb 100644 --- a/src/buildstream/plugins/sources/local.py +++ b/src/buildstream/plugins/sources/local.py @@ -37,19 +37,16 @@ details on common configuration options for sources. """ import os -from buildstream.storage.directory import Directory -from buildstream import Source, SourceError +from buildstream import Source, SourceError, Directory class LocalSource(Source): # pylint: disable=attribute-defined-outside-init BST_MIN_VERSION = "2.0" - BST_STAGE_VIRTUAL_DIRECTORY = True - BST_KEY_REQUIRES_STAGE = True - __unique_key = None + __digest = None def configure(self, node): node.validate_keys(["path", *Source.COMMON_CONFIG_KEYS]) @@ -65,6 +62,21 @@ class LocalSource(Source): def is_cached(self): return True + def get_unique_key(self): + # + # As a core plugin, we use some private API to optimize file hashing. + # + # * Use Source._cache_directory() to prepare a Directory + # * Do the regular staging activity into the Directory + # * Use the hash of the cached digest as the unique key + # + if not self.__digest: + with self._cache_directory() as directory: + self.__do_stage(directory) + self.__digest = directory._get_digest() + + return self.__digest.hash + # We dont have a ref, we're a local file... def load_ref(self, node): pass @@ -80,8 +92,38 @@ class LocalSource(Source): pass # pragma: nocover def stage(self, directory): - # directory should always be a Directory object + # + # We've already prepared the CAS while resolving the cache key which + # will happen before staging. + # + # Now just retrieve the previously cached content to stage. + # assert isinstance(directory, Directory) + assert self.__digest is not None + with self._cache_directory(digest=self.__digest) as cached_directory: + directory.import_files(cached_directory) + + def init_workspace(self, directory): + # + # FIXME: We should be able to stage the workspace from the content + # cached in CAS instead of reimporting from the filesystem + # to the new workspace directory with this special case, but + # for some reason the writable bits are getting lost on regular + # files through the transition. + # + self.__do_stage(directory) + + # As a core element, we speed up some scenarios when this is used for + # a junction, by providing the local path to this content directly. + # + def _get_local_path(self): + return self.fullpath + + # Staging is implemented internally, we preemptively put it in the CAS + # as a side effect of resolving the cache key, at stage time we just + # do an internal CAS stage. + # + def __do_stage(self, directory): with self.timed_activity("Staging local files into CAS"): if os.path.isdir(self.fullpath) and not os.path.islink(self.fullpath): result = directory.import_files(self.fullpath) @@ -93,9 +135,6 @@ class LocalSource(Source): "Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail" ) - def _get_local_path(self): - return self.fullpath - # Plugin entry point def setup(): diff --git a/src/buildstream/plugins/sources/workspace.py b/src/buildstream/plugins/sources/workspace.py index 44d0889b3..df24abb91 100644 --- a/src/buildstream/plugins/sources/workspace.py +++ b/src/buildstream/plugins/sources/workspace.py @@ -37,30 +37,21 @@ workspace. The node constructed would be specified as follows: import os -from buildstream.storage.directory import Directory -from buildstream import Source, SourceError +from buildstream import Source, SourceError, Directory, MappingNode from buildstream.types import SourceRef -from buildstream.node import MappingNode class WorkspaceSource(Source): # pylint: disable=attribute-defined-outside-init BST_MIN_VERSION = "2.0" - BST_STAGE_VIRTUAL_DIRECTORY = True - BST_KEY_REQUIRES_STAGE = True - # Cached unique key - __unique_key = None # the digest of the Directory following the import of the workspace __digest = None # the cache key of the last workspace build __last_build = None - def track(self) -> SourceRef: # pylint: disable=arguments-differ - return None - def configure(self, node: MappingNode) -> None: node.validate_keys(["path", "last_build", "kind"]) self.path = node.get_str("path") @@ -75,6 +66,21 @@ class WorkspaceSource(Source): def is_resolved(self): return os.path.exists(self._get_local_path()) + def get_unique_key(self): + # + # As a core plugin, we use some private API to optimize file hashing. + # + # * Use Source._cache_directory() to prepare a Directory + # * Do the regular staging activity into the Directory + # * Use the hash of the cached digest as the unique key + # + if not self.__digest: + with self._cache_directory() as directory: + self.__do_stage(directory) + self.__digest = directory._get_digest() + + return self.__digest.hash + def get_ref(self) -> None: return None @@ -93,7 +99,29 @@ class WorkspaceSource(Source): def fetch(self) -> None: # pylint: disable=arguments-differ pass # pragma: nocover - def stage(self, directory: Directory) -> None: + def stage(self, directory): + # + # We've already prepared the CAS while resolving the cache key which + # will happen before staging. + # + # Now just retrieve the previously cached content to stage. + # + assert isinstance(directory, Directory) + assert self.__digest is not None + with self._cache_directory(digest=self.__digest) as cached_directory: + directory.import_files(cached_directory) + + # As a core element, we speed up some scenarios when this is used for + # a junction, by providing the local path to this content directly. + # + def _get_local_path(self) -> str: + return self.path + + # Staging is implemented internally, we preemptively put it in the CAS + # as a side effect of resolving the cache key, at stage time we just + # do an internal CAS stage. + # + def __do_stage(self, directory: Directory) -> None: assert isinstance(directory, Directory) with self.timed_activity("Staging local files"): result = directory.import_files(self.path, properties=["mtime"]) @@ -103,9 +131,6 @@ class WorkspaceSource(Source): "Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail" ) - def _get_local_path(self) -> str: - return self.path - # Plugin entry point def setup() -> WorkspaceSource: diff --git a/src/buildstream/source.py b/src/buildstream/source.py index 245c3ca99..ce1cf8434 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -312,11 +312,6 @@ class Source(Plugin): to. """ - BST_KEY_REQUIRES_STAGE = False - """Whether the source will require staging in order to efficiently generate - a unique key. - """ - def __init__( self, context: "Context", @@ -364,7 +359,6 @@ class Source(Plugin): self.__mirror_directory = None # type: Optional[str] self._configure(self.__config) - self.__digest = None self.__is_cached = None @@ -723,23 +717,6 @@ class Source(Plugin): ############################################################# # Private Methods used in BuildStream # ############################################################# - # Stage files at the localpath into the cascache - # - # Returns: - # the hash of the cas directory - def _stage_into_cas(self) -> str: - # FIXME: this should not be called for sources with digests already set - # since they will already have been staged into the cache. However, - # _get_unique_key is sometimes called outside of _generate_key - if self.__digest is None: - cas_dir = CasBasedDirectory(self._get_context().get_cascache()) - self.stage(cas_dir) - digest = cas_dir._get_digest() - self.__digest = digest - else: - # XXX: an assignment to please mypy - digest = self.__digest - return digest.hash # Wrapper around preflight() method # @@ -813,14 +790,8 @@ class Source(Plugin): # 'directory' option # def _stage(self, directory): - if self.BST_KEY_REQUIRES_STAGE: - # _get_unique_key should be called before _stage - assert self.__digest is not None - cas_dir = CasBasedDirectory(self._get_context().get_cascache(), digest=self.__digest) - directory.import_files(cas_dir) - else: - self.validate_cache() - self.stage(directory) + self.validate_cache() + self.stage(directory) # Wrapper for init_workspace() def _init_workspace(self, directory): @@ -835,10 +806,7 @@ class Source(Plugin): # Wrapper for get_unique_key() api # def _get_unique_key(self): - if self.BST_KEY_REQUIRES_STAGE: - return self._stage_into_cas() - else: - return self.get_unique_key() + return self.get_unique_key() # _project_refs(): # @@ -954,20 +922,6 @@ class Source(Plugin): if project is toplevel and not node: node = provenance._node - # Ensure the node is not from a junction - if not toplevel.ref_storage == ProjectRefStorage.PROJECT_REFS and provenance._project is not toplevel: - if provenance._project is project: - self.warn("{}: Not persisting new reference in junctioned project".format(self)) - elif provenance._project is None: - assert provenance._filename == "" - assert provenance._shortname == "" - raise SourceError("{}: Error saving source reference to synthetic node.".format(self)) - else: - raise SourceError( - "{}: Cannot track source in a fragment from a junction".format(provenance._shortname), - reason="tracking-junction-fragment", - ) - # # Step 2 - Set the ref in memory, and determine changed state # @@ -988,6 +942,21 @@ class Source(Plugin): # tracking in the future. For now, this is quite safe. return False + # Ensure the node is not from a junction + if not toplevel.ref_storage == ProjectRefStorage.PROJECT_REFS and provenance._project is not toplevel: + if provenance._project is project: + self.warn("{}: Not persisting new reference in junctioned project".format(self)) + elif provenance._project is None: + assert provenance._filename == "" + assert provenance._shortname == "" + + raise SourceError("{}: Error saving source reference to synthetic node.".format(self)) + else: + raise SourceError( + "{}: Cannot track source in a fragment from a junction".format(provenance._shortname), + reason="tracking-junction-fragment", + ) + actions = {} for k, v in clean.items(): if k not in to_modify: @@ -1081,11 +1050,6 @@ class Source(Plugin): # previous_sources_dir (str): directory where previous sources are staged # def _track(self, previous_sources_dir: str = None) -> SourceRef: - if self.BST_KEY_REQUIRES_STAGE: - # ensure that these sources have a key after tracking - self._generate_key() - return None - if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK: new_ref = self.__do_track(previous_sources_dir=previous_sources_dir) else: @@ -1107,16 +1071,6 @@ class Source(Plugin): return new_ref - # _is_trackable() - # - # Returns: - # (bool): Whether this source is trackable - # - def _is_trackable(self) -> bool: - """Report whether this source can be tracked.""" - # sources that require staging to generate keys cannot be tracked - return not self.BST_KEY_REQUIRES_STAGE - # _requires_previous_sources() # # If a plugin requires access to previous sources at track or fetch time, @@ -1162,6 +1116,36 @@ class Source(Plugin): def _element_name(self): return self.__element_name + # _cache_directory() + # + # A context manager to cache and retrieve content. + # + # If the digest is not specified, then a new directory is prepared, the + # content of which can later be addressed by accessing it's digest, + # using the private API Directory._get_digest(). + # + # The hash of the Digest of the cached directory is suitable for use as a + # cache key, and the Digest object can be reused later on to do the + # staging operation. + # + # This context manager was added specifically to optimize cases where + # we have project or host local data to stage into CAS, such as local + # sources and workspaces. + # + # Args: + # digest: A Digest of previously cached content. + # + # Yields: + # (Directory): A handle on the cached content directory + # + @contextmanager + def _cache_directory(self, digest=None): + context = self._get_context() + cache = context.get_cascache() + cas_dir = CasBasedDirectory(cache, digest=digest) + + yield cas_dir + ############################################################# # Local Private Methods # ############################################################# diff --git a/tests/frontend/track.py b/tests/frontend/track.py index 51768f04e..682ee0e98 100644 --- a/tests/frontend/track.py +++ b/tests/frontend/track.py @@ -3,6 +3,7 @@ import stat import os +import re import pytest from buildstream.testing import create_repo, generate_project @@ -419,3 +420,81 @@ def test_track_with_comments(cli, datafiles): # Assert that the sources are cached assert cli.get_element_state(project, target) == "buildable" + + +# Test that elements which contain only sources which do not implement Source.track() +# produce a SKIP message in the logs instead of a SUCCESS message when tracking the +# element. +# +# Also test the same for an open workspace, which would be trackable if the +# workspace was not open. +# +# Also test that elements which do not have any sources do not produce any messages at all, +# as these jobs are discarded before ever processing. +# +@pytest.mark.datafiles(DATA_DIR) +def test_track_skip(cli, tmpdir, datafiles): + project = str(datafiles) + dev_files_path = os.path.join(project, "files", "dev-files") + element_path = os.path.join(project, "elements") + element_dep_name = "track-test-dep.bst" + element_workspace_name = "track-test-workspace.bst" + element_target_name = "track-test-target.bst" + workspace_dir = os.path.join(str(tmpdir), "workspace") + + # Generate an import element with some local source plugins, these + # do not implement track() and thus can be skipped. + # + element = { + "kind": "import", + "sources": [ + {"kind": "local", "path": "files/dev-files", "directory": "/foo"}, + {"kind": "local", "path": "files/dev-files", "directory": "/bar"}, + ], + } + _yaml.roundtrip_dump(element, os.path.join(element_path, element_dep_name)) + + # Generate a regular import element which will have a workspace open + # + repo = create_repo("tar", str(tmpdir)) + repo.create(dev_files_path) + generate_element(repo, os.path.join(element_path, element_workspace_name)) + + # Generate a stack element which depends on the import of local files + # + # Stack elements do not have any sources, as such they are also skipped. + # + element = { + "kind": "stack", + "depends": [element_dep_name, element_workspace_name], + } + _yaml.roundtrip_dump(element, os.path.join(element_path, element_target_name)) + + # First track and fetch the workspace element + result = cli.run(project=project, args=["source", "track", "--deps", "none", element_workspace_name]) + result.assert_success() + result = cli.run(project=project, args=["source", "fetch", "--deps", "none", element_workspace_name]) + result.assert_success() + + # Open the workspace so it really is a workspace + result = cli.run(project=project, args=["workspace", "open", "--directory", workspace_dir, element_workspace_name]) + result.assert_success() + + # Now run track on the stack and all the deps + result = cli.run(project=project, args=["source", "track", "--deps", "all", element_target_name]) + result.assert_success() + + # Assert we got the expected skip messages + pattern = r"\[.*track:track-test-dep\.bst.*\] SKIPPED" + assert len(re.findall(pattern, result.stderr, re.MULTILINE)) == 1 + pattern = r"\[.*track:track-test-workspace\.bst.*\] SKIPPED" + assert len(re.findall(pattern, result.stderr, re.MULTILINE)) == 1 + + # For now, we expect to not see the job for stack elements + # + # This may be revisited, need to consider if we should emit + # START/SKIPPED message pairs for jobs which were assessed to + # be unneeded before ever processing. + # + pattern = r"\[.*track:track-test-target\.bst.*\]" + assert len(re.findall(pattern, result.stderr, re.MULTILINE)) == 0 |