author    Jürg Billeter <j@bitron.ch>  2020-09-14 12:05:12 +0200
committer Jürg Billeter <j@bitron.ch>  2020-09-14 17:39:17 +0200
commit    fdca4c15e314dcf75d05e652798b43948598a012 (patch)
tree      0e50bf62fc62b5584defe7420493c006132e5b7e
parent    9c1dcc068b49a2cabfa925c5a5f75b390883da15 (diff)
download  buildstream-juerg/cache-query-wip.tar.gz
-rw-r--r--  src/buildstream/_artifact.py                      4
-rw-r--r--  src/buildstream/_artifactelement.py               4
-rw-r--r--  src/buildstream/_frontend/widget.py               2
-rw-r--r--  src/buildstream/_pipeline.py                      7
-rw-r--r--  src/buildstream/_scheduler/queues/pullqueue.py   33
-rw-r--r--  src/buildstream/_stream.py                      156
-rw-r--r--  src/buildstream/element.py                      125
-rw-r--r--  src/buildstream/utils.py                          2
-rw-r--r--  tests/frontend/fetch.py                           2
-rw-r--r--  tests/frontend/push.py                            8
-rw-r--r--  tests/integration/interactive_build.py            3
-rw-r--r--  tox.ini                                           4
12 files changed, 215 insertions, 135 deletions
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index 048f09cc6..46a74d845 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -396,11 +396,13 @@ class Artifact:
# Returns:
# (bool): Whether artifact is in local cache
#
- def cached(self):
+ def cached(self, *, allow_main_process=False):
if self._cached is not None:
return self._cached
+ assert allow_main_process or not utils._is_main_process()
+
context = self._context
artifact = self._load_proto()
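
Together with the utils.py change further down, the new assertion means a cold cache query may only happen off the main thread unless the caller explicitly opts in. A minimal standalone sketch of the pattern, using illustrative names rather than BuildStream's real API:

import threading

def _is_main_thread():
    # Mirrors the utils.py change in this patch: identify the main
    # thread rather than comparing process IDs.
    return threading.current_thread() is threading.main_thread()

class CachedState:
    def __init__(self):
        self._cached = None  # None: unknown; True/False: memoized answer

    def cached(self, *, allow_main_process=False):
        if self._cached is not None:
            return self._cached  # cheap path: already known
        # A cold query would hit buildbox-casd / the filesystem, so it
        # must not run on the main thread unless explicitly allowed.
        assert allow_main_process or not _is_main_thread()
        self._cached = self._query_cache()
        return self._cached

    def _query_cache(self):
        return False  # stand-in for the real cache lookup

state = CachedState()
print(state.cached(allow_main_process=True))  # explicit opt-in on the main thread
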
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py
index b669d95f7..7c2168aeb 100644
--- a/src/buildstream/_artifactelement.py
+++ b/src/buildstream/_artifactelement.py
@@ -78,6 +78,10 @@ class ArtifactElement(Element):
artifact_element._initialize_state()
cls.__instantiated_artifacts[ref] = artifact_element
+ # Side effect of calling Artifact.cached() is that the artifact
+ # proto is loaded, which is required for `get_dependency_artifact_names()`.
+ artifact_element._cached(allow_main_process=True)
+
for dep_ref in artifact_element.get_dependency_artifact_names():
dependency = ArtifactElement._new_from_artifact_name(dep_ref, context, task)
artifact_element._add_build_dependency(dependency)
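
A sketch of why the explicit _cached(allow_main_process=True) call is needed at this point: enumerating an artifact's dependencies requires the artifact proto, and the proto is only loaded as a side effect of the cache query. The names below are illustrative, not the real classes:

class FakeArtifact:
    def __init__(self, dep_names):
        self._proto = None
        self._dep_names = dep_names

    def cached(self):
        if self._proto is None:
            # Side effect: the proto is loaded during the cache query
            self._proto = {"build_deps": self._dep_names}
        return True

    def get_dependency_artifact_names(self):
        assert self._proto is not None, "proto must be loaded first"
        return self._proto["build_deps"]

art = FakeArtifact(["base/alpine/abc123"])
art.cached()  # must run before the dependency walk
print(art.get_dependency_artifact_names())
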
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 65f74d475..dec6d7545 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -352,6 +352,8 @@ class LogLine(Widget):
else:
if element.get_kind() == "junction":
line = p.fmt_subst(line, "state", "junction", fg="magenta")
+ elif not element._cache_state_available():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._cached_failure():
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
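
Because cache queries are now asynchronous, an element's cache state can be unknown at render time; the new branch shows such elements as "waiting". Roughly the decision order, as a self-contained sketch with stub fields standing in for the element API:

from dataclasses import dataclass

@dataclass
class _ElementStub:
    kind: str = "autotools"
    cache_state_available: bool = False  # has the cache query finished?
    cached_failure: bool = False
    cached_success: bool = False

def state_label(e):
    # Order matters: junctions first, then the new "waiting" state while
    # the cache query is still pending, then the known states.
    if e.kind == "junction":
        return ("junction", "magenta")
    if not e.cache_state_available:
        return ("waiting", "blue")
    if e.cached_failure:
        return ("failed", "red")
    if e.cached_success:
        return ("cached", "green")
    return ("buildable", "yellow")

print(state_label(_ElementStub()))  # -> ('waiting', 'blue')
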
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index 77b3c24e0..cf31608fa 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -119,7 +119,7 @@ class Pipeline:
# We may already have Elements which are cached and have their runtimes
# cached, if this is the case, we should immediately notify their reverse
# dependencies.
- element._update_ready_for_runtime_and_cached()
+ # element._update_ready_for_runtime_and_cached()
if task:
task.add_current_progress()
@@ -477,7 +477,7 @@ class _Planner:
self.plan_element(dep, depth)
# Dont try to plan builds of elements that are cached already
- if not element._cached_success():
+ if True or not element._cached_success():
for dep in element._dependencies(_Scope.BUILD, recurse=False):
self.plan_element(dep, depth + 1)
@@ -494,4 +494,5 @@ class _Planner:
for index, item in enumerate(depth_sorted):
item[0]._set_depth(index)
- return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+ # return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+ return [item[0] for item in depth_sorted]
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d69590f..e0ea9ba70 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,24 +33,26 @@ class PullQueue(Queue):
complete_name = "Artifacts Pulled"
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
+ def __init__(self, scheduler, *, check_remotes=True):
+ super().__init__(scheduler)
+
+ self._check_remotes = check_remotes
+
def get_process_func(self):
- return PullQueue._pull_or_skip
+ if self._check_remotes:
+ return PullQueue._pull_or_skip
+ else:
+ return PullQueue._check
def status(self, element):
if not element._can_query_cache():
return QueueStatus.PENDING
- if element._pull_pending():
- return QueueStatus.READY
- else:
- return QueueStatus.SKIP
+ return QueueStatus.READY
def done(self, _, element, result, status):
- if status is JobStatus.FAIL:
- return
-
- element._pull_done()
+ element._pull_done(status is JobStatus.OK, result)
def register_pending_element(self, element):
# Set a "can_query_cache"_callback for an element which is not
@@ -60,5 +62,16 @@ class PullQueue(Queue):
@staticmethod
def _pull_or_skip(element):
- if not element._pull():
+ result = element._pull()
+ element.info("pull: {} {}".format(result, not result))
+ if not result:
+ raise SkipJob(PullQueue.action_name)
+ return result
+
+ @staticmethod
+ def _check(element):
+ result = element._pull(check_remotes=False)
+ element.info("pull: {} {}".format(result, not result))
+ if not result:
raise SkipJob(PullQueue.action_name)
+ return result
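
With this change the queue always schedules every element and decides at construction time whether jobs contact remotes (_pull_or_skip) or only probe the local cache (_check). A condensed sketch of that dispatch, with the debug element.info() calls omitted:

class SkipJob(Exception):
    """Raised by a job to mark itself as skipped (as in the scheduler)."""

class PullQueueSketch:
    action_name = "Pull"

    def __init__(self, *, check_remotes=True):
        self._check_remotes = check_remotes

    def get_process_func(self):
        # Remote-aware pull vs. purely local cache probe
        return self._pull_or_skip if self._check_remotes else self._check

    def _pull_or_skip(self, element):
        result = element._pull()  # may download from remotes
        if not result:
            raise SkipJob(self.action_name)
        return result

    def _check(self, element):
        result = element._pull(check_remotes=False)  # local cache only
        if not result:
            raise SkipJob(self.action_name)
        return result
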
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index fbb2fca69..09101e8b6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -160,6 +160,13 @@ class Stream:
load_refs=load_refs,
)
+ with self._context.messenger.timed_activity("load_selection"):
+ self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler, check_remotes=False))
+ self._enqueue_plan(target_objects)
+ self._run()
+ self._scheduler.clear_queues()
+
return target_objects
# shell()
@@ -198,44 +205,64 @@ class Stream:
if unique_id and element is None:
element = Plugin._lookup(unique_id)
- missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
- if missing_deps:
- if not pull_dependencies:
- raise StreamError(
- "Elements need to be built or downloaded before staging a shell environment",
- detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))),
- )
- self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts")
- self._scheduler.clear_queues()
- self._add_queue(PullQueue(self._scheduler))
- plan = self._pipeline.add_elements([element], missing_deps)
- self._enqueue_plan(plan)
- self._run()
+ deps = self._pipeline.dependencies([element], scope)
+
+ self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler))
+ plan = self._pipeline.add_elements([element], deps)
+ self._enqueue_plan(plan)
+ self._run()
+
+ if False:
+ missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
+ if missing_deps:
+ if not pull_dependencies:
+ raise StreamError(
+ "Elements need to be built or downloaded before staging a shell environment",
+ detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))),
+ )
+ self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts")
+ self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler))
+ plan = self._pipeline.add_elements([element], missing_deps)
+ self._enqueue_plan(plan)
+ self._run()
+
+ buildtree = False
+ # Check if we require a pull queue attempt, with given artifact state and context
+ if usebuildtree:
+ if not element._cached_buildtree():
+ require_buildtree = self._buildtree_pull_required([element])
+ # Attempt a pull queue for the given element if remote and context allow it
+ if require_buildtree:
+ self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree")
+ self._scheduler.clear_queues()
+ self._add_queue(PullQueue(self._scheduler))
+ self._enqueue_plan(require_buildtree)
+ self._run()
+ # Now check if the buildtree was successfully fetched
+ if element._cached_buildtree():
+ buildtree = True
+
+ if not buildtree:
+ message = "Buildtree is not cached locally or in available remotes"
+ if usebuildtree == "always":
+ raise StreamError(message)
+
+ self._message(MessageType.INFO, message + ", shell will be loaded without it")
+ else:
+ buildtree = True
buildtree = False
# Check if we require a pull queue attempt, with given artifact state and context
if usebuildtree:
- if not element._cached_buildtree():
- require_buildtree = self._buildtree_pull_required([element])
- # Attempt a pull queue for the given element if remote and context allow it
- if require_buildtree:
- self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtree")
- self._scheduler.clear_queues()
- self._add_queue(PullQueue(self._scheduler))
- self._enqueue_plan(require_buildtree)
- self._run()
- # Now check if the buildtree was successfully fetched
- if element._cached_buildtree():
- buildtree = True
-
- if not buildtree:
- message = "Buildtree is not cached locally or in available remotes"
- if usebuildtree == "always":
- raise StreamError(message)
-
- self._message(MessageType.INFO, message + ", shell will be loaded without it")
- else:
- buildtree = True
+ buildtree = element._cached_buildtree()
+ if not buildtree:
+ message = "Buildtree is not cached locally or in available remotes"
+ if usebuildtree == "always":
+ raise StreamError(message)
+
+ self._message(MessageType.INFO, message + ", shell will be loaded without it")
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not buildtree:
@@ -293,8 +320,7 @@ class Stream:
#
self._scheduler.clear_queues()
- if self._artifacts.has_fetch_remotes():
- self._add_queue(PullQueue(self._scheduler))
+ self._add_queue(PullQueue(self._scheduler))
self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
@@ -484,33 +510,38 @@ class Stream:
self._pipeline.assert_consistent(elements)
- # Check if we require a pull queue, with given artifact state and context
- require_buildtrees = self._buildtree_pull_required(elements)
- if require_buildtrees:
- self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees")
- self._add_queue(PullQueue(self._scheduler))
- self._enqueue_plan(require_buildtrees)
-
- # Before we try to push the artifacts, ensure they're cached
- cached_elements = []
- uncached_elements = []
- self._message(MessageType.INFO, "Verifying that elements are cached")
- for element in elements:
- if element._cached():
- cached_elements.append(element)
- else:
- msg = "{} is not cached".format(element.name)
- if self._context.sched_error_action != _SchedulerErrorAction.CONTINUE:
- raise StreamError("Push failed: " + msg)
+ if False:
+ # Check if we require a pull queue, with given artifact state and context
+ require_buildtrees = self._buildtree_pull_required(elements)
+ if require_buildtrees:
+ self._message(MessageType.INFO, "Attempting to fetch missing artifact buildtrees")
+ self._add_queue(PullQueue(self._scheduler))
+ self._enqueue_plan(require_buildtrees)
+
+ # Before we try to push the artifacts, ensure they're cached
+ cached_elements = []
+ uncached_elements = []
+ self._message(MessageType.INFO, "Verifying that elements are cached")
+ for element in elements:
+ if element._cached():
+ cached_elements.append(element)
+ else:
+ msg = "{} is not cached".format(element.name)
+ if self._context.sched_error_action != _SchedulerErrorAction.CONTINUE:
+ raise StreamError("Push failed: " + msg)
- self._message(MessageType.WARN, msg)
- uncached_elements.append(element)
+ self._message(MessageType.WARN, msg)
+ uncached_elements.append(element)
- if cached_elements:
+ if True: # cached_elements:
self._scheduler.clear_queues()
+            # TODO: previous behavior was to pull only missing buildtrees, not
+            # the main part of the artifact; not sure whether that really
+            # makes sense.
+ self._add_queue(PullQueue(self._scheduler))
push_queue = ArtifactPushQueue(self._scheduler)
self._add_queue(push_queue)
- self._enqueue_plan(cached_elements, queue=push_queue)
+ self._enqueue_plan(elements)
self._run(announce_session=True)
# If the user has selected to continue on error, fail the command
@@ -572,12 +603,13 @@ class Stream:
self._check_location_writable(location, force=force, tar=tar)
- uncached_elts = [elt for elt in elements if not elt._cached()]
- if uncached_elts and pull:
+ # uncached_elts = [elt for elt in elements if not elt._cached()]
+ if True: # uncached_elts and pull:
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifact")
self._scheduler.clear_queues()
- self._add_queue(PullQueue(self._scheduler))
- self._enqueue_plan(uncached_elts)
+ self._add_queue(PullQueue(self._scheduler, check_remotes=pull))
+ # self._enqueue_plan(uncached_elts)
+ self._enqueue_plan(elements)
self._run(announce_session=True)
try:
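
The recurring new pattern in _stream.py is a scheduler pass whose only purpose is to resolve cache state: a PullQueue with check_remotes=False is run over the selection before any code consults cached() or cached_buildtree(). Schematically, assuming the Stream internals used in the hunks above (a sketch of the pattern, not a drop-in helper):

def _resolve_cache_state(stream, elements):
    # Local-only "pull" pass: each job queries the cache on a worker
    # thread so the main process can later read the memoized state.
    stream._scheduler.clear_queues()
    stream._add_queue(PullQueue(stream._scheduler, check_remotes=False))
    stream._enqueue_plan(elements)
    stream._run()
    stream._scheduler.clear_queues()
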
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 3316d8adb..d225fb9f5 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1060,11 +1060,17 @@ class Element(Plugin):
# (bool): Whether this element is already present in
# the artifact cache
#
- def _cached(self):
+ def _cached(self, *, allow_main_process=False):
if not self.__artifact:
return False
- return self.__artifact.cached()
+ return self.__artifact.cached(allow_main_process=allow_main_process)
+
+ def _cache_state_available(self):
+ if not self.__artifact:
+ return False
+
+ return self.__artifact._cached is not None
# _cached_remotely():
#
@@ -1540,13 +1546,12 @@ class Element(Plugin):
self.__strict_artifact.reset_cached()
+ self.__artifact.set_cached()
+
if successful:
# Directly set known cached status as optimization to avoid
# querying buildbox-casd and the filesystem.
- self.__artifact.set_cached()
self.__cached_successfully = True
- else:
- self.__artifact.reset_cached()
# When we're building in non-strict mode, we may have
# assembled everything to this point without a strong cache
@@ -1745,26 +1750,7 @@ class Element(Plugin):
# (bool): Whether a pull operation is pending
#
def _pull_pending(self):
- if self._get_workspace():
- # Workspace builds are never pushed to artifact servers
- return False
-
- # Check whether the pull has been invoked with a specific subdir requested
- # in user context, as to complete a partial artifact
- pull_buildtrees = self._get_context().pull_buildtrees
-
- if self.__strict_artifact:
- if self.__strict_artifact.cached() and pull_buildtrees:
- # If we've specified a subdir, check if the subdir is cached locally
- # or if it's possible to get
- if self._cached_buildtree() or not self._buildtree_exists():
- return False
- elif self.__strict_artifact.cached():
- return False
-
- # Pull is pending if artifact remote server available
- # and pull has not been attempted yet
- return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
+ return not self.__pull_done
# _pull_done()
#
@@ -1776,22 +1762,40 @@ class Element(Plugin):
#
# This will result in updating the element state.
#
- def _pull_done(self):
+ # Args:
+ # successful (bool): Whether the pull / cache check was successful
+ #
+ def _pull_done(self, successful, strength):
self.__pull_done = True
- # Artifact may become cached after pulling, so let it query the
- # filesystem again to check
- self.__strict_artifact.reset_cached()
- self.__artifact.reset_cached()
+        # We want to mark the element as cached / not cached in the main
+        # process without querying the cache again. In non-strict mode this
+        # means we need to know not only whether the pull/check was
+        # successful, but also whether the strict artifact is in the cache.
+
+ if successful:
+ # Artifact may become cached after pulling, so let it query the
+ # filesystem again to check
+ if strength == _KeyStrength.STRONG:
+ self.__artifact = self.__strict_artifact
+ else:
+ self.__strict_artifact._cached = False
+
+ self.__artifact.set_cached()
+
+ # If we've finished pulling, an artifact might now exist
+ # locally, so we might need to update a non-strict strong
+ # cache key.
+ self.__update_cache_key_non_strict()
+ self._update_ready_for_runtime_and_cached()
+ else:
+ self.__strict_artifact._cached = False
+ self.__artifact._cached = False
- # We may not have actually pulled an artifact - the pull may
- # have failed. We might therefore need to schedule assembly.
+ # The pull may have failed or we may have pulled a cached failure.
+ # We might therefore need to schedule assembly.
self.__schedule_assembly_when_necessary()
- # If we've finished pulling, an artifact might now exist
- # locally, so we might need to update a non-strict strong
- # cache key.
- self.__update_cache_key_non_strict()
- self._update_ready_for_runtime_and_cached()
# _pull():
#
@@ -1799,24 +1803,39 @@ class Element(Plugin):
#
# Returns: True if the artifact has been downloaded, False otherwise
#
- def _pull(self):
+ def _pull(self, *, check_remotes=True):
context = self._get_context()
- # Get optional specific subdir to pull and optional list to not pull
- # based off of user context
- pull_buildtrees = context.pull_buildtrees
+ # Check whether the pull has been invoked with a specific subdir requested
+ # in user context, as to complete a partial artifact
+ pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
+
+ # First check whether we already have the strict artifact in the local cache
+ if self.__strict_artifact.cached():
+ if pull_buildtrees:
+ # If we've specified a subdir, check if the subdir is cached locally
+ # or if it's possible to get
+ if self._cached_buildtree() or not self._buildtree_exists():
+ return _KeyStrength.STRONG
+ else:
+ return _KeyStrength.STRONG
- # Attempt to pull artifact without knowing whether it's available
- pulled = self.__pull_strong(pull_buildtrees=pull_buildtrees)
+ # Check remotes if requested and artifact remote server available.
+ # Workspace builds are never pushed to artifact servers
+ check_remotes = check_remotes and self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace()
- if not pulled and not self._cached() and not context.get_strict():
- pulled = self.__pull_weak(pull_buildtrees=pull_buildtrees)
+ # Attempt to pull artifact with the strict cache key
+ if check_remotes and self.__pull_strong(pull_buildtrees=pull_buildtrees):
+ return _KeyStrength.STRONG
- if not pulled:
- return False
+ if not context.get_strict():
+ if self._cached():
+ return _KeyStrength.WEAK
- # Notify successfull download
- return True
+ if check_remotes and self.__pull_weak(pull_buildtrees=pull_buildtrees):
+ return _KeyStrength.WEAK
+
+ return None
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
@@ -1834,6 +1853,9 @@ class Element(Plugin):
# (bool): True if this element does not need a push job to be created
#
def _skip_push(self):
+ if not self._cached():
+ return True
+
if not self.__artifacts.has_push_remotes(plugin=self):
# No push remotes for this element's project
return True
@@ -2244,7 +2266,7 @@ class Element(Plugin):
#
def _update_ready_for_runtime_and_cached(self):
if not self.__ready_for_runtime_and_cached:
- if self.__runtime_deps_uncached == 0 and self._cached_success() and self.__cache_key:
+ if self.__runtime_deps_uncached == 0 and self.__pull_done and self._cached_success() and self.__cache_key:
self.__ready_for_runtime_and_cached = True
# Notify reverse dependencies
@@ -2956,7 +2978,8 @@ class Element(Plugin):
return False
# extract strong cache key from this newly fetched artifact
- self._pull_done()
+ # FIXME
+ self._pull_done(True, _KeyStrength.WEAK)
# create tag for strong cache key
key = self._get_cache_key(strength=_KeyStrength.STRONG)
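
The core contract change in element.py: _pull() now reports the strength of the key under which the artifact ended up cached (STRONG, WEAK, or None on failure), and _pull_done() consumes that in the main process instead of re-querying the cache. A toy model of the flow, leaving out the buildtree handling:

from enum import Enum

class KeyStrength(Enum):
    STRONG = "strong"
    WEAK = "weak"

class ElementSketch:
    def __init__(self, *, strict_cached=False, weak_cached=False,
                 strict_mode=True, remotes=None):
        self.strict_cached = strict_cached  # strict artifact in local cache
        self.weak_cached = weak_cached      # weak artifact in local cache
        self.strict_mode = strict_mode
        self.remotes = remotes or {}        # e.g. {"strong": True}
        self.cached_strength = None

    def pull(self, *, check_remotes=True):
        # Return the key strength under which the artifact is (now)
        # cached, or None if nothing could be found.
        if self.strict_cached:
            return KeyStrength.STRONG
        if check_remotes and self.remotes.get("strong"):
            return KeyStrength.STRONG
        if not self.strict_mode:
            if self.weak_cached:
                return KeyStrength.WEAK
            if check_remotes and self.remotes.get("weak"):
                return KeyStrength.WEAK
        return None

    def pull_done(self, successful, strength):
        # Main-process side: record state without re-querying casd.
        self.cached_strength = strength if successful else None

e = ElementSketch(strict_mode=False, remotes={"weak": True})
strength = e.pull()
e.pull_done(strength is not None, strength)
print(e.cached_strength)  # KeyStrength.WEAK
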
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index e9f06aad6..eb4881837 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -34,6 +34,7 @@ from stat import S_ISDIR
import subprocess
from subprocess import TimeoutExpired
import tempfile
+import threading
import time
import datetime
import itertools
@@ -874,6 +875,7 @@ def _pretty_size(size, dec_places=0):
# Return whether we are in the main process or not.
#
def _is_main_process():
+ return threading.current_thread() == threading.main_thread()
assert _MAIN_PID is not None
return os.getpid() == _MAIN_PID
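
Note the new return statement makes the PID-based body below it unreachable; "main" now means the interpreter's main thread rather than the original parent process, consistent with jobs running as threads in this branch. The two checks side by side:

import os
import threading

MAIN_PID = os.getpid()  # recorded once at import time in the parent

def is_main_by_pid():
    # Old semantics: are we in the original parent *process*?
    return os.getpid() == MAIN_PID

def is_main_by_thread():
    # New semantics: are we on the interpreter's main *thread*?
    return threading.current_thread() is threading.main_thread()

print(is_main_by_pid(), is_main_by_thread())  # True True in a plain script
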
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64c2..3dfeabe5d 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -65,7 +65,7 @@ def test_fetch_consistency_error(cli, datafiles):
# When the error occurs outside of the scheduler at load time,
# then the SourceError is reported directly as the main error.
result = cli.run(project=project, args=["source", "fetch", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error", debug=True)
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index ffcc166f9..0054636ec 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -380,8 +380,8 @@ def test_push_after_pull(cli, tmpdir, datafiles):
#
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == []
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" not in result.get_pushed_elements()
# Delete the artifact locally again.
cli.remove_artifact_from_cache(project, "target.bst")
@@ -391,8 +391,8 @@ def test_push_after_pull(cli, tmpdir, datafiles):
cli.configure({"artifacts": [{"url": share1.repo, "push": True}, {"url": share2.repo, "push": True},]})
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == ["target.bst"]
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" in result.get_pushed_elements()
# Ensure that when an artifact's size exceeds available disk space
diff --git a/tests/integration/interactive_build.py b/tests/integration/interactive_build.py
index 285cb86f8..b19d16e0f 100644
--- a/tests/integration/interactive_build.py
+++ b/tests/integration/interactive_build.py
@@ -4,6 +4,7 @@
import os
import pexpect
import pytest
+import sys
from buildstream.testing import runcli
from buildstream.testing._utils.site import HAVE_SANDBOX
@@ -29,7 +30,9 @@ def build_session(datafiles, element_name):
"bst",
["--directory", project, "--config", config_file, "--no-colors", "build", element_name,],
timeout=PEXPECT_TIMEOUT_SHORT,
+ encoding='utf-8'
)
+ session.logfile = sys.stderr
yield session
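
Setting an encoding makes pexpect decode the child's output to str, which is what allows the transcript to be mirrored to a text-mode stream such as sys.stderr. A minimal standalone example of the same debugging setup (any interactive program works; pexpect must be installed):

import sys
import pexpect

session = pexpect.spawn("python3", ["-i"], timeout=10, encoding="utf-8")
session.logfile = sys.stderr  # echo the whole session for debugging
session.expect(">>>")
session.sendline("1 + 1")
session.expect(">>>")
session.sendline("exit()")
session.close()
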
diff --git a/tox.ini b/tox.ini
index 0c798f284..ab3e82051 100644
--- a/tox.ini
+++ b/tox.ini
@@ -16,9 +16,7 @@ BST_PLUGINS_EXPERIMENTAL_VERSION = 1.93.4
# Anything specified here is inherited by the sections
#
[testenv]
-usedevelop =
- # This is required by Cython in order to get coverage for cython files.
- py{36,37,38}-!nocover: True
+usedevelop = True
commands =
# Running with coverage reporting enabled