diff options
author | Jürg Billeter <j@bitron.ch> | 2020-09-14 12:05:12 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2020-09-14 17:39:17 +0200 |
commit | fdca4c15e314dcf75d05e652798b43948598a012 (patch) | |
tree | 0e50bf62fc62b5584defe7420493c006132e5b7e | |
parent | 9c1dcc068b49a2cabfa925c5a5f75b390883da15 (diff) | |
download | buildstream-juerg/cache-query-wip.tar.gz |
-rw-r--r-- | src/buildstream/_artifact.py | 4 | ||||
-rw-r--r-- | src/buildstream/_artifactelement.py | 4 | ||||
-rw-r--r-- | src/buildstream/_frontend/widget.py | 2 | ||||
-rw-r--r-- | src/buildstream/_pipeline.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 33 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 156 | ||||
-rw-r--r-- | src/buildstream/element.py | 125 | ||||
-rw-r--r-- | src/buildstream/utils.py | 2 | ||||
-rw-r--r-- | tests/frontend/fetch.py | 2 | ||||
-rw-r--r-- | tests/frontend/push.py | 8 | ||||
-rw-r--r-- | tests/integration/interactive_build.py | 3 | ||||
-rw-r--r-- | tox.ini | 4 |
12 files changed, 215 insertions, 135 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py index 048f09cc6..46a74d845 100644 --- a/src/buildstream/_artifact.py +++ b/src/buildstream/_artifact.py @@ -396,11 +396,13 @@ class Artifact: # Returns: # (bool): Whether artifact is in local cache # - def cached(self): + def cached(self, *, allow_main_process=False): if self._cached is not None: return self._cached + assert allow_main_process or not utils._is_main_process() + context = self._context artifact = self._load_proto() diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py index b669d95f7..7c2168aeb 100644 --- a/src/buildstream/_artifactelement.py +++ b/src/buildstream/_artifactelement.py @@ -78,6 +78,10 @@ class ArtifactElement(Element): artifact_element._initialize_state() cls.__instantiated_artifacts[ref] = artifact_element + # Side effect of calling Artifact.cached() is that the artifact + # proto is loaded, which is required for `get_dependency_artifact_names()`. + artifact_element._cached(allow_main_process=True) + for dep_ref in artifact_element.get_dependency_artifact_names(): dependency = ArtifactElement._new_from_artifact_name(dep_ref, context, task) artifact_element._add_build_dependency(dependency) diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index 65f74d475..dec6d7545 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -352,6 +352,8 @@ class LogLine(Widget): else: if element.get_kind() == "junction": line = p.fmt_subst(line, "state", "junction", fg="magenta") + elif not element._cache_state_available(): + line = p.fmt_subst(line, "state", "waiting", fg="blue") elif element._cached_failure(): line = p.fmt_subst(line, "state", "failed", fg="red") elif element._cached_success(): diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index 77b3c24e0..cf31608fa 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -119,7 +119,7 @@ class Pipeline: # We may already have Elements which are cached and have their runtimes # cached, if this is the case, we should immediately notify their reverse # dependencies. - element._update_ready_for_runtime_and_cached() + # element._update_ready_for_runtime_and_cached() if task: task.add_current_progress() @@ -477,7 +477,7 @@ class _Planner: self.plan_element(dep, depth) # Dont try to plan builds of elements that are cached already - if not element._cached_success(): + if True or not element._cached_success(): for dep in element._dependencies(_Scope.BUILD, recurse=False): self.plan_element(dep, depth + 1) @@ -494,4 +494,5 @@ class _Planner: for index, item in enumerate(depth_sorted): item[0]._set_depth(index) - return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()] + # return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()] + return [item[0] for item in depth_sorted] diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index e1d69590f..e0ea9ba70 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -33,24 +33,26 @@ class PullQueue(Queue): complete_name = "Artifacts Pulled" resources = [ResourceType.DOWNLOAD, ResourceType.CACHE] + def __init__(self, scheduler, *, check_remotes=True): + super().__init__(scheduler) + + self._check_remotes = check_remotes + def get_process_func(self): - return PullQueue._pull_or_skip + if self._check_remotes: + return PullQueue._pull_or_skip + else: + return PullQueue._check def status(self, element): if not element._can_query_cache(): return QueueStatus.PENDING - if element._pull_pending(): - return QueueStatus.READY - else: - return QueueStatus.SKIP + return QueueStatus.READY def done(self, _, element, result, status): - if status is JobStatus.FAIL: - return - - element._pull_done() + element._pull_done(status is JobStatus.OK, result) def register_pending_element(self, element): # Set a "can_query_cache"_callback for an element which is not @@ -60,5 +62,16 @@ class PullQueue(Queue): @staticmethod def _pull_or_skip(element): - if not element._pull(): + result = element._pull() + element.info("pull: {} {}".format(result, not result)) + if not result: + raise SkipJob(PullQueue.action_name) + return result + + @staticmethod + def _check(element): + result = element._pull(check_remotes=False) + element.info("pull: {} {}".format(result, not result)) + if not result: raise SkipJob(PullQueue.action_name) + return result diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index fbb2fca69..09101e8b6 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -160,6 +160,13 @@ class Stream: load_refs=load_refs, ) + with self._context.messenger.timed_activity("load_selection"): + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler, check_remotes=False)) + self._enqueue_plan(target_objects) + self._run() + self._scheduler.clear_queues() + return target_objects # shell() @@ -198,44 +205,64 @@ class Stream: if unique_id and element is None: element = Plugin._lookup(unique_id) - missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] - if missing_deps: - if not pull_dependencies: - raise StreamError( - "Elements need to be built or downloaded before staging a shell environment", - detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))), - ) - self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts") - self._scheduler.clear_queues() - self._add_queue(PullQueue(self._scheduler)) - plan = self._pipeline.add_elements([element], missing_deps) - self._enqueue_plan(plan) - self._run() + deps = self._pipeline.dependencies([element], scope) + + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler)) + plan = self._pipeline.add_elements([element], deps) + self._enqueue_plan(plan) + self._run() + + if False: + missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] + if missing_deps: + if not pull_dependencies: + raise StreamError( + "Elements need to be built or downloaded before staging a shell environment", + detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))), + ) + self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts") + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler)) + plan = self._pipeline.add_elements([element], missing_deps) + self._enqueue_plan(plan) + self._run() + + buildtree = False + # Check if we require a pull queue attempt, with given artifact state and context + if usebuildtree: + if not element._cached_buildtree(): + require_buildtree = self._buildtree_pull_required([element]) + # Attempt a pull queue for the given element if remote and context allow it + if require_buildtree: + self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree") + self._scheduler.clear_queues() + self._add_queue(PullQueue(self._scheduler)) + self._enqueue_plan(require_buildtree) + self._run() + # Now check if the buildtree was successfully fetched + if element._cached_buildtree(): + buildtree = True + + if not buildtree: + message = "Buildtree is not cached locally or in available remotes" + if usebuildtree == "always": + raise StreamError(message) + + self._message(MessageType.INFO, message + ", shell will be loaded without it") + else: + buildtree = True buildtree = False # Check if we require a pull queue attempt, with given artifact state and context if usebuildtree: - if not element._cached_buildtree(): - require_buildtree = self._buildtree_pull_required([element]) - # Attempt a pull queue for the given element if remote and context allow it - if require_buildtree: - self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree") - self._scheduler.clear_queues() - self._add_queue(PullQueue(self._scheduler)) - self._enqueue_plan(require_buildtree) - self._run() - # Now check if the buildtree was successfully fetched - if element._cached_buildtree(): - buildtree = True - - if not buildtree: - message = "Buildtree is not cached locally or in available remotes" - if usebuildtree == "always": - raise StreamError(message) - - self._message(MessageType.INFO, message + ", shell will be loaded without it") - else: - buildtree = True + buildtree = element._cached_buildtree() + if not buildtree: + message = "Buildtree is not cached locally or in available remotes" + if usebuildtree == "always": + raise StreamError(message) + + self._message(MessageType.INFO, message + ", shell will be loaded without it") # Ensure we have our sources if we are launching a build shell if scope == _Scope.BUILD and not buildtree: @@ -293,8 +320,7 @@ class Stream: # self._scheduler.clear_queues() - if self._artifacts.has_fetch_remotes(): - self._add_queue(PullQueue(self._scheduler)) + self._add_queue(PullQueue(self._scheduler)) self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) @@ -484,33 +510,38 @@ class Stream: self._pipeline.assert_consistent(elements) - # Check if we require a pull queue, with given artifact state and context - require_buildtrees = self._buildtree_pull_required(elements) - if require_buildtrees: - self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees") - self._add_queue(PullQueue(self._scheduler)) - self._enqueue_plan(require_buildtrees) - - # Before we try to push the artifacts, ensure they're cached - cached_elements = [] - uncached_elements = [] - self._message(MessageType.INFO, "Verifying that elements are cached") - for element in elements: - if element._cached(): - cached_elements.append(element) - else: - msg = "{} is not cached".format(element.name) - if self._context.sched_error_action != _SchedulerErrorAction.CONTINUE: - raise StreamError("Push failed: " + msg) + if False: + # Check if we require a pull queue, with given artifact state and context + require_buildtrees = self._buildtree_pull_required(elements) + if require_buildtrees: + self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees") + self._add_queue(PullQueue(self._scheduler)) + self._enqueue_plan(require_buildtrees) + + # Before we try to push the artifacts, ensure they're cached + cached_elements = [] + uncached_elements = [] + self._message(MessageType.INFO, "Verifying that elements are cached") + for element in elements: + if element._cached(): + cached_elements.append(element) + else: + msg = "{} is not cached".format(element.name) + if self._context.sched_error_action != _SchedulerErrorAction.CONTINUE: + raise StreamError("Push failed: " + msg) - self._message(MessageType.WARN, msg) - uncached_elements.append(element) + self._message(MessageType.WARN, msg) + uncached_elements.append(element) - if cached_elements: + if True: # cached_elements: self._scheduler.clear_queues() + # TODO previous behavior was to only pull missing buildtrees but not + # the main part of the artifact. not sure whether that really makes + # sense + self._add_queue(PullQueue(self._scheduler)) push_queue = ArtifactPushQueue(self._scheduler) self._add_queue(push_queue) - self._enqueue_plan(cached_elements, queue=push_queue) + self._enqueue_plan(elements) self._run(announce_session=True) # If the user has selected to continue on error, fail the command @@ -572,12 +603,13 @@ class Stream: self._check_location_writable(location, force=force, tar=tar) - uncached_elts = [elt for elt in elements if not elt._cached()] - if uncached_elts and pull: + # uncached_elts = [elt for elt in elements if not elt._cached()] + if True: # uncached_elts and pull: self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact") self._scheduler.clear_queues() - self._add_queue(PullQueue(self._scheduler)) - self._enqueue_plan(uncached_elts) + self._add_queue(PullQueue(self._scheduler, check_remotes=pull)) + # self._enqueue_plan(uncached_elts) + self._enqueue_plan(elements) self._run(announce_session=True) try: diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 3316d8adb..d225fb9f5 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -1060,11 +1060,17 @@ class Element(Plugin): # (bool): Whether this element is already present in # the artifact cache # - def _cached(self): + def _cached(self, *, allow_main_process=False): if not self.__artifact: return False - return self.__artifact.cached() + return self.__artifact.cached(allow_main_process=allow_main_process) + + def _cache_state_available(self): + if not self.__artifact: + return False + + return self.__artifact._cached is not None # _cached_remotely(): # @@ -1540,13 +1546,12 @@ class Element(Plugin): self.__strict_artifact.reset_cached() + self.__artifact.set_cached() + if successful: # Directly set known cached status as optimization to avoid # querying buildbox-casd and the filesystem. - self.__artifact.set_cached() self.__cached_successfully = True - else: - self.__artifact.reset_cached() # When we're building in non-strict mode, we may have # assembled everything to this point without a strong cache @@ -1745,26 +1750,7 @@ class Element(Plugin): # (bool): Whether a pull operation is pending # def _pull_pending(self): - if self._get_workspace(): - # Workspace builds are never pushed to artifact servers - return False - - # Check whether the pull has been invoked with a specific subdir requested - # in user context, as to complete a partial artifact - pull_buildtrees = self._get_context().pull_buildtrees - - if self.__strict_artifact: - if self.__strict_artifact.cached() and pull_buildtrees: - # If we've specified a subdir, check if the subdir is cached locally - # or if it's possible to get - if self._cached_buildtree() or not self._buildtree_exists(): - return False - elif self.__strict_artifact.cached(): - return False - - # Pull is pending if artifact remote server available - # and pull has not been attempted yet - return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done + return not self.__pull_done # _pull_done() # @@ -1776,22 +1762,40 @@ class Element(Plugin): # # This will result in updating the element state. # - def _pull_done(self): + # Args: + # successful (bool): Whether the pull / cache check was successful + # + def _pull_done(self, successful, strength): self.__pull_done = True - # Artifact may become cached after pulling, so let it query the - # filesystem again to check - self.__strict_artifact.reset_cached() - self.__artifact.reset_cached() + # want to mark it as cached / not cached in the main process + # without querying the cache again + # in non-strict mode this means I not only need to know whether + # pull/check was successful but also whether we have the strict artifact + # in the cache or not + + if successful: + # Artifact may become cached after pulling, so let it query the + # filesystem again to check + if strength == _KeyStrength.STRONG: + self.__artifact = self.__strict_artifact + else: + self.__strict_artifact._cached = False + + self.__artifact.set_cached() + + # If we've finished pulling, an artifact might now exist + # locally, so we might need to update a non-strict strong + # cache key. + self.__update_cache_key_non_strict() + self._update_ready_for_runtime_and_cached() + else: + self.__strict_artifact._cached = False + self.__artifact._cached = False - # We may not have actually pulled an artifact - the pull may - # have failed. We might therefore need to schedule assembly. + # The pull may have failed or we may have pulled a cached failure. + # We might therefore need to schedule assembly. self.__schedule_assembly_when_necessary() - # If we've finished pulling, an artifact might now exist - # locally, so we might need to update a non-strict strong - # cache key. - self.__update_cache_key_non_strict() - self._update_ready_for_runtime_and_cached() # _pull(): # @@ -1799,24 +1803,39 @@ class Element(Plugin): # # Returns: True if the artifact has been downloaded, False otherwise # - def _pull(self): + def _pull(self, *, check_remotes=True): context = self._get_context() - # Get optional specific subdir to pull and optional list to not pull - # based off of user context - pull_buildtrees = context.pull_buildtrees + # Check whether the pull has been invoked with a specific subdir requested + # in user context, as to complete a partial artifact + pull_buildtrees = context.pull_buildtrees and not self._get_workspace() + + # First check whether we already have the strict artifact in the local cache + if self.__strict_artifact.cached(): + if pull_buildtrees: + # If we've specified a subdir, check if the subdir is cached locally + # or if it's possible to get + if self._cached_buildtree() or not self._buildtree_exists(): + return _KeyStrength.STRONG + else: + return _KeyStrength.STRONG - # Attempt to pull artifact without knowing whether it's available - pulled = self.__pull_strong(pull_buildtrees=pull_buildtrees) + # Check remotes if requested and artifact remote server available. + # Workspace builds are never pushed to artifact servers + check_remotes = check_remotes and self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace() - if not pulled and not self._cached() and not context.get_strict(): - pulled = self.__pull_weak(pull_buildtrees=pull_buildtrees) + # Attempt to pull artifact with the strict cache key + if check_remotes and self.__pull_strong(pull_buildtrees=pull_buildtrees): + return _KeyStrength.STRONG - if not pulled: - return False + if not context.get_strict(): + if self._cached(): + return _KeyStrength.WEAK - # Notify successfull download - return True + if check_remotes and self.__pull_weak(pull_buildtrees=pull_buildtrees): + return _KeyStrength.WEAK + + return None def _skip_source_push(self): if not self.sources() or self._get_workspace(): @@ -1834,6 +1853,9 @@ class Element(Plugin): # (bool): True if this element does not need a push job to be created # def _skip_push(self): + if not self._cached(): + return True + if not self.__artifacts.has_push_remotes(plugin=self): # No push remotes for this element's project return True @@ -2244,7 +2266,7 @@ class Element(Plugin): # def _update_ready_for_runtime_and_cached(self): if not self.__ready_for_runtime_and_cached: - if self.__runtime_deps_uncached == 0 and self._cached_success() and self.__cache_key: + if self.__runtime_deps_uncached == 0 and self.__pull_done and self._cached_success() and self.__cache_key: self.__ready_for_runtime_and_cached = True # Notify reverse dependencies @@ -2956,7 +2978,8 @@ class Element(Plugin): return False # extract strong cache key from this newly fetched artifact - self._pull_done() + # FIXME + self._pull_done(True, _KeyStrength.WEAK) # create tag for strong cache key key = self._get_cache_key(strength=_KeyStrength.STRONG) diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index e9f06aad6..eb4881837 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -34,6 +34,7 @@ from stat import S_ISDIR import subprocess from subprocess import TimeoutExpired import tempfile +import threading import time import datetime import itertools @@ -874,6 +875,7 @@ def _pretty_size(size, dec_places=0): # Return whether we are in the main process or not. # def _is_main_process(): + return threading.current_thread() == threading.main_thread() assert _MAIN_PID is not None return os.getpid() == _MAIN_PID diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py index b2c9d64c2..3dfeabe5d 100644 --- a/tests/frontend/fetch.py +++ b/tests/frontend/fetch.py @@ -65,7 +65,7 @@ def test_fetch_consistency_error(cli, datafiles): # When the error occurs outside of the scheduler at load time, # then the SourceError is reported directly as the main error. result = cli.run(project=project, args=["source", "fetch", "error.bst"]) - result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error") + result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error", debug=True) @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror")) diff --git a/tests/frontend/push.py b/tests/frontend/push.py index ffcc166f9..0054636ec 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -380,8 +380,8 @@ def test_push_after_pull(cli, tmpdir, datafiles): # result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() - assert result.get_pulled_elements() == ["target.bst"] - assert result.get_pushed_elements() == [] + assert "target.bst" in result.get_pulled_elements() + assert "target.bst" not in result.get_pushed_elements() # Delete the artifact locally again. cli.remove_artifact_from_cache(project, "target.bst") @@ -391,8 +391,8 @@ def test_push_after_pull(cli, tmpdir, datafiles): cli.configure({"artifacts": [{"url": share1.repo, "push": True}, {"url": share2.repo, "push": True},]}) result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() - assert result.get_pulled_elements() == ["target.bst"] - assert result.get_pushed_elements() == ["target.bst"] + assert "target.bst" in result.get_pulled_elements() + assert "target.bst" in result.get_pushed_elements() # Ensure that when an artifact's size exceeds available disk space diff --git a/tests/integration/interactive_build.py b/tests/integration/interactive_build.py index 285cb86f8..b19d16e0f 100644 --- a/tests/integration/interactive_build.py +++ b/tests/integration/interactive_build.py @@ -4,6 +4,7 @@ import os import pexpect import pytest +import sys from buildstream.testing import runcli from buildstream.testing._utils.site import HAVE_SANDBOX @@ -29,7 +30,9 @@ def build_session(datafiles, element_name): "bst", ["--directory", project, "--config", config_file, "--no-colors", "build", element_name,], timeout=PEXPECT_TIMEOUT_SHORT, + encoding='utf-8' ) + session.logfile = sys.stderr yield session @@ -16,9 +16,7 @@ BST_PLUGINS_EXPERIMENTAL_VERSION = 1.93.4 # Anything specified here is inherited by the sections # [testenv] -usedevelop = - # This is required by Cython in order to get coverage for cython files. - py{36,37,38}-!nocover: True +usedevelop = True commands = # Running with coverage reporting enabled |