Diffstat (limited to 'src/buildstream/element.py')
-rw-r--r--  src/buildstream/element.py  286
1 file changed, 75 insertions(+), 211 deletions(-)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 916962d44..dbbc7bf93 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1,5 +1,6 @@
#
# Copyright (C) 2016-2018 Codethink Limited
+# Copyright (C) 2017-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -104,10 +105,10 @@ from .sandbox._config import SandboxConfig
from .sandbox._sandboxremote import SandboxRemote
from .types import CoreWarnings, Scope, _CacheBuildTrees, _KeyStrength
from ._artifact import Artifact
+from ._elementsources import ElementSources
from .storage.directory import Directory
from .storage._filebaseddirectory import FileBasedDirectory
-from .storage._casbaseddirectory import CasBasedDirectory
from .storage.directory import VirtualDirectoryError
if TYPE_CHECKING:
@@ -236,19 +237,15 @@ class Element(Plugin):
self.__ready_for_runtime = False # Whether the element and its runtime dependencies have cache keys
self.__ready_for_runtime_and_cached = False # Whether all runtime deps are cached, as well as the element
self.__cached_remotely = None # Whether the element is cached remotely
- # List of Sources
- self.__sources = [] # type: List[Source]
- self.__sources_vdir = None # Directory with staged sources
+ self.__sources = ElementSources(context) # The element sources
self.__weak_cache_key = None # Our cached weak cache key
self.__strict_cache_key = None # Our cached cache key for strict builds
self.__artifacts = context.artifactcache # Artifact cache
self.__sourcecache = context.sourcecache # Source cache
- self.__is_resolved = False # Whether the source is fully resolved or not
self.__assemble_scheduled = False # Element is scheduled to be assembled
self.__assemble_done = False # Element is assembled
self.__pull_done = False # Whether pull was attempted
self.__cached_successfully = None # If the Element is known to be successfully cached
- self.__has_all_sources_in_source_cache = None # If the sources are known to be successfully cached
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
self.__tainted = None # Whether the artifact is tainted and should not be shared
@@ -260,10 +257,6 @@ class Element(Plugin):
self.__strict_artifact = None # Artifact for strict cache key
self.__meta_kind = meta.kind # The kind of this element, required for unpickling
- # the index of the last source in this element that requires previous
- # sources for staging
- self.__last_source_requires_previous_ix = None
-
self.__batch_prepare_assemble = False # Whether batching across prepare()/assemble() is configured
self.__batch_prepare_assemble_flags = 0 # Sandbox flags for batching across prepare()/assemble()
# Collect dir for batching across prepare()/assemble()
@@ -420,8 +413,7 @@ class Element(Plugin):
Yields:
The sources of this element
"""
- for source in self.__sources:
- yield source
+ return self.__sources.sources()
def dependencies(self, scope: Scope, *, recurse: bool = True, visited=None) -> Iterator["Element"]:
"""dependencies(scope, *, recurse=True)
@@ -914,7 +906,7 @@ class Element(Plugin):
redundant_ref = source._load_ref()
- element.__sources.append(source)
+ element.__sources.add_source(source)
# Collect redundant refs which occurred at load time
if redundant_ref is not None:
@@ -1076,7 +1068,7 @@ class Element(Plugin):
# (bool): Whether this element can currently be built
#
def _buildable(self):
- if not (self._has_all_sources_in_source_cache() or self._has_all_sources_cached()):
+ if self._fetch_needed():
return False
if not self.__assemble_scheduled:
@@ -1118,9 +1110,9 @@ class Element(Plugin):
# Compute the element's initial state. Element state contains
# the following mutable sub-states:
#
- # - Source state
+ # - Source state in `ElementSources`
# - Artifact cache key
- # - Source key
+ # - Source key in `ElementSources`
# - Integral component of the cache key
# - Computed as part of the source state
# - Artifact state
@@ -1137,8 +1129,6 @@ class Element(Plugin):
# notably jobs executed in sub-processes. Changes are performed by
# invocations of the following methods:
#
- # - __update_resolved_state()
- # - Computes the state of all sources of the element.
# - __update_cache_keys()
# - Computes the strong and weak cache keys.
# - _update_artifact_state()
@@ -1158,8 +1148,9 @@ class Element(Plugin):
# them, causing the state change to bubble through all potential
# side effects.
#
- # *This* method starts the process by invoking
- # `__update_resolved_state()`, which will cause all necessary state
+ # After initializing the source state via `ElementSources`,
+ # *this* method starts the process by invoking
+ # `__update_cache_keys()`, which will cause all necessary state
# changes. Other functions should use the appropriate methods and
# only update what they expect to change - this will ensure that
# the minimum amount of work is done.
@@ -1168,9 +1159,12 @@ class Element(Plugin):
assert not self._resolved_initial_state, "_initialize_state() should only be called once"
self._resolved_initial_state = True
- # This will update source state, and for un-initialized
+ # This will initialize source state.
+ self.__sources.update_resolved_state()
+
+ # This will calculate the cache keys, and for un-initialized
# elements recursively initialize anything else (because it
- # will become considered outdated after source state is
+ # will become considered outdated after cache keys are
# updated).
#
# FIXME: Currently this method may cause recursion through
@@ -1180,7 +1174,7 @@ class Element(Plugin):
# pull/build, however should not occur during initialization
# (since we will eventually visit reverse dependencies during
# our initialization anyway).
- self.__update_resolved_state()
+ self.__update_cache_keys()
# _get_display_key():
#
@@ -1229,7 +1223,8 @@ class Element(Plugin):
def _tracking_done(self):
# Tracking may change the sources' refs, and therefore the
# source state. We need to update source state.
- self.__update_resolved_state()
+ self.__sources.update_resolved_state()
+ self.__update_cache_keys()
# _track():
#
@@ -1242,21 +1237,7 @@ class Element(Plugin):
# (list): A list of Source object ids and their new references
#
def _track(self):
- refs = []
- for index, source in enumerate(self.__sources):
- old_ref = source.get_ref()
- new_ref = source._track(self.__sources[0:index])
- refs.append((source._unique_id, new_ref))
-
- # Complementary warning that the new ref will be unused.
- if old_ref != new_ref and self._get_workspace():
- detail = (
- "This source has an open workspace.\n"
- + "To start using the new reference, please close the existing workspace."
- )
- source.warn("Updated reference will be ignored as source has open workspace", detail=detail)
-
- return refs
+ return self.__sources.track(self._get_workspace())
# _prepare_sandbox():
#
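
The removed loop above fixes the shape of the new helper. A minimal sketch of what ElementSources.track() might look like, reconstructed one-to-one from that loop; the self._sources attribute and the workspace parameter name are assumptions, not confirmed by the diff:

```python
# Hypothetical sketch of ElementSources.track(), reconstructed from the
# per-source loop removed above. "self._sources" is an assumed attribute.
def track(self, workspace):
    refs = []
    for index, source in enumerate(self._sources):
        old_ref = source.get_ref()
        # Sources may consult all previously staged sources while tracking
        new_ref = source._track(self._sources[0:index])
        refs.append((source._unique_id, new_ref))

        # Warn that a changed ref is ignored while a workspace is open
        if old_ref != new_ref and workspace:
            detail = (
                "This source has an open workspace.\n"
                + "To start using the new reference, please close the existing workspace."
            )
            source.warn("Updated reference will be ignored as source has open workspace", detail=detail)

    return refs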
@@ -1338,39 +1319,22 @@ class Element(Plugin):
# No cached buildtree, stage source from source cache
else:
-
- # Assert sources are cached
- assert self._has_all_sources_in_source_cache()
-
- if self.__sources:
-
- sourcecache = context.sourcecache
- # find last required source
- last_required_previous_ix = self.__last_source_requires_previous()
- import_dir = CasBasedDirectory(context.get_cascache())
-
- try:
- for source in self.__sources[last_required_previous_ix:]:
- source_dir = sourcecache.export(source)
- import_dir.import_files(source_dir)
-
- except SourceCacheError as e:
- raise ElementError("Error trying to export source for {}: {}".format(self.name, e))
- except VirtualDirectoryError as e:
- raise ElementError(
- "Error trying to import sources together for {}: {}".format(self.name, e),
- reason="import-source-files-fail",
- )
-
- self.__sources_vdir = import_dir
-
- # incremental builds should merge the source into the last artifact before staging
- last_build_artifact = self.__get_last_build_artifact()
- if last_build_artifact:
- self.info("Incremental build")
- last_sources = last_build_artifact.get_sources()
- import_dir = last_build_artifact.get_buildtree()
- import_dir._apply_changes(last_sources, self.__sources_vdir)
+ try:
+ staged_sources = self.__sources.stage()
+ except (SourceCacheError, VirtualDirectoryError) as e:
+ raise ElementError(
+ "Error trying to stage sources for {}: {}".format(self.name, e), reason="stage-sources-fail"
+ )
+
+ # incremental builds should merge the source into the last artifact before staging
+ last_build_artifact = self.__get_last_build_artifact()
+ if last_build_artifact:
+ self.info("Incremental build")
+ last_sources = last_build_artifact.get_sources()
+ import_dir = last_build_artifact.get_buildtree()
+ import_dir._apply_changes(last_sources, staged_sources)
+ else:
+ import_dir = staged_sources
# Set update_mtime to ensure deterministic mtime of sources at build time
with utils._deterministic_umask():
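
For orientation, a plausible sketch of ElementSources.stage() based on the staging code removed above. It lets SourceCacheError and VirtualDirectoryError propagate so the caller can wrap them in an ElementError, as the new try/except does; the _context and _sources attributes and the _last_source_requires_previous() helper (mirroring the removed __last_source_requires_previous()) are assumptions:

```python
# Hypothetical sketch of ElementSources.stage(), reconstructed from the
# removed staging logic above. Attribute and helper names are assumptions;
# SourceCacheError/VirtualDirectoryError propagate to the caller.
def stage(self):
    import_dir = CasBasedDirectory(self._context.get_cascache())

    # As in the removed code, start from the last source that requires
    # previous sources for staging: its cached entry already contains
    # everything staged before it.
    for source in self._sources[self._last_source_requires_previous():]:
        source_dir = self._context.sourcecache.export(source)
        import_dir.import_files(source_dir)

    self.vdir = import_dir
    return import_dir
```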
@@ -1666,7 +1630,7 @@ class Element(Plugin):
# if the directory could not be found.
pass
- sourcesvdir = self.__sources_vdir
+ sourcesvdir = self.__sources.vdir
if collect is not None:
try:
@@ -1697,10 +1661,7 @@ class Element(Plugin):
# fetched_original (bool): Whether the original sources had been asked (and fetched) or not
#
def _fetch_done(self, fetched_original):
- self.__has_all_sources_in_source_cache = True
-
- for source in self.__sources:
- source._fetch_done(fetched_original)
+ self.__sources.fetch_done(fetched_original)
# _pull_pending()
#
@@ -1786,20 +1747,12 @@ class Element(Plugin):
return True
def _skip_source_push(self):
- if not self.__sources or self._get_workspace():
+ if not self.sources() or self._get_workspace():
return True
return not (self.__sourcecache.has_push_remotes(plugin=self) and self._has_all_sources_in_source_cache())
def _source_push(self):
- pushed = False
-
- # try and push sources if we've got them
- if self.__sourcecache.has_push_remotes(plugin=self) and self._has_all_sources_in_source_cache():
- for source in self.sources():
- if self.__sourcecache.push(source):
- pushed = True
-
- return pushed
+ return self.__sources.push()
# _skip_push():
#
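
ElementSources.push() presumably keeps the semantics of the removed _source_push() body. Note that the remote and cached guards stay in Element._skip_source_push() (visible in the unchanged context above), so the helper can simply try each source; self._sourcecache is an assumed attribute:

```python
# Hypothetical sketch of ElementSources.push(). The has_push_remotes()
# and cached() guards remain in Element._skip_source_push(), so this
# method only attempts the pushes. "self._sourcecache" is an assumption.
def push(self):
    pushed = False
    for source in self.sources():
        if self._sourcecache.push(source):
            pushed = True
    return pushed
```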
@@ -1932,8 +1885,7 @@ class Element(Plugin):
#
os.makedirs(context.builddir, exist_ok=True)
with utils._tempdir(dir=context.builddir, prefix="workspace-{}".format(self.normal_name)) as temp:
- for source in self.sources():
- source._init_workspace(temp)
+ self.__sources.init_workspace(temp)
# Now hardlink the files into the workspace target.
utils.link_files(temp, workspace.get_absolute_path())
@@ -2039,29 +1991,7 @@ class Element(Plugin):
# SourceError: If one of the element sources has an error
#
def _fetch(self, fetch_original=False):
- previous_sources = []
- fetch_needed = False
-
- if self.__sources and not fetch_original:
- for source in self.__sources:
- if self.__sourcecache.contains(source):
- continue
-
- # try and fetch from source cache
- if not source._is_cached() and self.__sourcecache.has_fetch_remotes():
- if self.__sourcecache.pull(source):
- continue
-
- fetch_needed = True
-
- # We need to fetch original sources
- if fetch_needed or fetch_original:
- for source in self.sources():
- if not source._is_cached():
- source._fetch(previous_sources)
- previous_sources.append(source)
-
- self.__cache_sources()
+ self.__sources.fetch(fetch_original=fetch_original)
# _calculate_cache_key():
#
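
The removed _fetch() body suggests the logic now living in ElementSources.fetch(); a hedged sketch, assuming _sources/_sourcecache attributes and a commit() helper corresponding to the removed __cache_sources():

```python
# Hypothetical sketch of ElementSources.fetch(), reconstructed from the
# removed _fetch() body. Attribute and helper names are assumptions.
def fetch(self, fetch_original=False):
    fetch_needed = False

    if self._sources and not fetch_original:
        for source in self._sources:
            if self._sourcecache.contains(source):
                continue
            # Try to pull the staged source from a remote source cache
            if not source._is_cached() and self._sourcecache.has_fetch_remotes():
                if self._sourcecache.pull(source):
                    continue
            fetch_needed = True

    # Fall back to fetching the original sources
    if fetch_needed or fetch_original:
        previous_sources = []
        for source in self.sources():
            if not source._is_cached():
                source._fetch(previous_sources)
            previous_sources.append(source)

    # Assumed equivalent of the removed Element.__cache_sources()
    self.commit()
```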
@@ -2094,12 +2024,7 @@ class Element(Plugin):
"public": self.__public.strip_node_info(),
}
- self.__cache_key_dict["sources"] = []
-
- for source in self.__sources:
- self.__cache_key_dict["sources"].append(
- {"key": source._get_unique_key(), "name": source._get_source_name()}
- )
+ self.__cache_key_dict["sources"] = self.__sources.get_unique_key()
self.__cache_key_dict["fatal-warnings"] = sorted(project._fatal_warnings)
@@ -2108,54 +2033,50 @@ class Element(Plugin):
return _cachekey.generate_key(cache_key_dict)
- # Check if sources are cached, generating the source key if it hasn't been
+ # _has_all_sources_in_source_cache()
+ #
+ # Get whether all sources of the element are cached in CAS
+ #
+ # Returns:
+ # (bool): True if the element sources are in CAS
+ #
def _has_all_sources_in_source_cache(self):
- if self.__has_all_sources_in_source_cache is not None:
- return self.__has_all_sources_in_source_cache
-
- if self.__sources:
- sourcecache = self._get_context().sourcecache
-
- # Go through sources we'll cache generating keys
- for ix, source in enumerate(self.__sources):
- if not source._key:
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- source._generate_key(self.__sources[:ix])
- else:
- source._generate_key([])
-
- # Check all sources are in source cache
- for source in self.__sources:
- if not sourcecache.contains(source):
- return False
-
- self.__has_all_sources_in_source_cache = True
- return True
+ return self.__sources.cached()
# _has_all_sources_resolved()
#
# Get whether all sources of the element are resolved
#
+ # Returns:
+ # (bool): True if all element sources are resolved
+ #
def _has_all_sources_resolved(self):
- return self.__is_resolved
+ return self.__sources.is_resolved()
- # _has_all_sources_cached()
+ # _fetch_needed():
+ #
+ # Return whether sources need to be fetched from a remote
#
- # Get whether all the sources of the element have their own cached
- # copy of their sources.
+ # Returns:
+ # (bool): True if one or more element sources need to be fetched
#
- def _has_all_sources_cached(self):
- return all(source._is_cached() for source in self.__sources)
+ def _fetch_needed(self):
+ return not self.__sources.cached() and not self.__sources.cached_original()
+ # _should_fetch():
+ #
+ # Return whether we need to run the fetch stage for this element
+ #
+ # Args:
+ # fetch_original (bool): whether we need the original unstaged source
+ #
+ # Returns:
+ # (bool): True if a fetch job is required
+ #
def _should_fetch(self, fetch_original=False):
- """ return bool of if we need to run the fetch stage for this element
-
- Args:
- fetch_original (bool): whether we need to original unstaged source
- """
if fetch_original:
- return not self._has_all_sources_cached()
- return not self._has_all_sources_in_source_cache()
+ return not self.__sources.cached_original()
+ return not self.__sources.cached()
# _set_required_callback()
#
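
The two predicates consumed above likely mirror the removed element-side checks (_has_all_sources_in_source_cache() and _has_all_sources_cached()); a sketch under that assumption, omitting the key-generation step the removed code performed first:

```python
# Hypothetical predicates on ElementSources, reconstructed from the
# removed element-side checks. Attribute names are assumptions.
def cached(self):
    # All staged sources are present in the source cache (CAS)
    return all(self._sourcecache.contains(source) for source in self._sources)

def cached_original(self):
    # Every source holds its own local copy of the original, unstaged data
    return all(source._is_cached() for source in self._sources)
```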
@@ -2282,22 +2203,6 @@ class Element(Plugin):
# Private Local Methods #
#############################################################
- # __update_resolved_state()
- #
- # Updates source's resolved state
- #
- # An element's source state must be resolved before it may compute
- # cache keys, because the source's ref, whether defined in yaml or
- # from the workspace, is a component of the element's cache keys.
- #
- def __update_resolved_state(self):
- for source in self.__sources:
- if not source.is_resolved():
- break
- else:
- self.__is_resolved = True
- self.__update_cache_keys()
-
# __get_dependency_refs()
#
# Retrieve the artifact refs of the element's dependencies
@@ -2408,16 +2313,7 @@ class Element(Plugin):
# Prepend provenance to the error
raise ElementError("{}: {}".format(self, e), reason=e.reason, detail=e.detail) from e
- # Ensure that the first source does not need access to previous sources
- if self.__sources and self.__sources[0]._requires_previous_sources():
- raise ElementError(
- "{}: {} cannot be the first source of an element "
- "as it requires access to previous sources".format(self, self.__sources[0])
- )
-
- # Preflight the sources
- for source in self.sources():
- source._preflight()
+ self.__sources.preflight()
# __assert_cached()
#
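
ElementSources.preflight() presumably carries over the two removed checks; a sketch, where the raised error type and message are assumptions (ElementSources may well use its own error class rather than ElementError):

```python
# Hypothetical sketch of ElementSources.preflight(), mirroring the checks
# removed above. The raised error type is an assumption.
def preflight(self):
    # The first source can never require access to previous sources
    if self._sources and self._sources[0]._requires_previous_sources():
        raise SourceError(
            "{} cannot be the first source of an element "
            "as it requires access to previous sources".format(self._sources[0])
        )

    # Preflight each source in turn
    for source in self.sources():
        source._preflight()
```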
@@ -2892,38 +2788,6 @@ class Element(Plugin):
return True
- # __cache_sources():
- #
- # Caches the sources into the local CAS
- #
- def __cache_sources(self):
- if self.__sources and not self._has_all_sources_in_source_cache():
- last_requires_previous = 0
- # commit all other sources by themselves
- for ix, source in enumerate(self.__sources):
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- self.__sourcecache.commit(source, self.__sources[last_requires_previous:ix])
- last_requires_previous = ix
- else:
- self.__sourcecache.commit(source, [])
-
- # __last_source_requires_previous
- #
- # This is the last source that requires previous sources to be cached.
- # Sources listed after this will be cached separately.
- #
- # Returns:
- # (int): index of last source that requires previous sources
- #
- def __last_source_requires_previous(self):
- if self.__last_source_requires_previous_ix is None:
- last_requires_previous = 0
- for ix, source in enumerate(self.__sources):
- if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
- last_requires_previous = ix
- self.__last_source_requires_previous_ix = last_requires_previous
- return self.__last_source_requires_previous_ix
-
# __update_cache_keys()
#
# Updates weak and strict cache keys
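
Finally, the grouping logic of the removed __cache_sources() (commit a BST_REQUIRES_PREVIOUS_SOURCES_STAGE source together with everything staged before it, everything else by itself) presumably moved into ElementSources as well; a sketch under that assumption:

```python
# Hypothetical sketch of the commit logic inside ElementSources,
# reconstructed from the removed __cache_sources(). Names are assumptions.
def commit(self):
    if self._sources and not self.cached():
        last_requires_previous = 0
        for ix, source in enumerate(self._sources):
            if source.BST_REQUIRES_PREVIOUS_SOURCES_STAGE:
                # Commit together with all sources staged before it
                self._sourcecache.commit(source, self._sources[last_requires_previous:ix])
                last_requires_previous = ix
            else:
                # Commit this source by itself
                self._sourcecache.commit(source, [])
```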