 src/buildstream/_sourcecache.py | 213
 src/buildstream/source.py       |   3
 tests/frontend/buildtrack.py    |   4
 tests/sourcecache/cache.py      |  20
 tests/sourcecache/fetch.py      |  13
 tests/sourcecache/push.py       |   2
 tests/sourcecache/staging.py    |  13
 7 files changed, 212 insertions(+), 56 deletions(-)
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e08..fdfe00901 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -18,12 +18,15 @@
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
+import grpc
-from ._cas import CASRemoteSpec
+from ._cas import CASRemote, CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
-from ._exceptions import CASError, CASCacheError, SourceCacheError
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+ source_pb2, source_pb2_grpc
# Holds configuration for a remote used for the source cache.
@@ -38,6 +41,43 @@ class SourceCacheSpec(CASRemoteSpec):
pass
+class SourceRemote(CASRemote):
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.capabilities_service = None
+ self.source_service = None
+
+ def init(self):
+ if not self._initialized:
+ super().init()
+
+ self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+ # check that the service supports sources
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ response = self.capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+                # Check whether this remote exposes the capabilities service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise SourceCacheError(
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise SourceCacheError(
+ "Remote initialisation failed: {}".format(e.details()))
+
+ if not response.source_capabilities:
+ raise SourceCacheError(
+ "Configured remote does not support source service")
+
+ # set up source service
+ self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+
+
# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
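
The handshake in SourceRemote.init() probes the remote with a GetCapabilities RPC before creating the SourceService stub, so a misconfigured endpoint fails early with a clear message rather than on the first GetSource call. A minimal standalone sketch of the same probe, assuming the generated proto modules from an installed BuildStream; the endpoint address is purely illustrative:

    import grpc
    from buildstream._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc

    # Illustrative endpoint; real URLs come from the source-caches configuration
    channel = grpc.insecure_channel('cache.example.com:11001')
    capabilities = buildstream_pb2_grpc.CapabilitiesStub(channel)

    try:
        response = capabilities.GetCapabilities(buildstream_pb2.GetCapabilitiesRequest())
        # Servers that implement the source service set source_capabilities
        print('source service supported:', response.HasField('source_capabilities'))
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.UNIMPLEMENTED:
            print('remote has no BuildStream capabilities service')
        else:
            raise
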
@@ -49,15 +89,20 @@ class SourceCache(BaseCache):
spec_name = "source_cache_specs"
spec_error = SourceCacheError
config_node_name = "source-caches"
+ remote_class = SourceRemote
def __init__(self, context):
super().__init__(context)
self._required_sources = set()
+ self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
+ os.makedirs(self.sourcerefdir, exist_ok=True)
- self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
+ self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source)
self.casquota.add_list_refs_callback(self.list_sources)
+ self.cas.add_reachable_directories_callback(self._reachable_directories)
+
# mark_required_sources()
#
# Mark sources that are required by the current run.
@@ -77,8 +122,8 @@ class SourceCache(BaseCache):
for source in sources:
ref = source._get_source_name()
try:
- self.cas.update_mtime(ref)
- except CASCacheError:
+ self._update_mtime(ref)
+ except SourceCacheError:
pass
# required_sources()
@@ -103,23 +148,19 @@ class SourceCache(BaseCache):
def unrequired_sources(self):
required_source_names = set(map(
lambda x: x._get_source_name(), self._required_sources))
- for (mtime, source) in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*"):
+ for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir):
if source not in required_source_names:
yield (mtime, source)
# list_sources()
#
- # Get list of all sources in the `cas/refs/heads/@sources/` folder
+    # Get list of all sources in the `source_protos/` folder
#
# Returns:
# ([str]): iterable over all source refs
#
def list_sources(self):
- return [ref for _, ref in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*")]
+ return [ref for _, ref in self._list_refs_mtimes(self.sourcerefdir)]
# contains()
#
@@ -134,7 +175,14 @@ class SourceCache(BaseCache):
#
def contains(self, source):
ref = source._get_source_name()
- return self.cas.contains(ref)
+ path = self._source_path(ref)
+
+ if not os.path.exists(path):
+ return False
+
+        # check that the files referenced by the proto are in the local CAS
+ source_proto = self._get_source(ref)
+ return self.cas.contains_directory(source_proto.files, with_files=True)
# commit()
#
@@ -162,7 +210,7 @@ class SourceCache(BaseCache):
else:
source._stage(vdir)
- self.cas.set_ref(ref, vdir._get_digest())
+ self._store_source(ref, vdir._get_digest())
# export()
#
@@ -175,13 +223,8 @@ class SourceCache(BaseCache):
# CASBasedDirectory
def export(self, source):
ref = source._get_source_name()
-
- try:
- digest = self.cas.resolve_ref(ref)
- except CASCacheError as e:
- raise SourceCacheError("Error exporting source: {}".format(e))
-
- return CasBasedDirectory(self.cas, digest=digest)
+ source = self._get_source(ref)
+ return CasBasedDirectory(self.cas, digest=source.files)
# pull()
#
@@ -204,13 +247,27 @@ class SourceCache(BaseCache):
try:
source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote):
- source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
- else:
- source.info("Remote ({}) does not have source {} cached".format(
+ # fetch source proto
+ response = self._pull_source(ref, remote)
+ if response is None:
+ source.info("Remote source service ({}) does not have source {} cached".format(
+ remote.spec.url, display_key))
+ continue
+
+ # Fetch source blobs
+ self.cas._fetch_directory(remote, response.files)
+ required_blobs = self.cas.required_blobs_for_directory(response.files)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+ if missing_blobs:
+                source.info("Remote CAS ({}) does not have source {} cached".format(
remote.spec.url, display_key))
+ continue
+
+ source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
+ return True
+
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(
display_key, e)) from e
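
Pulling is now two-phase: fetch the small Source proto first, then the directory tree it references, and only report success once nothing is missing locally. A sketch of that completeness check in isolation, reusing the CAS helper names from this diff (the wrapper function itself is illustrative):

    def source_blobs_complete(cas, remote, files_digest):
        # Every blob reachable from the directory digest in the Source proto...
        required = cas.required_blobs_for_directory(files_digest)
        # ...minus those already present in the local CAS...
        missing = cas.local_missing_blobs(required)
        # ...is fetched from the remote; fetch_blobs returns whatever
        # it could not obtain
        return not cas.fetch_blobs(remote, missing)
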
@@ -242,11 +299,105 @@ class SourceCache(BaseCache):
for remote in push_remotes:
remote.init()
source.status("Pushing source {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push([ref], remote):
- source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
- pushed = True
- else:
+
+            # skip the push if the remote already has the source
+ if self._pull_source(ref, remote) is not None:
source.info("Remote ({}) already has source {} cached"
.format(remote.spec.url, display_key))
+ continue
+
+ # push files to storage
+ source_proto = self._get_source(ref)
+ try:
+ self.cas._send_directory(remote, source_proto.files)
+ except CASRemoteError:
+ source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ if not self._push_source(ref, remote):
+ source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
return pushed
+
+ def _remove_source(self, ref, *, defer_prune=False):
+ return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)
+
+ def _store_source(self, ref, digest):
+ source_proto = source_pb2.Source()
+ source_proto.files.CopyFrom(digest)
+
+ self._store_proto(source_proto, ref)
+
+ def _store_proto(self, proto, ref):
+ path = self._source_path(ref)
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with utils.save_file_atomic(path, 'w+b') as f:
+ f.write(proto.SerializeToString())
+
+ def _get_source(self, ref):
+ path = self._source_path(ref)
+ source_proto = source_pb2.Source()
+ try:
+ with open(path, 'r+b') as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+ except FileNotFoundError as e:
+ raise SourceCacheError("Attempted to access unavailable source: {}"
+ .format(e)) from e
+
+ def _source_path(self, ref):
+ return os.path.join(self.sourcerefdir, ref)
+
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.sourcerefdir):
+ for source_file in files:
+ source = source_pb2.Source()
+ with open(os.path.join(root, source_file), 'r+b') as f:
+ source.ParseFromString(f.read())
+
+ yield source.files
+
+ def _update_mtime(self, ref):
+ try:
+ os.utime(self._source_path(ref))
+ except FileNotFoundError as e:
+ raise SourceCacheError("Couldn't find source: {}".format(ref)) from e
+
+ def _pull_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ request = source_pb2.GetSourceRequest()
+ request.cache_key = source_ref
+
+ response = remote.source_service.GetSource(request)
+
+ self._store_proto(response, source_ref)
+
+ return response
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError("Failed to pull source: {}".format(e.details()))
+ return None
+
+ def _push_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ source_proto = self._get_source(source_ref)
+
+ request = source_pb2.UpdateSourceRequest()
+ request.cache_key = source_ref
+ request.source.CopyFrom(source_proto)
+
+ return remote.source_service.UpdateSource(request)
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError("Failed to push source: {}".format(e.details()))
+ return None
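
Because _store_proto() writes each Source proto as a bare serialized file keyed by ref, the on-disk format can be inspected and reproduced outside BuildStream. A round-trip sketch, assuming Source.files is a REAPI Digest as in the generated protos; the path and digest values are illustrative:

    import os
    from buildstream._protos.buildstream.v2 import source_pb2
    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    # Illustrative digest and ref path; real values come from the CAS and source key
    digest = remote_execution_pb2.Digest(hash='0' * 64, size_bytes=0)
    path = os.path.expanduser('~/.cache/buildstream/source_protos/git/0123abcd')

    proto = source_pb2.Source()
    proto.files.CopyFrom(digest)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wb') as f:
        f.write(proto.SerializeToString())

    loaded = source_pb2.Source()
    with open(path, 'rb') as f:
        loaded.ParseFromString(f.read())
    assert loaded.files == digest
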
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index b5c8f9a63..76c56fd1d 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -1063,8 +1063,7 @@ class Source(Plugin):
# Gives a ref path that points to where sources are kept in the CAS
def _get_source_name(self):
# @ is used to prevent conflicts with project names
- return "{}/{}/{}".format(
- '@sources',
+ return "{}/{}".format(
self.get_kind(),
self._key)
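
With the '@sources' prefix dropped, a ref is simply '<kind>/<key>', which doubles as the proto file's path relative to the source_protos directory; the prefix is no longer needed now that source refs no longer share the CAS ref namespace. For illustration, with hypothetical values:

    kind, key = 'git', 'deadbeef' * 8  # illustrative kind and cache key
    old_ref = '{}/{}/{}'.format('@sources', kind, key)  # '@sources/git/deadbeef...'
    new_ref = '{}/{}'.format(kind, key)                 # 'git/deadbeef...'
    # new_ref is also the file's path under <cachedir>/source_protos
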
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index d42b6d1ba..13e5ab96e 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -140,8 +140,8 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, strict,
# Delete element sources
source_dir = os.path.join(project, 'cache', 'sources')
shutil.rmtree(source_dir)
- source_refs = os.path.join(project, 'cache', 'cas', 'refs', 'heads', '@sources')
- shutil.rmtree(source_refs)
+ source_protos = os.path.join(project, 'cache', 'source_protos')
+ shutil.rmtree(source_protos)
# Delete artifacts one by one and assert element states
for target in set(tracked):
diff --git a/tests/sourcecache/cache.py b/tests/sourcecache/cache.py
index 20faaa64e..793344ef0 100644
--- a/tests/sourcecache/cache.py
+++ b/tests/sourcecache/cache.py
@@ -39,10 +39,10 @@ def test_patch_sources_cached_1(cli, datafiles):
# as we have a local, patch, local config, the first local and patch should
# be cached together, and the last local on its own
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
- assert len(os.listdir(os.path.join(source_dir, 'local'))) == 2
+ assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
+ assert len(os.listdir(os.path.join(source_protos, 'local'))) == 2
@pytest.mark.datafiles(DATA_DIR)
@@ -53,9 +53,9 @@ def test_patch_sources_cached_2(cli, datafiles):
res.assert_success()
# As everything is before the patch it should all be cached together
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
+ assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
@pytest.mark.datafiles(DATA_DIR)
@@ -66,9 +66,9 @@ def test_sources_without_patch(cli, datafiles):
res.assert_success()
# No patches so everything should be cached separately
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'local'))) == 3
+ assert len(os.listdir(os.path.join(source_protos, 'local'))) == 3
@pytest.mark.datafiles(DATA_DIR)
@@ -105,8 +105,8 @@ def test_source_cache_key(cli, datafiles):
res.assert_success()
# Should have one source ref
- patch_refs = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources', 'patch')
- assert len(os.listdir(patch_refs)) == 1
+ patch_protos = os.path.join(project_dir, 'cache', 'source_protos', 'patch')
+ assert len(os.listdir(patch_protos)) == 1
# modify hello-patch file and check tracking updates refs
with open(os.path.join(file_path, 'dev-files', 'usr', 'include', 'pony.h'), 'a') as f:
@@ -120,4 +120,4 @@ def test_source_cache_key(cli, datafiles):
res.assert_success()
# We should have a new source ref
- assert len(os.listdir(patch_refs)) == 2
+ assert len(os.listdir(patch_protos)) == 2
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 3fc9d96a6..de8587862 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -89,15 +89,22 @@ def test_source_fetch(cli, tmpdir, datafiles):
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) != []
+ # get root digest of source
+ sourcecache = context.sourcecache
+ digest = sourcecache.export(source)._get_digest()
+
# Move source in local cas to repo
shutil.rmtree(os.path.join(str(tmpdir), 'sourceshare', 'repo', 'cas'))
shutil.move(
+ os.path.join(str(tmpdir), 'cache', 'source_protos'),
+ os.path.join(str(tmpdir), 'sourceshare', 'repo'))
+ shutil.move(
os.path.join(str(tmpdir), 'cache', 'cas'),
os.path.join(str(tmpdir), 'sourceshare', 'repo'))
shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'sources'))
shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'artifacts'))
- digest = share.cas.resolve_ref(source._get_source_name())
+ # check the share has the object
assert share.has_object(digest)
state = cli.get_element_state(project_dir, 'fetch.bst')
@@ -163,7 +170,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
res = cli.run(project=project_dir, args=['source', 'fetch', 'fetch.bst'])
res.assert_success()
brief_key = source._get_brief_display_key()
- assert ("Remote ({}) does not have source {} cached"
+ assert ("Remote source service ({}) does not have source {} cached"
.format(share.repo, brief_key)) in res.stderr
assert ("SUCCESS Fetching from {}"
.format(repo.source_config(ref=ref)['url'])) in res.stderr
@@ -219,5 +226,5 @@ def test_pull_fail(cli, tmpdir, datafiles):
res = cli.run(project=project_dir, args=['build', 'push.bst'])
res.assert_main_error(ErrorDomain.STREAM, None)
res.assert_task_error(ErrorDomain.PLUGIN, None)
- assert "Remote ({}) does not have source {} cached".format(
+ assert "Remote source service ({}) does not have source {} cached".format(
share.repo, source._get_brief_display_key()) in res.stderr
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 6282b6e60..23f5f1ca1 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -95,7 +95,7 @@ def test_source_push(cli, tmpdir, datafiles):
assert sourcecache.contains(source)
# check that the remote CAS now has it
- digest = share.cas.resolve_ref(source._get_source_name())
+ digest = sourcecache.export(source)._get_digest()
assert share.has_object(digest)
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 9dc431bda..c15bed215 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -78,8 +78,7 @@ def test_source_staged(tmpdir, cli, datafiles):
assert sourcecache.contains(source)
# Extract the file and check it's the same as the one we imported
- ref = source._get_source_name()
- digest = cas.resolve_ref(ref)
+ digest = sourcecache.export(source)._get_digest()
extractdir = os.path.join(str(tmpdir), "extract")
cas.checkout(extractdir, digest)
dir1 = extractdir
@@ -108,6 +107,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
context.cachedir = cachedir
context.messenger.set_message_handler(dummy_message_handler)
cas = context.get_cascache()
+ sourcecache = context.sourcecache
res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"])
res.assert_success()
@@ -117,8 +117,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
assert element._source_cached()
# check that the directory structures are identical
- ref = source._get_source_name()
- digest = cas.resolve_ref(ref)
+ digest = sourcecache.export(source)._get_digest()
extractdir = os.path.join(str(tmpdir), "extract")
cas.checkout(extractdir, digest)
dir1 = extractdir
@@ -133,11 +132,11 @@ def test_staged_source_build(tmpdir, datafiles, cli):
project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
cachedir = os.path.join(str(tmpdir), 'cache')
element_path = 'elements'
- source_refs = os.path.join(str(tmpdir), 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(str(tmpdir), 'cache', 'source_protos')
source_dir = os.path.join(str(tmpdir), 'cache', 'sources')
cli.configure({
- 'cachedir': os.path.join(str(tmpdir), 'cache')
+ 'cachedir': cachedir
})
create_element_size('target.bst', project_dir, element_path, [], 10000)
@@ -181,7 +180,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
assert files == []
# Now remove the source refs and check the state
- shutil.rmtree(source_refs)
+ shutil.rmtree(source_protos)
cli.remove_artifact_from_cache(project_dir, 'target.bst')
states = cli.get_element_states(project_dir, ['target.bst'])
assert states['target.bst'] == 'fetch needed'