author     bst-marge-bot <marge-bot@buildstream.build>  2020-04-20 14:57:21 +0000
committer  bst-marge-bot <marge-bot@buildstream.build>  2020-04-20 14:57:21 +0000
commit     3d77f1583fd020ce5c18da1fa3c01d40784c2abd (patch)
tree       e6887c58808b83140bbb459568c38d0591b0a23a
parent     74bbafa4c29caff91574316fb25fe2a5f9ff5fbf (diff)
parent     f42205787e33174408012e5de3e3576acaab9f9b (diff)
download   buildstream-3d77f1583fd020ce5c18da1fa3c01d40784c2abd.tar.gz
Merge branch 'juerg/artifact-blob-not-found' into 'master'

Fix handling of missing blobs in `ArtifactCache.pull()`

Closes #1276

See merge request BuildStream/buildstream!1843
-rw-r--r--  src/buildstream/_artifact.py                     | 43
-rw-r--r--  src/buildstream/_artifactcache.py                | 10
-rw-r--r--  src/buildstream/_cas/cascache.py                 | 19
-rw-r--r--  src/buildstream/_sourcecache.py                  |  6
-rw-r--r--  src/buildstream/sandbox/_sandboxremote.py        |  7
-rw-r--r--  src/buildstream/storage/_casbaseddirectory.py    |  2
-rw-r--r--  tests/frontend/project/elements/random.bst       |  1
-rw-r--r--  tests/frontend/project/plugins/randomelement.py  | 36
-rw-r--r--  tests/frontend/pull.py                           | 92
-rw-r--r--  tests/frontend/push.py                           | 38

10 files changed, 180 insertions(+), 74 deletions(-)
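
In short, `ArtifactCache.pull()` now treats a remote that is missing individual blobs as a soft failure and falls through to the next remote instead of aborting the pull. A condensed, self-contained sketch of the new retry loop (the real code lives in src/buildstream/_artifactcache.py; the exception classes and `pull_storage` here are stand-ins for the buildstream internals shown in the diff below):

# Stand-ins for buildstream._cas.casremote.BlobNotFound and
# buildstream._exceptions.CASError, so this sketch runs on its own.
class BlobNotFound(Exception):
    def __init__(self, blob):
        super().__init__(blob)
        self.blob = blob

class CASError(Exception):
    pass

def pull_from_remotes(element, artifact, remotes, pull_storage):
    # pull_storage stands in for ArtifactCache._pull_artifact_storage()
    errors = []
    for remote in remotes:
        try:
            if pull_storage(element, artifact, remote):
                return True
        except BlobNotFound as e:
            # Not all blobs are available on this remote; try the next
            # remote instead of failing the whole pull.
            element.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
            continue
        except CASError as e:
            element.warn("Could not pull from remote {}: {}".format(remote, e))
            errors.append(e)
    return False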
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index 659facba4..0a70d096f 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -423,8 +423,7 @@ class Artifact:
context = self._context
- artifact = self._get_proto()
-
+ artifact = self._load_proto()
if not artifact:
self._cached = False
return False
@@ -443,11 +442,14 @@ class Artifact:
self._cached = False
return False
- # Check whether public data is available
- if not self._cas.contains_file(artifact.public_data):
+ # Check whether public data and logs are available
+ logfile_digests = [logfile.digest for logfile in artifact.logs]
+ digests = [artifact.public_data] + logfile_digests
+ if not self._cas.contains_files(digests):
self._cached = False
return False
+ self._proto = artifact
self._cached = True
return True
@@ -460,16 +462,9 @@ class Artifact:
# element not cached or missing logs.
#
def cached_logs(self):
- if not self._element._cached():
- return False
-
- artifact = self._get_proto()
-
- for logfile in artifact.logs:
- if not self._cas.contains_file(logfile.digest):
- return False
-
- return True
+ # Log files are currently considered an essential part of an artifact.
+ # If the artifact is cached, its log files are available as well.
+ return self._element._cached()
# reset_cached()
#
@@ -477,6 +472,7 @@ class Artifact:
# is cached or not.
#
def reset_cached(self):
+ self._proto = None
self._cached = None
# set_cached()
@@ -485,18 +481,15 @@ class Artifact:
# This is used as optimization when we know the artifact is available.
#
def set_cached(self):
+ self._proto = self._load_proto()
self._cached = True
- # _get_proto()
+ # _load_proto()
#
# Returns:
# (Artifact): Artifact proto
#
- def _get_proto(self):
- # Check if we've already cached the proto object
- if self._proto is not None:
- return self._proto
-
+ def _load_proto(self):
key = self.get_extract_key()
proto_path = os.path.join(self._artifactdir, self._element.get_artifact_name(key=key))
@@ -508,9 +501,15 @@ class Artifact:
return None
os.utime(proto_path)
- # Cache the proto object
- self._proto = artifact
+ return artifact
+
+ # _get_proto()
+ #
+ # Returns:
+ # (Artifact): Artifact proto
+ #
+ def _get_proto(self):
return self._proto
# _get_field_digest()
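
Taken together, the _artifact.py hunks split proto loading from memoization: `_load_proto()` now always reads from disk, while `_get_proto()` merely returns whatever `cached()` or `set_cached()` stored in `self._proto`. A condensed sketch of the resulting pattern (method names mirror the diff; `load_from_disk` is a hypothetical stand-in for the on-disk proto parsing):

class ArtifactSketch:
    def __init__(self, load_from_disk):
        self._load_from_disk = load_from_disk
        self._proto = None
        self._cached = None

    def _load_proto(self):
        # Always (re)reads from disk; no memoization here anymore.
        return self._load_from_disk()

    def _get_proto(self):
        # Only returns a proto previously stored by cached()/set_cached().
        return self._proto

    def reset_cached(self):
        self._proto = None
        self._cached = None

    def set_cached(self):
        self._proto = self._load_proto()
        self._cached = True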
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index f1648e947..9cebeb1a3 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -22,6 +22,7 @@ import os
import grpc
from ._basecache import BaseCache
+from ._cas.casremote import BlobNotFound
from ._exceptions import ArtifactError, CASError, CacheError, CASRemoteError, RemoteError
from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, artifact_pb2, artifact_pb2_grpc
@@ -281,7 +282,6 @@ class ArtifactCache(BaseCache):
element.status("Pulling artifact {} <- {}".format(display_key, remote))
artifact = self._pull_artifact_proto(element, key, remote)
if artifact:
- element.info("Pulled artifact {} <- {}".format(display_key, remote))
break
element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key))
@@ -307,10 +307,14 @@ class ArtifactCache(BaseCache):
element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
- element.info("Pulled data for artifact {} <- {}".format(display_key, remote))
+ element.info("Pulled artifact {} <- {}".format(display_key, remote))
return True
element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key))
+ except BlobNotFound as e:
+ # Not all blobs are available on this remote
+ element.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
+ continue
except CASError as e:
element.warn("Could not pull from remote {}: {}".format(remote, e))
errors.append(e)
@@ -401,7 +405,7 @@ class ArtifactCache(BaseCache):
remote.init()
# fetch_blobs() will return the blobs that are still missing
- missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+ missing_blobs = self.cas.fetch_blobs(remote, missing_blobs, allow_partial=True)
if missing_blobs:
raise ArtifactError("Blobs not found on configured artifact servers")
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 03be75c72..74912c4e2 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -165,20 +165,20 @@ class CASCache:
self._casd_process_manager.release_resources(messenger)
self._casd_process_manager = None
- # contains_file():
+ # contains_files():
#
- # Check whether a digest corresponds to a file which exists in CAS
+ # Check whether file digests exist in the local CAS cache
#
# Args:
- # digest (Digest): The file digest to check
+ # digests (list): The file digests to check
#
- # Returns: True if the file is in the cache, False otherwise
+ # Returns: True if the files are in the cache, False otherwise
#
- def contains_file(self, digest):
+ def contains_files(self, digests):
cas = self.get_cas()
request = remote_execution_pb2.FindMissingBlobsRequest()
- request.blob_digests.append(digest)
+ request.blob_digests.extend(digests)
response = cas.FindMissingBlobs(request)
return len(response.missing_blob_digests) == 0
@@ -647,16 +647,19 @@ class CASCache:
# fetch_blobs():
#
- # Fetch blobs from remote CAS. Returns missing blobs that could not be fetched.
+ # Fetch blobs from remote CAS. Optionally returns missing blobs that could
+ # not be fetched.
#
# Args:
# remote (CASRemote): The remote repository to fetch from
# digests (list): The Digests of blobs to fetch
+ # allow_partial (bool): True to return missing blobs, False to raise a
+ # BlobNotFound error if a blob is missing
#
# Returns: The Digests of the blobs that were not available on the remote CAS
#
- def fetch_blobs(self, remote, digests):
- missing_blobs = []
+ def fetch_blobs(self, remote, digests, *, allow_partial=False):
+ missing_blobs = [] if allow_partial else None
remote.init()
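
With the new keyword-only `allow_partial` flag, `fetch_blobs()` only returns the list of missing blobs when the caller opts in; by default a missing blob raises `BlobNotFound`. A usage sketch of both modes (assumes a BuildStream checkout on the import path; `cascache`, `remote` and `digests` are placeholders for objects constructed elsewhere):

from buildstream._cas.casremote import BlobNotFound

def fetch_partial(cascache, remote, digests):
    # Opt-in partial fetch: missing blobs are returned rather than raised.
    # This is how ArtifactCache.pull() now calls it.
    missing = cascache.fetch_blobs(remote, digests, allow_partial=True)
    if missing:
        print("{} blobs unavailable on this remote".format(len(missing)))
    return missing

def fetch_strict(cascache, remote, digests):
    # Default mode: any missing blob raises BlobNotFound. SourceCache and
    # SandboxRemote now rely on this instead of inspecting the return value.
    try:
        cascache.fetch_blobs(remote, digests)
    except BlobNotFound as e:
        print("Blob {} not found on remote".format(e.blob))
        raise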
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index e485fbd47..4533a2580 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -242,11 +242,7 @@ class SourceCache(BaseCache):
self.cas._fetch_directory(remote, source_proto.files)
required_blobs = self.cas.required_blobs_for_directory(source_proto.files)
missing_blobs = self.cas.local_missing_blobs(required_blobs)
- missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
-
- if missing_blobs:
- source.info("Remote cas ({}) does not have source {} cached".format(remote, display_key))
- continue
+ self.cas.fetch_blobs(remote, missing_blobs)
source.info("Pulled source {} <- {}".format(display_key, remote))
return True
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 3dcbb2ccc..5b03852f6 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -291,12 +291,7 @@ class SandboxRemote(SandboxREAPI):
blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs)
with CASRemote(self.storage_remote_spec, cascache) as casremote:
- remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch)
-
- if remote_missing_blobs:
- raise SandboxError(
- "{} output files are missing on the CAS server".format(len(remote_missing_blobs))
- )
+ cascache.fetch_blobs(casremote, blobs_to_fetch)
def _execute_action(self, action, flags):
stdout, stderr = self._get_output()
diff --git a/src/buildstream/storage/_casbaseddirectory.py b/src/buildstream/storage/_casbaseddirectory.py
index b8b5ca09c..e33bdc3d7 100644
--- a/src/buildstream/storage/_casbaseddirectory.py
+++ b/src/buildstream/storage/_casbaseddirectory.py
@@ -754,6 +754,8 @@ class CasBasedDirectory(Directory):
raise FileExistsError("{} already exists in {}".format(path[-1], str(subdir)))
with utils._tempnamedfile(mode, encoding=encoding, dir=self.cas_cache.tmpdir) as f:
+ # Make sure the temporary file is readable by buildbox-casd
+ os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
yield f
# Import written temporary file into CAS
f.flush()
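
The one-line _casbaseddirectory.py change widens the permissions of the temporary file so that buildbox-casd, which may run as a different user, can read it when importing it into CAS. A standalone illustration using the stdlib `tempfile` instead of buildstream's `utils._tempnamedfile` (a substitution made so the example runs on its own):

import os
import stat
import tempfile

with tempfile.NamedTemporaryFile(mode="w+b") as f:
    # tempfile creates files as 0600; loosen to 0644 so another process
    # (here: buildbox-casd) can read the file before it is imported.
    os.chmod(f.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
    f.write(b"example payload")
    f.flush()
    print(oct(os.stat(f.name).st_mode & 0o777))  # 0o644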
diff --git a/tests/frontend/project/elements/random.bst b/tests/frontend/project/elements/random.bst
new file mode 100644
index 000000000..2478dfa2a
--- /dev/null
+++ b/tests/frontend/project/elements/random.bst
@@ -0,0 +1 @@
+kind: randomelement
diff --git a/tests/frontend/project/plugins/randomelement.py b/tests/frontend/project/plugins/randomelement.py
new file mode 100644
index 000000000..b36b75c8a
--- /dev/null
+++ b/tests/frontend/project/plugins/randomelement.py
@@ -0,0 +1,36 @@
+import os
+
+from buildstream import Element
+
+
+class RandomElement(Element):
+ BST_VIRTUAL_DIRECTORY = True
+
+ def configure(self, node):
+ pass
+
+ def preflight(self):
+ pass
+
+ def get_unique_key(self):
+ pass
+
+ def configure_sandbox(self, sandbox):
+ pass
+
+ def stage(self, sandbox):
+ pass
+
+ def assemble(self, sandbox):
+ rootdir = sandbox.get_virtual_directory()
+ outputdir = rootdir.descend("output", create=True)
+
+ # Generate non-reproducible output
+ with outputdir.open_file("random", mode="wb") as f:
+ f.write(os.urandom(64))
+
+ return "/output"
+
+
+def setup():
+ return RandomElement
diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py
index 3ae394fd1..1845f320e 100644
--- a/tests/frontend/pull.py
+++ b/tests/frontend/pull.py
@@ -8,7 +8,13 @@ import pytest
from buildstream import utils, _yaml
from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing import create_repo
-from tests.testutils import create_artifact_share, generate_junction, assert_shared, assert_not_shared
+from tests.testutils import (
+ create_artifact_share,
+ create_split_share,
+ generate_junction,
+ assert_shared,
+ assert_not_shared,
+)
# Project directory
@@ -227,46 +233,74 @@ def test_push_pull_cross_junction(cli, tmpdir, datafiles):
assert cli.get_element_state(project, "junction.bst:import-etc.bst") == "cached"
+def _test_pull_missing_blob(cli, project, index, storage):
+ # First build the target element and push to the remote.
+ result = cli.run(project=project, args=["build", "target.bst"])
+ result.assert_success()
+ assert cli.get_element_state(project, "target.bst") == "cached"
+
+ # Assert that everything is now cached in the remote.
+ all_elements = ["target.bst", "import-bin.bst", "import-dev.bst", "compose-all.bst"]
+ for element_name in all_elements:
+ project_name = "test"
+ artifact_name = cli.get_artifact_name(project, project_name, element_name)
+ artifact_proto = index.get_artifact_proto(artifact_name)
+ assert artifact_proto
+ assert storage.get_cas_files(artifact_proto)
+
+ # Now we've pushed, delete the user's local artifact cache
+ # directory and try to redownload it from the share
+ #
+ casdir = os.path.join(cli.directory, "cas")
+ shutil.rmtree(casdir)
+ artifactdir = os.path.join(cli.directory, "artifacts")
+ shutil.rmtree(artifactdir)
+
+ # Assert that nothing is cached locally anymore
+ for element_name in all_elements:
+ assert cli.get_element_state(project, element_name) != "cached"
+
+ # Now delete blobs in the remote without deleting the artifact ref.
+ # This simulates scenarios with concurrent artifact expiry.
+ remote_objdir = os.path.join(storage.repodir, "cas", "objects")
+ shutil.rmtree(remote_objdir)
+
+ # Now try bst build
+ result = cli.run(project=project, args=["build", "target.bst"])
+ result.assert_success()
+
+ # Assert that no artifacts were pulled
+ assert not result.get_pulled_elements()
+
+
@pytest.mark.datafiles(DATA_DIR)
def test_pull_missing_blob(cli, tmpdir, datafiles):
project = str(datafiles)
with create_artifact_share(os.path.join(str(tmpdir), "artifactshare")) as share:
-
- # First build the target element and push to the remote.
cli.configure({"artifacts": {"url": share.repo, "push": True}})
- result = cli.run(project=project, args=["build", "target.bst"])
- result.assert_success()
- assert cli.get_element_state(project, "target.bst") == "cached"
- # Assert that everything is now cached in the remote.
- all_elements = ["target.bst", "import-bin.bst", "import-dev.bst", "compose-all.bst"]
- for element_name in all_elements:
- assert_shared(cli, share, project, element_name)
+ _test_pull_missing_blob(cli, project, share, share)
- # Now we've pushed, delete the user's local artifact cache
- # directory and try to redownload it from the share
- #
- casdir = os.path.join(cli.directory, "cas")
- shutil.rmtree(casdir)
- artifactdir = os.path.join(cli.directory, "artifacts")
- shutil.rmtree(artifactdir)
- # Assert that nothing is cached locally anymore
- for element_name in all_elements:
- assert cli.get_element_state(project, element_name) != "cached"
+@pytest.mark.datafiles(DATA_DIR)
+def test_pull_missing_blob_split_share(cli, tmpdir, datafiles):
+ project = str(datafiles)
- # Now delete blobs in the remote without deleting the artifact ref.
- # This simulates scenarios with concurrent artifact expiry.
- remote_objdir = os.path.join(share.repodir, "cas", "objects")
- shutil.rmtree(remote_objdir)
+ indexshare = os.path.join(str(tmpdir), "indexshare")
+ storageshare = os.path.join(str(tmpdir), "storageshare")
- # Now try bst build
- result = cli.run(project=project, args=["build", "target.bst"])
- result.assert_success()
+ with create_split_share(indexshare, storageshare) as (index, storage):
+ cli.configure(
+ {
+ "artifacts": [
+ {"url": index.repo, "push": True, "type": "index"},
+ {"url": storage.repo, "push": True, "type": "storage"},
+ ]
+ }
+ )
- # Assert that no artifacts were pulled
- assert not result.get_pulled_elements()
+ _test_pull_missing_blob(cli, project, index, storage)
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index e9dfa2c6a..c2f52c514 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -24,10 +24,11 @@
# pylint: disable=redefined-outer-name
import os
+import shutil
import pytest
from buildstream.exceptions import ErrorDomain
-from buildstream.testing import cli # pylint: disable=unused-import
+from buildstream.testing import cli, generate_project # pylint: disable=unused-import
from tests.testutils import (
create_artifact_share,
create_element_size,
@@ -627,3 +628,38 @@ def test_push_no_strict(caplog, cli, tmpdir, datafiles, buildtrees):
args += ["artifact", "push", "--deps", "all", "target.bst"]
result = cli.run(project=project, args=args)
result.assert_success()
+
+
+# Test that push works after rebuilding an incomplete artifact
+# of a non-reproducible element.
+@pytest.mark.datafiles(DATA_DIR)
+def test_push_after_rebuild(cli, tmpdir, datafiles):
+ project = os.path.join(datafiles.dirname, datafiles.basename)
+
+ generate_project(
+ project,
+ config={
+ "element-path": "elements",
+ "plugins": [{"origin": "local", "path": "plugins", "elements": {"randomelement": 0}}],
+ },
+ )
+
+ # First build the element
+ result = cli.run(project=project, args=["build", "random.bst"])
+ result.assert_success()
+ assert cli.get_element_state(project, "random.bst") == "cached"
+
+ # Delete the artifact blobs but keep the artifact proto,
+ # i.e., now we have an incomplete artifact
+ casdir = os.path.join(cli.directory, "cas")
+ shutil.rmtree(casdir)
+ assert cli.get_element_state(project, "random.bst") != "cached"
+
+ with create_artifact_share(os.path.join(str(tmpdir), "artifactshare")) as share:
+ cli.configure({"artifacts": {"url": share.repo, "push": True}})
+
+ # Now rebuild the element and push it
+ result = cli.run(project=project, args=["build", "random.bst"])
+ result.assert_success()
+ assert result.get_pushed_elements() == ["random.bst"]
+ assert cli.get_element_state(project, "random.bst") == "cached"