summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChandan Singh <csingh43@bloomberg.net>2018-07-25 15:01:32 +0100
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2019-07-16 14:05:34 +0900
commitf8cf48c8a5819740f03b1aaf3f45b4ab1571bc15 (patch)
tree635df917a58411bffbbf806caa6bcf7254e58710
parentd337f089957f53926470e1723dfc1c4313a4f698 (diff)
downloadbuildstream-f8cf48c8a5819740f03b1aaf3f45b4ab1571bc15.tar.gz
Allow source plugins to access previous sources
Source plugin implementations can now specify that they need access to previously staged sources by specifying `BST_REQUIRES_PREVIOUS_SOURCES_TRACK` and/or `BST_REQUIRES_PREVIOUS_SOURCES_FETCH`, corresponding to access at `track` and `fetch` times respectively. Fixes #381. Replaces !505. For the relevant discussion, see: https://gitlab.com/BuildStream/buildstream/merge_requests/505#note_83780747
-rw-r--r--buildstream/_loader/loader.py5
-rw-r--r--buildstream/_scheduler/queues/fetchqueue.py4
-rw-r--r--buildstream/element.py10
-rw-r--r--buildstream/source.py249
-rw-r--r--tests/sources/previous_source_access.py47
-rw-r--r--tests/sources/previous_source_access/elements/target.bst6
-rw-r--r--tests/sources/previous_source_access/files/file1
-rw-r--r--tests/sources/previous_source_access/plugins/sources/foo_transform.py87
-rw-r--r--tests/sources/previous_source_access/project.conf10
9 files changed, 335 insertions, 84 deletions
diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py
index 2efc4d360..71b74c506 100644
--- a/buildstream/_loader/loader.py
+++ b/buildstream/_loader/loader.py
@@ -540,11 +540,12 @@ class Loader():
#
if element._get_consistency() == Consistency.RESOLVED:
if fetch_subprojects:
- for source in element.sources():
+ sources = list(element.sources())
+ for idx, source in enumerate(sources):
if ticker:
ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind()))
if source._get_consistency() != Consistency.CACHED:
- source._fetch()
+ source._fetch(sources[0:idx])
else:
detail = "Try fetching the project with `bst fetch {}`".format(filename)
raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED,
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 423b620de..be43abe4e 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -41,8 +41,10 @@ class FetchQueue(Queue):
self._skip_cached = skip_cached
def process(self, element):
+ previous_sources = []
for source in element.sources():
- source._fetch()
+ source._fetch(previous_sources)
+ previous_sources.append(source)
def status(self, element):
if not element._is_required():
diff --git a/buildstream/element.py b/buildstream/element.py
index 3bdf601c1..2fc56d947 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1253,6 +1253,12 @@ class Element(Plugin):
# Prepend provenance to the error
raise ElementError("{}: {}".format(self, e), reason=e.reason) from e
+ # Ensure that the first source does not need access to previous sources
+ if self.__sources and self.__sources[0]._requires_previous_sources():
+ raise ElementError("{}: {} cannot be the first source of an element "
+ "as it requires access to previous sources"
+ .format(self, self.__sources[0]))
+
# Preflight the sources
for source in self.sources():
source._preflight()
@@ -1296,9 +1302,9 @@ class Element(Plugin):
#
def _track(self):
refs = []
- for source in self.__sources:
+ for index, source in enumerate(self.__sources):
old_ref = source.get_ref()
- new_ref = source._track()
+ new_ref = source._track(self.__sources[0:index])
refs.append((source._unique_id, new_ref))
# Complimentary warning that the new ref will be unused.
diff --git a/buildstream/source.py b/buildstream/source.py
index f72aeae86..6b6d3126b 100644
--- a/buildstream/source.py
+++ b/buildstream/source.py
@@ -88,6 +88,39 @@ these methods are mandatory to implement.
:ref:`SourceFetcher <core_source_fetcher>`.
+Accessing previous sources
+--------------------------
+*Since: 1.4*
+
+In the general case, all sources are fetched and tracked independently of one
+another. In situations where a source needs to access previous source(s) in
+order to perform its own track and/or fetch, the following attributes can be set to
+request access to previous sources:
+
+* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK`
+
+ Indicate that access to previous sources is required during track
+
+* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH`
+
+ Indicate that access to previous sources is required during fetch
+
+The intended use of such plugins is to fetch external dependencies of other
+sources, typically using some kind of package manager, such that all the
+dependencies of the original source(s) are available at build time.
+
+When implementing such a plugin, implementors should adhere to the following
+guidelines:
+
+* Implementations must be able to store the obtained artifacts in a
+ subdirectory.
+
+* Implementations must be able to deterministically generate a unique ref, such
+ that two refs are different if and only if they produce different outputs.
+
+* Implementations must not introduce host contamination.
+
+
.. _core_source_fetcher:
SourceFetcher - Object for fetching individual URLs
@@ -104,6 +137,8 @@ mentioned, these methods are mandatory to implement.
Fetches the URL associated with this SourceFetcher, optionally taking an
alias override.
+Class Reference
+---------------
"""
import os
@@ -175,7 +210,7 @@ class SourceFetcher():
#############################################################
# Abstract Methods #
#############################################################
- def fetch(self, alias_override=None):
+ def fetch(self, alias_override=None, **kwargs):
"""Fetch remote sources and mirror them locally, ensuring at least
that the specific reference is cached locally.
@@ -225,6 +260,32 @@ class Source(Plugin):
__defaults = {} # The defaults from the project
__defaults_set = False # Flag, in case there are not defaults at all
+ BST_REQUIRES_PREVIOUS_SOURCES_TRACK = False
+ """Whether access to previous sources is required during track
+
+ When set to True:
+ * all sources listed before this source in the given element will be
+ fetched before this source is tracked
+ * Source.track() will be called with an additional keyword argument
+ `previous_sources_dir` where previous sources will be staged
+ * this source can not be the first source for an element
+
+ *Since: 1.4*
+ """
+
+ BST_REQUIRES_PREVIOUS_SOURCES_FETCH = False
+ """Whether access to previous sources is required during fetch
+
+ When set to True:
+ * all sources listed before this source in the given element will be
+ fetched before this source is fetched
+ * Source.fetch() will be called with an additional keyword argument
+ `previous_sources_dir` where previous sources will be staged
+ * this source can not be the first source for an element
+
+ *Since: 1.4*
+ """
+
def __init__(self, context, project, meta, *, alias_override=None, unique_id=None):
provenance = _yaml.node_get_provenance(meta.config)
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
@@ -324,9 +385,15 @@ class Source(Plugin):
"""
raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind()))
- def track(self):
+ def track(self, **kwargs):
"""Resolve a new ref from the plugin's track option
+ Args:
+ previous_sources_dir (str): directory where previous sources are staged.
+ Note that this keyword argument is available only when
+ :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK`
+ is set to True.
+
Returns:
(simple object): A new internal source reference, or None
@@ -345,10 +412,16 @@ class Source(Plugin):
# Allow a non implementation
return None
- def fetch(self):
+ def fetch(self, **kwargs):
"""Fetch remote sources and mirror them locally, ensuring at least
that the specific reference is cached locally.
+ Args:
+ previous_sources_dir (str): directory where previous sources are staged.
+ Note that this keyword argument is available only when
+ :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH`
+ is set to True.
+
Raises:
:class:`.SourceError`
@@ -583,78 +656,19 @@ class Source(Plugin):
# Wrapper function around plugin provided fetch method
#
- def _fetch(self):
- project = self._get_project()
- context = self._get_context()
-
- # Silence the STATUS messages which might happen as a result
- # of checking the source fetchers.
- with context.silence():
- source_fetchers = self.get_source_fetchers()
-
- # Use the source fetchers if they are provided
- #
- if source_fetchers:
-
- # Use a contorted loop here, this is to allow us to
- # silence the messages which can result from consuming
- # the items of source_fetchers, if it happens to be a generator.
- #
- source_fetchers = iter(source_fetchers)
- try:
-
- while True:
-
- with context.silence():
- fetcher = next(source_fetchers)
-
- alias = fetcher._get_alias()
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
- try:
- fetcher.fetch(uri)
- # FIXME: Need to consider temporary vs. permanent failures,
- # and how this works with retries.
- except BstError as e:
- last_error = e
- continue
-
- # No error, we're done with this fetcher
- break
-
- else:
- # No break occurred, raise the last detected error
- raise last_error
-
- except StopIteration:
- pass
-
- # Default codepath is to reinstantiate the Source
- #
+ # Args:
+ # previous_sources (list): List of Sources listed prior to this source
+ #
+ def _fetch(self, previous_sources):
+
+ if self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH:
+ self.__ensure_previous_sources(previous_sources)
+ with self.tempdir() as staging_directory:
+ for src in previous_sources:
+ src._stage(staging_directory)
+ self.__do_fetch(previous_sources_dir=self.__ensure_directory(staging_directory))
else:
- alias = self._get_alias()
- if self.__first_pass:
- mirrors = project.first_pass_config.mirrors
- else:
- mirrors = project.config.mirrors
- if not mirrors or not alias:
- self.fetch()
- return
-
- for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
- new_source = self.__clone_for_uri(uri)
- try:
- new_source.fetch()
- # FIXME: Need to consider temporary vs. permanent failures,
- # and how this works with retries.
- except BstError as e:
- last_error = e
- continue
-
- # No error, we're done here
- return
-
- # Re raise the last detected error
- raise last_error
+ self.__do_fetch()
# Wrapper for stage() api which gives the source
# plugin a fully constructed path considering the
@@ -866,8 +880,19 @@ class Source(Plugin):
# Wrapper for track()
#
- def _track(self):
- new_ref = self.__do_track()
+ # Args:
+ # previous_sources (list): List of Sources listed prior to this source
+ #
+ def _track(self, previous_sources):
+ if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK:
+ self.__ensure_previous_sources(previous_sources)
+ with self.tempdir() as staging_directory:
+ for src in previous_sources:
+ src._stage(staging_directory)
+ new_ref = self.__do_track(previous_sources_dir=self.__ensure_directory(staging_directory))
+ else:
+ new_ref = self.__do_track()
+
current_ref = self.get_ref()
if new_ref is None:
@@ -879,6 +904,17 @@ class Source(Plugin):
return new_ref
+ # _requires_previous_sources()
+ #
+ # If a plugin requires access to previous sources at track or fetch time,
+ # then it cannot be the first source of an element.
+ #
+ # Returns:
+ # (bool): Whether this source requires access to previous sources
+ #
+ def _requires_previous_sources(self):
+ return self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK or self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH
+
# Returns the alias if it's defined in the project
def _get_alias(self):
alias = self.__expected_alias
@@ -928,8 +964,52 @@ class Source(Plugin):
return clone
+ # Tries to call fetch for every mirror, stopping once it succeeds
+ def __do_fetch(self, **kwargs):
+ project = self._get_project()
+ source_fetchers = self.get_source_fetchers()
+ if source_fetchers:
+ for fetcher in source_fetchers:
+ alias = fetcher._get_alias()
+ success = False
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
+ try:
+ fetcher.fetch(uri)
+ # FIXME: Need to consider temporary vs. permanent failures,
+ # and how this works with retries.
+ except BstError as e:
+ last_error = e
+ continue
+ success = True
+ break
+ if not success:
+ raise last_error
+ else:
+ alias = self._get_alias()
+ if self.__first_pass:
+ mirrors = project.first_pass_config.mirrors
+ else:
+ mirrors = project.config.mirrors
+ if not mirrors or not alias:
+ self.fetch(**kwargs)
+ return
+
+ context = self._get_context()
+ source_kind = type(self)
+ for uri in project.get_alias_uris(alias, first_pass=self.__first_pass):
+ new_source = self.__clone_for_uri(uri)
+ try:
+ new_source.fetch(**kwargs)
+ # FIXME: Need to consider temporary vs. permanent failures,
+ # and how this works with retries.
+ except BstError as e:
+ last_error = e
+ continue
+ return
+ raise last_error
+
# Tries to call track for every mirror, stopping once it succeeds
- def __do_track(self):
+ def __do_track(self, **kwargs):
project = self._get_project()
alias = self._get_alias()
if self.__first_pass:
@@ -938,14 +1018,14 @@ class Source(Plugin):
mirrors = project.config.mirrors
# If there are no mirrors, or no aliases to replace, there's nothing to do here.
if not mirrors or not alias:
- return self.track()
+ return self.track(**kwargs)
# NOTE: We are assuming here that tracking only requires substituting the
# first alias used
for uri in reversed(project.get_alias_uris(alias, first_pass=self.__first_pass)):
new_source = self.__clone_for_uri(uri)
try:
- ref = new_source.track()
+ ref = new_source.track(**kwargs)
# FIXME: Need to consider temporary vs. permanent failures,
# and how this works with retries.
except BstError as e:
@@ -990,6 +1070,17 @@ class Source(Plugin):
return config
+ # Ensures that previous sources have been tracked and fetched.
+ #
+ def __ensure_previous_sources(self, previous_sources):
+ for index, src in enumerate(previous_sources):
+ if src.get_consistency() == Consistency.RESOLVED:
+ src._fetch(previous_sources[0:index])
+ elif src.get_consistency() == Consistency.INCONSISTENT:
+ new_ref = src._track(previous_sources[0:index])
+ src._save_ref(new_ref)
+ src._fetch(previous_sources[0:index])
+
def _extract_alias(url):
parts = url.split(utils._ALIAS_SEPARATOR, 1)
diff --git a/tests/sources/previous_source_access.py b/tests/sources/previous_source_access.py
new file mode 100644
index 000000000..f7045383c
--- /dev/null
+++ b/tests/sources/previous_source_access.py
@@ -0,0 +1,47 @@
+import os
+import pytest
+
+from tests.testutils import cli
+
+DATA_DIR = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ 'previous_source_access'
+)
+
+
+##################################################################
+# Tests #
+##################################################################
+# Test that plugins can access data from previous sources
+@pytest.mark.datafiles(DATA_DIR)
+def test_custom_transform_source(cli, tmpdir, datafiles):
+ project = os.path.join(datafiles.dirname, datafiles.basename)
+
+ # Ensure we can track
+ result = cli.run(project=project, args=[
+ 'track', 'target.bst'
+ ])
+ result.assert_success()
+
+ # Ensure we can fetch
+ result = cli.run(project=project, args=[
+ 'fetch', 'target.bst'
+ ])
+ result.assert_success()
+
+ # Ensure we get correct output from foo_transform
+ result = cli.run(project=project, args=[
+ 'build', 'target.bst'
+ ])
+ destpath = os.path.join(cli.directory, 'checkout')
+ result = cli.run(project=project, args=[
+ 'checkout', 'target.bst', destpath
+ ])
+ result.assert_success()
+ # Assert that files from both sources exist, and that they have
+ # the same content
+ assert os.path.exists(os.path.join(destpath, 'file'))
+ assert os.path.exists(os.path.join(destpath, 'filetransform'))
+ with open(os.path.join(destpath, 'file')) as file1:
+ with open(os.path.join(destpath, 'filetransform')) as file2:
+ assert file1.read() == file2.read()
diff --git a/tests/sources/previous_source_access/elements/target.bst b/tests/sources/previous_source_access/elements/target.bst
new file mode 100644
index 000000000..c9d3b9bb6
--- /dev/null
+++ b/tests/sources/previous_source_access/elements/target.bst
@@ -0,0 +1,6 @@
+kind: import
+
+sources:
+- kind: local
+ path: files/file
+- kind: foo_transform
diff --git a/tests/sources/previous_source_access/files/file b/tests/sources/previous_source_access/files/file
new file mode 100644
index 000000000..980a0d5f1
--- /dev/null
+++ b/tests/sources/previous_source_access/files/file
@@ -0,0 +1 @@
+Hello World!
diff --git a/tests/sources/previous_source_access/plugins/sources/foo_transform.py b/tests/sources/previous_source_access/plugins/sources/foo_transform.py
new file mode 100644
index 000000000..7101bfd24
--- /dev/null
+++ b/tests/sources/previous_source_access/plugins/sources/foo_transform.py
@@ -0,0 +1,87 @@
+"""
+foo_transform - transform "file" from previous sources into "filetransform"
+===========================================================================
+
+This is a test source plugin that looks for a file named "file" staged by
+previous sources, and copies its contents to a file called "filetransform".
+
+"""
+
+import os
+import hashlib
+
+from buildstream import Consistency, Source, SourceError, utils
+
+
+class FooTransformSource(Source):
+
+ # We need access to previous sources both at track time and fetch time
+ BST_REQUIRES_PREVIOUS_SOURCES_TRACK = True
+ BST_REQUIRES_PREVIOUS_SOURCES_FETCH = True
+
+ @property
+ def mirror(self):
+ """Directory where this source should stage its files
+
+ """
+ path = os.path.join(self.get_mirror_directory(), self.name,
+ self.ref.strip())
+ os.makedirs(path, exist_ok=True)
+ return path
+
+ def configure(self, node):
+ self.node_validate(node, ['ref'] + Source.COMMON_CONFIG_KEYS)
+ self.ref = self.node_get_member(node, str, 'ref', None)
+
+ def preflight(self):
+ pass
+
+ def get_unique_key(self):
+ return (self.ref,)
+
+ def get_consistency(self):
+ if self.ref is None:
+ return Consistency.INCONSISTENT
+ # If we have a file called "filetransform", verify that its checksum
+ # matches our ref. Otherwise, it is resolved but not cached.
+ fpath = os.path.join(self.mirror, 'filetransform')
+ try:
+ with open(fpath, 'rb') as f:
+ if hashlib.sha256(f.read()).hexdigest() == self.ref.strip():
+ return Consistency.CACHED
+ except Exception:
+ pass
+ return Consistency.RESOLVED
+
+ def get_ref(self):
+ return self.ref
+
+ def set_ref(self, ref, node):
+ self.ref = node['ref'] = ref
+
+ def track(self, previous_sources_dir):
+ # Store the checksum of the file from previous source as our ref
+ fpath = os.path.join(previous_sources_dir, 'file')
+ with open(fpath, 'rb') as f:
+ return hashlib.sha256(f.read()).hexdigest()
+
+ def fetch(self, previous_sources_dir):
+ fpath = os.path.join(previous_sources_dir, 'file')
+ # Verify that the checksum of the file from previous source matches
+ # our ref
+ with open(fpath, 'rb') as f:
+ if hashlib.sha256(f.read()).hexdigest() != self.ref.strip():
+ raise SourceError("Element references do not match")
+
+ # Copy "file" as "filetransform"
+ newfpath = os.path.join(self.mirror, 'filetransform')
+ utils.safe_copy(fpath, newfpath)
+
+ def stage(self, directory):
+ # Simply stage the "filetransform" file
+ utils.safe_copy(os.path.join(self.mirror, 'filetransform'),
+ os.path.join(directory, 'filetransform'))
+
+
+def setup():
+ return FooTransformSource
diff --git a/tests/sources/previous_source_access/project.conf b/tests/sources/previous_source_access/project.conf
new file mode 100644
index 000000000..1749b3dba
--- /dev/null
+++ b/tests/sources/previous_source_access/project.conf
@@ -0,0 +1,10 @@
+# Project with local source plugins
+name: foo
+
+element-path: elements
+
+plugins:
+- origin: local
+ path: plugins/sources
+ sources:
+ foo_transform: 0