diff options
author | Chandan Singh <csingh43@bloomberg.net> | 2018-07-25 15:01:32 +0100 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2019-07-16 14:05:34 +0900 |
commit | f8cf48c8a5819740f03b1aaf3f45b4ab1571bc15 (patch) | |
tree | 635df917a58411bffbbf806caa6bcf7254e58710 | |
parent | d337f089957f53926470e1723dfc1c4313a4f698 (diff) | |
download | buildstream-f8cf48c8a5819740f03b1aaf3f45b4ab1571bc15.tar.gz |
Allow source plugins to access previous sources
Source plugin implementations can now specify that they need access to
previously staged sources by specifying
`BST_REQUIRES_PREVIOUS_SOURCES_TRACK` and/or
`BST_REQUIRES_PREVIOUS_SOURCES_FETCH`, corresponding to access at `track`
and `fetch` times respectively.
Fixes #381.
Replaces !505. For relevant discussion, see this discussion:
https://gitlab.com/BuildStream/buildstream/merge_requests/505#note_83780747
-rw-r--r-- | buildstream/_loader/loader.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/element.py | 10 | ||||
-rw-r--r-- | buildstream/source.py | 249 | ||||
-rw-r--r-- | tests/sources/previous_source_access.py | 47 | ||||
-rw-r--r-- | tests/sources/previous_source_access/elements/target.bst | 6 | ||||
-rw-r--r-- | tests/sources/previous_source_access/files/file | 1 | ||||
-rw-r--r-- | tests/sources/previous_source_access/plugins/sources/foo_transform.py | 87 | ||||
-rw-r--r-- | tests/sources/previous_source_access/project.conf | 10 |
9 files changed, 335 insertions, 84 deletions
diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index 2efc4d360..71b74c506 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -540,11 +540,12 @@ class Loader(): # if element._get_consistency() == Consistency.RESOLVED: if fetch_subprojects: - for source in element.sources(): + sources = list(element.sources()) + for idx, source in enumerate(sources): if ticker: ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind())) if source._get_consistency() != Consistency.CACHED: - source._fetch() + source._fetch(sources[0:idx]) else: detail = "Try fetching the project with `bst fetch {}`".format(filename) raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED, diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 423b620de..be43abe4e 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -41,8 +41,10 @@ class FetchQueue(Queue): self._skip_cached = skip_cached def process(self, element): + previous_sources = [] for source in element.sources(): - source._fetch() + source._fetch(previous_sources) + previous_sources.append(source) def status(self, element): if not element._is_required(): diff --git a/buildstream/element.py b/buildstream/element.py index 3bdf601c1..2fc56d947 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1253,6 +1253,12 @@ class Element(Plugin): # Prepend provenance to the error raise ElementError("{}: {}".format(self, e), reason=e.reason) from e + # Ensure that the first source does not need access to previous soruces + if self.__sources and self.__sources[0]._requires_previous_sources(): + raise ElementError("{}: {} cannot be the first source of an element " + "as it requires access to previous sources" + .format(self, self.__sources[0])) + # Preflight the sources for source in self.sources(): source._preflight() @@ -1296,9 +1302,9 @@ class Element(Plugin): # def _track(self): refs = [] - for source in self.__sources: + for index, source in enumerate(self.__sources): old_ref = source.get_ref() - new_ref = source._track() + new_ref = source._track(self.__sources[0:index]) refs.append((source._unique_id, new_ref)) # Complimentary warning that the new ref will be unused. diff --git a/buildstream/source.py b/buildstream/source.py index f72aeae86..6b6d3126b 100644 --- a/buildstream/source.py +++ b/buildstream/source.py @@ -88,6 +88,39 @@ these methods are mandatory to implement. :ref:`SourceFetcher <core_source_fetcher>`. +Accessing previous sources +-------------------------- +*Since: 1.4* + +In the general case, all sources are fetched and tracked independently of one +another. In situations where a source needs to access previous source(s) in +order to perform its own track and/or fetch, following attributes can be set to +request access to previous sources: + +* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK` + + Indicate that access to previous sources is required during track + +* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH` + + Indicate that access to previous sources is required during fetch + +The intended use of such plugins is to fetch external dependencies of other +sources, typically using some kind of package manager, such that all the +dependencies of the original source(s) are available at build time. + +When implementing such a plugin, implementors should adhere to the following +guidelines: + +* Implementations must be able to store the obtained artifacts in a + subdirectory. + +* Implementations must be able to deterministically generate a unique ref, such + that two refs are different if and only if they produce different outputs. + +* Implementations must not introduce host contamination. + + .. _core_source_fetcher: SourceFetcher - Object for fetching individual URLs @@ -104,6 +137,8 @@ mentioned, these methods are mandatory to implement. Fetches the URL associated with this SourceFetcher, optionally taking an alias override. +Class Reference +--------------- """ import os @@ -175,7 +210,7 @@ class SourceFetcher(): ############################################################# # Abstract Methods # ############################################################# - def fetch(self, alias_override=None): + def fetch(self, alias_override=None, **kwargs): """Fetch remote sources and mirror them locally, ensuring at least that the specific reference is cached locally. @@ -225,6 +260,32 @@ class Source(Plugin): __defaults = {} # The defaults from the project __defaults_set = False # Flag, in case there are not defaults at all + BST_REQUIRES_PREVIOUS_SOURCES_TRACK = False + """Whether access to previous sources is required during track + + When set to True: + * all sources listed before this source in the given element will be + fetched before this source is tracked + * Source.track() will be called with an additional keyword argument + `previous_sources_dir` where previous sources will be staged + * this source can not be the first source for an element + + *Since: 1.4* + """ + + BST_REQUIRES_PREVIOUS_SOURCES_FETCH = False + """Whether access to previous sources is required during fetch + + When set to True: + * all sources listed before this source in the given element will be + fetched before this source is fetched + * Source.fetch() will be called with an additional keyword argument + `previous_sources_dir` where previous sources will be staged + * this source can not be the first source for an element + + *Since: 1.4* + """ + def __init__(self, context, project, meta, *, alias_override=None, unique_id=None): provenance = _yaml.node_get_provenance(meta.config) super().__init__("{}-{}".format(meta.element_name, meta.element_index), @@ -324,9 +385,15 @@ class Source(Plugin): """ raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind())) - def track(self): + def track(self, **kwargs): """Resolve a new ref from the plugin's track option + Args: + previous_sources_dir (str): directory where previous sources are staged. + Note that this keyword argument is available only when + :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK` + is set to True. + Returns: (simple object): A new internal source reference, or None @@ -345,10 +412,16 @@ class Source(Plugin): # Allow a non implementation return None - def fetch(self): + def fetch(self, **kwargs): """Fetch remote sources and mirror them locally, ensuring at least that the specific reference is cached locally. + Args: + previous_sources_dir (str): directory where previous sources are staged. + Note that this keyword argument is available only when + :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH` + is set to True. + Raises: :class:`.SourceError` @@ -583,78 +656,19 @@ class Source(Plugin): # Wrapper function around plugin provided fetch method # - def _fetch(self): - project = self._get_project() - context = self._get_context() - - # Silence the STATUS messages which might happen as a result - # of checking the source fetchers. - with context.silence(): - source_fetchers = self.get_source_fetchers() - - # Use the source fetchers if they are provided - # - if source_fetchers: - - # Use a contorted loop here, this is to allow us to - # silence the messages which can result from consuming - # the items of source_fetchers, if it happens to be a generator. - # - source_fetchers = iter(source_fetchers) - try: - - while True: - - with context.silence(): - fetcher = next(source_fetchers) - - alias = fetcher._get_alias() - for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): - try: - fetcher.fetch(uri) - # FIXME: Need to consider temporary vs. permanent failures, - # and how this works with retries. - except BstError as e: - last_error = e - continue - - # No error, we're done with this fetcher - break - - else: - # No break occurred, raise the last detected error - raise last_error - - except StopIteration: - pass - - # Default codepath is to reinstantiate the Source - # + # Args: + # previous_sources (list): List of Sources listed prior to this source + # + def _fetch(self, previous_sources): + + if self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH: + self.__ensure_previous_sources(previous_sources) + with self.tempdir() as staging_directory: + for src in previous_sources: + src._stage(staging_directory) + self.__do_fetch(previous_sources_dir=self.__ensure_directory(staging_directory)) else: - alias = self._get_alias() - if self.__first_pass: - mirrors = project.first_pass_config.mirrors - else: - mirrors = project.config.mirrors - if not mirrors or not alias: - self.fetch() - return - - for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): - new_source = self.__clone_for_uri(uri) - try: - new_source.fetch() - # FIXME: Need to consider temporary vs. permanent failures, - # and how this works with retries. - except BstError as e: - last_error = e - continue - - # No error, we're done here - return - - # Re raise the last detected error - raise last_error + self.__do_fetch() # Wrapper for stage() api which gives the source # plugin a fully constructed path considering the @@ -866,8 +880,19 @@ class Source(Plugin): # Wrapper for track() # - def _track(self): - new_ref = self.__do_track() + # Args: + # previous_sources (list): List of Sources listed prior to this source + # + def _track(self, previous_sources): + if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK: + self.__ensure_previous_sources(previous_sources) + with self.tempdir() as staging_directory: + for src in previous_sources: + src._stage(staging_directory) + new_ref = self.__do_track(previous_sources_dir=self.__ensure_directory(staging_directory)) + else: + new_ref = self.__do_track() + current_ref = self.get_ref() if new_ref is None: @@ -879,6 +904,17 @@ class Source(Plugin): return new_ref + # _requires_previous_sources() + # + # If a plugin requires access to previous sources at track or fetch time, + # then it cannot be the first source of an elemenet. + # + # Returns: + # (bool): Whether this source requires access to previous sources + # + def _requires_previous_sources(self): + return self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK or self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH + # Returns the alias if it's defined in the project def _get_alias(self): alias = self.__expected_alias @@ -928,8 +964,52 @@ class Source(Plugin): return clone + # Tries to call fetch for every mirror, stopping once it succeeds + def __do_fetch(self, **kwargs): + project = self._get_project() + source_fetchers = self.get_source_fetchers() + if source_fetchers: + for fetcher in source_fetchers: + alias = fetcher._get_alias() + success = False + for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): + try: + fetcher.fetch(uri) + # FIXME: Need to consider temporary vs. permanent failures, + # and how this works with retries. + except BstError as e: + last_error = e + continue + success = True + break + if not success: + raise last_error + else: + alias = self._get_alias() + if self.__first_pass: + mirrors = project.first_pass_config.mirrors + else: + mirrors = project.config.mirrors + if not mirrors or not alias: + self.fetch(**kwargs) + return + + context = self._get_context() + source_kind = type(self) + for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): + new_source = self.__clone_for_uri(uri) + try: + new_source.fetch(**kwargs) + # FIXME: Need to consider temporary vs. permanent failures, + # and how this works with retries. + except BstError as e: + last_error = e + continue + return + raise last_error + # Tries to call track for every mirror, stopping once it succeeds - def __do_track(self): + def __do_track(self, **kwargs): project = self._get_project() alias = self._get_alias() if self.__first_pass: @@ -938,14 +1018,14 @@ class Source(Plugin): mirrors = project.config.mirrors # If there are no mirrors, or no aliases to replace, there's nothing to do here. if not mirrors or not alias: - return self.track() + return self.track(**kwargs) # NOTE: We are assuming here that tracking only requires substituting the # first alias used for uri in reversed(project.get_alias_uris(alias, first_pass=self.__first_pass)): new_source = self.__clone_for_uri(uri) try: - ref = new_source.track() + ref = new_source.track(**kwargs) # FIXME: Need to consider temporary vs. permanent failures, # and how this works with retries. except BstError as e: @@ -990,6 +1070,17 @@ class Source(Plugin): return config + # Ensures that previous sources have been tracked and fetched. + # + def __ensure_previous_sources(self, previous_sources): + for index, src in enumerate(previous_sources): + if src.get_consistency() == Consistency.RESOLVED: + src._fetch(previous_sources[0:index]) + elif src.get_consistency() == Consistency.INCONSISTENT: + new_ref = src._track(previous_sources[0:index]) + src._save_ref(new_ref) + src._fetch(previous_sources[0:index]) + def _extract_alias(url): parts = url.split(utils._ALIAS_SEPARATOR, 1) diff --git a/tests/sources/previous_source_access.py b/tests/sources/previous_source_access.py new file mode 100644 index 000000000..f7045383c --- /dev/null +++ b/tests/sources/previous_source_access.py @@ -0,0 +1,47 @@ +import os +import pytest + +from tests.testutils import cli + +DATA_DIR = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + 'previous_source_access' +) + + +################################################################## +# Tests # +################################################################## +# Test that plugins can access data from previous sources +@pytest.mark.datafiles(DATA_DIR) +def test_custom_transform_source(cli, tmpdir, datafiles): + project = os.path.join(datafiles.dirname, datafiles.basename) + + # Ensure we can track + result = cli.run(project=project, args=[ + 'track', 'target.bst' + ]) + result.assert_success() + + # Ensure we can fetch + result = cli.run(project=project, args=[ + 'fetch', 'target.bst' + ]) + result.assert_success() + + # Ensure we get correct output from foo_transform + result = cli.run(project=project, args=[ + 'build', 'target.bst' + ]) + destpath = os.path.join(cli.directory, 'checkout') + result = cli.run(project=project, args=[ + 'checkout', 'target.bst', destpath + ]) + result.assert_success() + # Assert that files from both sources exist, and that they have + # the same content + assert os.path.exists(os.path.join(destpath, 'file')) + assert os.path.exists(os.path.join(destpath, 'filetransform')) + with open(os.path.join(destpath, 'file')) as file1: + with open(os.path.join(destpath, 'filetransform')) as file2: + assert file1.read() == file2.read() diff --git a/tests/sources/previous_source_access/elements/target.bst b/tests/sources/previous_source_access/elements/target.bst new file mode 100644 index 000000000..c9d3b9bb6 --- /dev/null +++ b/tests/sources/previous_source_access/elements/target.bst @@ -0,0 +1,6 @@ +kind: import + +sources: +- kind: local + path: files/file +- kind: foo_transform diff --git a/tests/sources/previous_source_access/files/file b/tests/sources/previous_source_access/files/file new file mode 100644 index 000000000..980a0d5f1 --- /dev/null +++ b/tests/sources/previous_source_access/files/file @@ -0,0 +1 @@ +Hello World! diff --git a/tests/sources/previous_source_access/plugins/sources/foo_transform.py b/tests/sources/previous_source_access/plugins/sources/foo_transform.py new file mode 100644 index 000000000..7101bfd24 --- /dev/null +++ b/tests/sources/previous_source_access/plugins/sources/foo_transform.py @@ -0,0 +1,87 @@ +""" +foo_transform - transform "file" from previous sources into "filetransform" +=========================================================================== + +This is a test source plugin that looks for a file named "file" staged by +previous sources, and copies its contents to a file called "filetransform". + +""" + +import os +import hashlib + +from buildstream import Consistency, Source, SourceError, utils + + +class FooTransformSource(Source): + + # We need access to previous both at track time and fetch time + BST_REQUIRES_PREVIOUS_SOURCES_TRACK = True + BST_REQUIRES_PREVIOUS_SOURCES_FETCH = True + + @property + def mirror(self): + """Directory where this source should stage its files + + """ + path = os.path.join(self.get_mirror_directory(), self.name, + self.ref.strip()) + os.makedirs(path, exist_ok=True) + return path + + def configure(self, node): + self.node_validate(node, ['ref'] + Source.COMMON_CONFIG_KEYS) + self.ref = self.node_get_member(node, str, 'ref', None) + + def preflight(self): + pass + + def get_unique_key(self): + return (self.ref,) + + def get_consistency(self): + if self.ref is None: + return Consistency.INCONSISTENT + # If we have a file called "filetransform", verify that its checksum + # matches our ref. Otherwise, it resolved but not cached. + fpath = os.path.join(self.mirror, 'filetransform') + try: + with open(fpath, 'rb') as f: + if hashlib.sha256(f.read()).hexdigest() == self.ref.strip(): + return Consistency.CACHED + except Exception: + pass + return Consistency.RESOLVED + + def get_ref(self): + return self.ref + + def set_ref(self, ref, node): + self.ref = node['ref'] = ref + + def track(self, previous_sources_dir): + # Store the checksum of the file from previous source as our ref + fpath = os.path.join(previous_sources_dir, 'file') + with open(fpath, 'rb') as f: + return hashlib.sha256(f.read()).hexdigest() + + def fetch(self, previous_sources_dir): + fpath = os.path.join(previous_sources_dir, 'file') + # Verify that the checksum of the file from previous source matches + # our ref + with open(fpath, 'rb') as f: + if hashlib.sha256(f.read()).hexdigest() != self.ref.strip(): + raise SourceError("Element references do not match") + + # Copy "file" as "filetransform" + newfpath = os.path.join(self.mirror, 'filetransform') + utils.safe_copy(fpath, newfpath) + + def stage(self, directory): + # Simply stage the "filetransform" file + utils.safe_copy(os.path.join(self.mirror, 'filetransform'), + os.path.join(directory, 'filetransform')) + + +def setup(): + return FooTransformSource diff --git a/tests/sources/previous_source_access/project.conf b/tests/sources/previous_source_access/project.conf new file mode 100644 index 000000000..1749b3dba --- /dev/null +++ b/tests/sources/previous_source_access/project.conf @@ -0,0 +1,10 @@ +# Project with local source plugins +name: foo + +element-path: elements + +plugins: +- origin: local + path: plugins/sources + sources: + foo_transform: 0 |