diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-07-16 06:29:48 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-07-16 06:29:48 +0000 |
commit | d917648308d9b5a5f8b916f62f0d769a2de40fe0 (patch) | |
tree | 89341909d11e142a7a80fa6b3df1aea4a15715ce /buildstream | |
parent | 34d9f5953a3ad2021379b24d33175c90b90b5154 (diff) | |
parent | da8e635a84238743d949a111ef9ed5e07de0db62 (diff) | |
download | buildstream-d917648308d9b5a5f8b916f62f0d769a2de40fe0.tar.gz |
Merge branch 'tristan/backport-previous-sources' into 'bst-1'
Backport ability to see previous sources in fetch() and track()
See merge request BuildStream/buildstream!1477
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_loader/loader.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_versions.py | 2 | ||||
-rw-r--r-- | buildstream/element.py | 10 | ||||
-rw-r--r-- | buildstream/plugins/sources/pip.py | 237 | ||||
-rw-r--r-- | buildstream/source.py | 287 |
7 files changed, 442 insertions, 108 deletions
diff --git a/buildstream/_loader/loader.py b/buildstream/_loader/loader.py index 2efc4d360..71b74c506 100644 --- a/buildstream/_loader/loader.py +++ b/buildstream/_loader/loader.py @@ -540,11 +540,12 @@ class Loader(): # if element._get_consistency() == Consistency.RESOLVED: if fetch_subprojects: - for source in element.sources(): + sources = list(element.sources()) + for idx, source in enumerate(sources): if ticker: ticker(filename, 'Fetching subproject from {} source'.format(source.get_kind())) if source._get_consistency() != Consistency.CACHED: - source._fetch() + source._fetch(sources[0:idx]) else: detail = "Try fetching the project with `bst fetch {}`".format(filename) raise LoadError(LoadErrorReason.SUBPROJECT_FETCH_NEEDED, diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 423b620de..be43abe4e 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -41,8 +41,10 @@ class FetchQueue(Queue): self._skip_cached = skip_cached def process(self, element): + previous_sources = [] for source in element.sources(): - source._fetch() + source._fetch(previous_sources) + previous_sources.append(source) def status(self, element): if not element._is_required(): diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index d7e6546f3..72a79a532 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -53,9 +53,10 @@ class TrackQueue(Queue): if status == JobStatus.FAIL: return - # Set the new refs in the main process one by one as they complete + # Set the new refs in the main process one by one as they complete, + # writing to bst files this time for unique_id, new_ref in result: source = Plugin._lookup(unique_id) - source._save_ref(new_ref) + source._set_ref(new_ref, save=True) element._tracking_done() diff --git a/buildstream/_versions.py b/buildstream/_versions.py 
index eddb34fc6..bbb43000e 100644 --- a/buildstream/_versions.py +++ b/buildstream/_versions.py @@ -23,7 +23,7 @@ # This version is bumped whenever enhancements are made # to the `project.conf` format or the core element format. # -BST_FORMAT_VERSION = 12 +BST_FORMAT_VERSION = 13 # The base BuildStream artifact version diff --git a/buildstream/element.py b/buildstream/element.py index 3bdf601c1..2fc56d947 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1253,6 +1253,12 @@ class Element(Plugin): # Prepend provenance to the error raise ElementError("{}: {}".format(self, e), reason=e.reason) from e + # Ensure that the first source does not need access to previous sources + if self.__sources and self.__sources[0]._requires_previous_sources(): + raise ElementError("{}: {} cannot be the first source of an element " + "as it requires access to previous sources" + .format(self, self.__sources[0])) + # Preflight the sources for source in self.sources(): source._preflight() @@ -1296,9 +1302,9 @@ class Element(Plugin): # def _track(self): refs = [] - for source in self.__sources: + for index, source in enumerate(self.__sources): old_ref = source.get_ref() - new_ref = source._track() + new_ref = source._track(self.__sources[0:index]) refs.append((source._unique_id, new_ref)) # Complimentary warning that the new ref will be unused. diff --git a/buildstream/plugins/sources/pip.py b/buildstream/plugins/sources/pip.py new file mode 100644 index 000000000..18e65c73d --- /dev/null +++ b/buildstream/plugins/sources/pip.py @@ -0,0 +1,237 @@ +# +# Copyright 2018 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Chandan Singh <csingh43@bloomberg.net> + +""" +pip - stage python packages using pip +===================================== + +**Host dependencies:** + + * ``pip`` python module + +This plugin will download source distributions for specified packages using +``pip`` but will not install them. It is expected that the elements using this +source will install the downloaded packages. + +Downloaded tarballs will be stored in a directory called ".bst_pip_downloads". + +**Usage:** + +.. code:: yaml + + # Specify the pip source kind + kind: pip + + # Optionally specify index url, defaults to PyPi + # This url is used to discover new versions of packages and download them + # Projects intending to mirror their sources to a permanent location should + # use an aliased url, and declare the alias in the project configuration + url: https://mypypi.example.com/simple + + # Optionally specify the path to requirements files + # Note that either 'requirements-files' or 'packages' must be defined + requirements-files: + - requirements.txt + + # Optionally specify a list of additional packages + # Note that either 'requirements-files' or 'packages' must be defined + packages: + - flake8 + + # Optionally specify a relative staging directory + directory: path/to/stage + + # Specify the ref. It is a list of strings of format + # "<package-name>==<version>", separated by "\\n". + # Usually this will be contents of a requirements.txt file where all + # package versions have been frozen. 
+ ref: "flake8==3.5.0\\nmccabe==0.6.1\\npkg-resources==0.0.0\\npycodestyle==2.3.1\\npyflakes==1.6.0" + +.. note:: + + The ``pip`` plugin is available since :ref:`format version 16 <project_format_version>` + +""" + +import errno +import hashlib +import os +import re + +from buildstream import Consistency, Source, SourceError, utils + +_OUTPUT_DIRNAME = '.bst_pip_downloads' +_PYPI_INDEX_URL = 'https://pypi.org/simple/' + +# Used only for finding pip command +_PYTHON_VERSIONS = [ + 'python2.7', + 'python3.0', + 'python3.1', + 'python3.2', + 'python3.3', + 'python3.4', + 'python3.5', + 'python3.6', + 'python3.7', +] + +# List of allowed extensions taken from +# https://docs.python.org/3/distutils/sourcedist.html. +# Names of source distribution archives must be of the form +# '%{package-name}-%{version}.%{extension}'. +_SDIST_RE = re.compile( + r'^([a-zA-Z0-9]+?)-(.+).(?:tar|tar.bz2|tar.gz|tar.xz|tar.Z|zip)$', + re.IGNORECASE) + + +class PipSource(Source): + # pylint: disable=attribute-defined-outside-init + + # We need access to previous sources at track time to use requirements.txt + # but not at fetch time as self.ref should contain sufficient information + # for this plugin + BST_REQUIRES_PREVIOUS_SOURCES_TRACK = True + + def configure(self, node): + self.node_validate(node, ['url', 'packages', 'ref', 'requirements-files'] + + Source.COMMON_CONFIG_KEYS) + self.ref = self.node_get_member(node, str, 'ref', None) + self.original_url = self.node_get_member(node, str, 'url', _PYPI_INDEX_URL) + self.index_url = self.translate_url(self.original_url) + self.packages = self.node_get_member(node, list, 'packages', []) + self.requirements_files = self.node_get_member(node, list, 'requirements-files', []) + + if not (self.packages or self.requirements_files): + raise SourceError("{}: Either 'packages' or 'requirements-files' must be specified". 
format(self)) + + def preflight(self): + # Try to find a pip version that supports download command + self.host_pip = None + for python in reversed(_PYTHON_VERSIONS): + try: + host_python = utils.get_host_tool(python) + rc = self.call([host_python, '-m', 'pip', 'download', '--help']) + if rc == 0: + self.host_pip = [host_python, '-m', 'pip'] + break + except utils.ProgramNotFoundError: + pass + + if self.host_pip is None: + raise SourceError("{}: Unable to find a suitable pip command".format(self)) + + def get_unique_key(self): + return [self.original_url, self.ref] + + def get_consistency(self): + if not self.ref: + return Consistency.INCONSISTENT + if os.path.exists(self._mirror) and os.listdir(self._mirror): + return Consistency.CACHED + return Consistency.RESOLVED + + def get_ref(self): + return self.ref + + def load_ref(self, node): + self.ref = self.node_get_member(node, str, 'ref', None) + + def set_ref(self, ref, node): + node['ref'] = self.ref = ref + + def track(self, previous_sources_dir): + # XXX pip does not offer any public API other than the CLI tool so it + # is not feasible to correctly parse the requirements file or to check + # which package versions pip is going to install. + # See https://pip.pypa.io/en/stable/user_guide/#using-pip-from-your-program + # for details. + # As a result, we have to wastefully install the packages during track. 
+ with self.tempdir() as tmpdir: + install_args = self.host_pip + ['download', + '--no-binary', ':all:', + '--index-url', self.index_url, + '--dest', tmpdir] + for requirement_file in self.requirements_files: + fpath = os.path.join(previous_sources_dir, requirement_file) + install_args += ['-r', fpath] + install_args += self.packages + + self.call(install_args, fail="Failed to install python packages") + reqs = self._parse_sdist_names(tmpdir) + + return '\n'.join(["{}=={}".format(pkg, ver) for pkg, ver in reqs]) + + def fetch(self): + with self.tempdir() as tmpdir: + packages = self.ref.strip().split('\n') + package_dir = os.path.join(tmpdir, 'packages') + os.makedirs(package_dir) + self.call(self.host_pip + ['download', + '--no-binary', ':all:', + '--index-url', self.index_url, + '--dest', package_dir] + packages, + fail="Failed to install python packages: {}".format(packages)) + + # If the mirror directory already exists, assume that some other + # process has fetched the sources before us and ensure that we do + # not raise an error in that case. 
+ try: + os.makedirs(self._mirror) + os.rename(package_dir, self._mirror) + except FileExistsError: + return + except OSError as e: + if e.errno != errno.ENOTEMPTY: + raise + + def stage(self, directory): + with self.timed_activity("Staging Python packages", silent_nested=True): + utils.copy_files(self._mirror, os.path.join(directory, _OUTPUT_DIRNAME)) + + # Directory where this source should stage its files + # + @property + def _mirror(self): + if not self.ref: + return None + return os.path.join(self.get_mirror_directory(), + utils.url_directory_name(self.original_url), + hashlib.sha256(self.ref.encode()).hexdigest()) + + # Parse names of downloaded source distributions + # + # Args: + # basedir (str): Directory containing source distribution archives + # + # Returns: + # (list): List of (package_name, version) tuples in sorted order + # + def _parse_sdist_names(self, basedir): + reqs = [] + for f in os.listdir(basedir): + pkg_match = _SDIST_RE.match(f) + if pkg_match: + reqs.append(pkg_match.groups()) + + return sorted(reqs) + + +def setup(): + return PipSource diff --git a/buildstream/source.py b/buildstream/source.py index f72aeae86..ed4dd9617 100644 --- a/buildstream/source.py +++ b/buildstream/source.py @@ -88,6 +88,39 @@ these methods are mandatory to implement. :ref:`SourceFetcher <core_source_fetcher>`. +Accessing previous sources +-------------------------- +*Since: 1.4* + +In the general case, all sources are fetched and tracked independently of one +another. 
In situations where a source needs to access previous source(s) in +order to perform its own track and/or fetch, the following attributes can be set to +request access to previous sources: + +* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK` + + Indicate that access to previous sources is required during track + +* :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH` + + Indicate that access to previous sources is required during fetch + +The intended use of such plugins is to fetch external dependencies of other +sources, typically using some kind of package manager, such that all the +dependencies of the original source(s) are available at build time. + +When implementing such a plugin, implementors should adhere to the following +guidelines: + +* Implementations must be able to store the obtained artifacts in a + subdirectory. + +* Implementations must be able to deterministically generate a unique ref, such + that two refs are different if and only if they produce different outputs. + +* Implementations must not introduce host contamination. + + .. _core_source_fetcher: SourceFetcher - Object for fetching individual URLs @@ -104,6 +137,8 @@ mentioned, these methods are mandatory to implement. Fetches the URL associated with this SourceFetcher, optionally taking an alias override. +Class Reference +--------------- """ import os @@ -175,7 +210,7 @@ class SourceFetcher(): ############################################################# # Abstract Methods # ############################################################# - def fetch(self, alias_override=None): + def fetch(self, alias_override=None, **kwargs): """Fetch remote sources and mirror them locally, ensuring at least that the specific reference is cached locally. 
@@ -225,6 +260,32 @@ class Source(Plugin): __defaults = {} # The defaults from the project __defaults_set = False # Flag, in case there are not defaults at all + BST_REQUIRES_PREVIOUS_SOURCES_TRACK = False + """Whether access to previous sources is required during track + + When set to True: + * all sources listed before this source in the given element will be + fetched before this source is tracked + * Source.track() will be called with an additional keyword argument + `previous_sources_dir` where previous sources will be staged + * this source can not be the first source for an element + + *Since: 1.4* + """ + + BST_REQUIRES_PREVIOUS_SOURCES_FETCH = False + """Whether access to previous sources is required during fetch + + When set to True: + * all sources listed before this source in the given element will be + fetched before this source is fetched + * Source.fetch() will be called with an additional keyword argument + `previous_sources_dir` where previous sources will be staged + * this source can not be the first source for an element + + *Since: 1.4* + """ + def __init__(self, context, project, meta, *, alias_override=None, unique_id=None): provenance = _yaml.node_get_provenance(meta.config) super().__init__("{}-{}".format(meta.element_name, meta.element_index), @@ -324,9 +385,15 @@ class Source(Plugin): """ raise ImplError("Source plugin '{}' does not implement set_ref()".format(self.get_kind())) - def track(self): + def track(self, **kwargs): """Resolve a new ref from the plugin's track option + Args: + previous_sources_dir (str): directory where previous sources are staged. + Note that this keyword argument is available only when + :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_TRACK` + is set to True. 
+ Returns: (simple object): A new internal source reference, or None @@ -345,10 +412,16 @@ class Source(Plugin): # Allow a non implementation return None - def fetch(self): + def fetch(self, **kwargs): """Fetch remote sources and mirror them locally, ensuring at least that the specific reference is cached locally. + Args: + previous_sources_dir (str): directory where previous sources are staged. + Note that this keyword argument is available only when + :attr:`~buildstream.source.Source.BST_REQUIRES_PREVIOUS_SOURCES_FETCH` + is set to True. + Raises: :class:`.SourceError` @@ -583,78 +656,19 @@ class Source(Plugin): # Wrapper function around plugin provided fetch method # - def _fetch(self): - project = self._get_project() - context = self._get_context() - - # Silence the STATUS messages which might happen as a result - # of checking the source fetchers. - with context.silence(): - source_fetchers = self.get_source_fetchers() - - # Use the source fetchers if they are provided - # - if source_fetchers: - - # Use a contorted loop here, this is to allow us to - # silence the messages which can result from consuming - # the items of source_fetchers, if it happens to be a generator. - # - source_fetchers = iter(source_fetchers) - try: - - while True: - - with context.silence(): - fetcher = next(source_fetchers) - - alias = fetcher._get_alias() - for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): - try: - fetcher.fetch(uri) - # FIXME: Need to consider temporary vs. permanent failures, - # and how this works with retries. 
- except BstError as e: - last_error = e - continue - - # No error, we're done with this fetcher - break - - else: - # No break occurred, raise the last detected error - raise last_error - - except StopIteration: - pass - - # Default codepath is to reinstantiate the Source - # + # Args: + # previous_sources (list): List of Sources listed prior to this source + # + def _fetch(self, previous_sources): + + if self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH: + self.__ensure_previous_sources(previous_sources) + with self.tempdir() as staging_directory: + for src in previous_sources: + src._stage(staging_directory) + self.__do_fetch(previous_sources_dir=self.__ensure_directory(staging_directory)) else: - alias = self._get_alias() - if self.__first_pass: - mirrors = project.first_pass_config.mirrors - else: - mirrors = project.config.mirrors - if not mirrors or not alias: - self.fetch() - return - - for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): - new_source = self.__clone_for_uri(uri) - try: - new_source.fetch() - # FIXME: Need to consider temporary vs. permanent failures, - # and how this works with retries. - except BstError as e: - last_error = e - continue - - # No error, we're done here - return - - # Re raise the last detected error - raise last_error + self.__do_fetch() # Wrapper for stage() api which gives the source # plugin a fully constructed path considering the @@ -687,24 +701,6 @@ class Source(Plugin): return key - # Wrapper for set_ref(), also returns whether it changed. - # - def _set_ref(self, ref, node): - current_ref = self.get_ref() - changed = False - - # This comparison should work even for tuples and lists, - # but we're mostly concerned about simple strings anyway. - if current_ref != ref: - changed = True - - # Set the ref regardless of whether it changed, the - # TrackQueue() will want to update a specific node with - # the ref, regardless of whether the original has changed. 
- self.set_ref(ref, node) - - return changed - # _project_refs(): # # Gets the appropriate ProjectRefs object for this source, @@ -781,7 +777,7 @@ class Source(Plugin): return redundant_ref - # _save_ref() + # _set_ref() # # Persists the ref for this source. This will decide where to save the # ref, or refuse to persist it, depending on active ref-storage project @@ -789,6 +785,7 @@ class Source(Plugin): # # Args: # new_ref (smth): The new reference to save + # save (bool): Whether to write the new reference to file or not # # Returns: # (bool): Whether the ref has changed @@ -796,7 +793,7 @@ class Source(Plugin): # Raises: # (SourceError): In the case we encounter errors saving a file to disk # - def _save_ref(self, new_ref): + def _set_ref(self, new_ref, *, save): context = self._get_context() project = self._get_project() @@ -824,7 +821,17 @@ class Source(Plugin): # # Step 2 - Set the ref in memory, and determine changed state # - if not self._set_ref(new_ref, node): + current_ref = self.get_ref() # pylint: disable=assignment-from-no-return + + # Set the ref regardless of whether it changed, the + # TrackQueue() will want to update a specific node with + # the ref, regardless of whether the original has changed. + self.set_ref(new_ref, node) + + if current_ref == new_ref or not save: + # Note: We do not look for and propagate changes at this point + # which might result in desync depending if something changes about + # tracking in the future. For now, this is quite safe. 
return False def do_save_refs(refs): @@ -866,8 +873,19 @@ class Source(Plugin): # Wrapper for track() # - def _track(self): - new_ref = self.__do_track() + # Args: + # previous_sources (list): List of Sources listed prior to this source + # + def _track(self, previous_sources): + if self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK: + self.__ensure_previous_sources(previous_sources) + with self.tempdir() as staging_directory: + for src in previous_sources: + src._stage(staging_directory) + new_ref = self.__do_track(previous_sources_dir=self.__ensure_directory(staging_directory)) + else: + new_ref = self.__do_track() + current_ref = self.get_ref() if new_ref is None: @@ -877,8 +895,22 @@ class Source(Plugin): if current_ref != new_ref: self.info("Found new revision: {}".format(new_ref)) + # Save ref in local process for subsequent sources + self._set_ref(new_ref, save=False) + return new_ref + # _requires_previous_sources() + # + # If a plugin requires access to previous sources at track or fetch time, + # then it cannot be the first source of an element. + # + # Returns: + # (bool): Whether this source requires access to previous sources + # + def _requires_previous_sources(self): + return self.BST_REQUIRES_PREVIOUS_SOURCES_TRACK or self.BST_REQUIRES_PREVIOUS_SOURCES_FETCH + # Returns the alias if it's defined in the project def _get_alias(self): alias = self.__expected_alias @@ -928,8 +960,52 @@ class Source(Plugin): return clone + # Tries to call fetch for every mirror, stopping once it succeeds + def __do_fetch(self, **kwargs): + project = self._get_project() + source_fetchers = self.get_source_fetchers() + if source_fetchers: + for fetcher in source_fetchers: + alias = fetcher._get_alias() + success = False + for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): + try: + fetcher.fetch(uri) + # FIXME: Need to consider temporary vs. permanent failures, + # and how this works with retries. 
+ except BstError as e: + last_error = e + continue + success = True + break + if not success: + raise last_error + else: + alias = self._get_alias() + if self.__first_pass: + mirrors = project.first_pass_config.mirrors + else: + mirrors = project.config.mirrors + if not mirrors or not alias: + self.fetch(**kwargs) + return + + context = self._get_context() + source_kind = type(self) + for uri in project.get_alias_uris(alias, first_pass=self.__first_pass): + new_source = self.__clone_for_uri(uri) + try: + new_source.fetch(**kwargs) + # FIXME: Need to consider temporary vs. permanent failures, + # and how this works with retries. + except BstError as e: + last_error = e + continue + return + raise last_error + # Tries to call track for every mirror, stopping once it succeeds - def __do_track(self): + def __do_track(self, **kwargs): project = self._get_project() alias = self._get_alias() if self.__first_pass: @@ -938,14 +1014,14 @@ class Source(Plugin): mirrors = project.config.mirrors # If there are no mirrors, or no aliases to replace, there's nothing to do here. if not mirrors or not alias: - return self.track() + return self.track(**kwargs) # NOTE: We are assuming here that tracking only requires substituting the # first alias used for uri in reversed(project.get_alias_uris(alias, first_pass=self.__first_pass)): new_source = self.__clone_for_uri(uri) try: - ref = new_source.track() + ref = new_source.track(**kwargs) # FIXME: Need to consider temporary vs. permanent failures, # and how this works with retries. except BstError as e: @@ -990,6 +1066,17 @@ class Source(Plugin): return config + # Ensures that previous sources have been tracked and fetched. 
+ # + def __ensure_previous_sources(self, previous_sources): + for index, src in enumerate(previous_sources): + # BuildStream should track sources in the order they appear so + # previous sources should never be in an inconsistent state + assert src.get_consistency() != Consistency.INCONSISTENT + + if src.get_consistency() == Consistency.RESOLVED: + src._fetch(previous_sources[0:index]) + def _extract_alias(url): parts = url.split(utils._ALIAS_SEPARATOR, 1) |