author      Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>  2019-06-24 15:56:31 +0100
committer   Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>  2019-07-05 15:44:29 +0100
commit      b15b32376f6abe1735233bd5d85c42fd1ad5a703 (patch)
tree        7c8de493e75f7e5be9837c772df93aef28d7158a
parent      46418bf79a740fb6c906962e52c243243d2849bb (diff)
download    buildstream-raoul/1038-source-cache-proto.tar.gz
Source cache uses new proto methods (branch: raoul/1038-source-cache-proto)
This changes the location of source refs from `cache/cas/refs` to `cache/source_protos` and fixes the tests that looked in the old location. The source cache now uses the newly defined protocol buffer and gRPC methods for storing sources locally and for pushing to and pulling from remotes.

Part of #1038
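For orientation, here is a minimal sketch of how a ref resolves under the new layout, mirroring `_get_source()` and `_source_path()` in the diff below (the cachedir value is a hypothetical example):

    import os

    from buildstream._protos.buildstream.v2 import source_pb2

    cachedir = os.path.expanduser('~/.cache/buildstream')  # hypothetical location

    def load_source_proto(ref):
        # Refs are now serialized Source protos under source_protos/<kind>/<key>,
        # replacing the old CAS refs under cas/refs/heads/@sources/
        path = os.path.join(cachedir, 'source_protos', ref)
        source_proto = source_pb2.Source()
        with open(path, 'rb') as f:
            source_proto.ParseFromString(f.read())
        # source_proto.files holds the CAS digest of the staged source tree
        return source_proto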
-rw-r--r--  src/buildstream/_sourcecache.py | 213
-rw-r--r--  src/buildstream/source.py       |   3
-rw-r--r--  tests/frontend/buildtrack.py    |   4
-rw-r--r--  tests/sourcecache/cache.py      |  20
-rw-r--r--  tests/sourcecache/fetch.py      |  13
-rw-r--r--  tests/sourcecache/push.py       |   2
-rw-r--r--  tests/sourcecache/staging.py    |  13
7 files changed, 212 insertions, 56 deletions
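The remote half of the change replaces ref-based push/pull with a dedicated source service. A hedged sketch of the metadata round-trip, following `_pull_source()` in the diff below (the channel endpoint is a made-up example; a real remote is configured through `source-caches`):

    import grpc

    from buildstream._protos.buildstream.v2 import source_pb2, source_pb2_grpc

    # Channel setup is assumed; the endpoint below is hypothetical
    channel = grpc.insecure_channel('cache.example.com:11001')
    source_service = source_pb2_grpc.SourceServiceStub(channel)

    def fetch_source_proto(cache_key):
        # Mirrors _pull_source(): NOT_FOUND just means the remote lacks the source
        request = source_pb2.GetSourceRequest()
        request.cache_key = cache_key
        try:
            return source_service.GetSource(request)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise
            return None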
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e08..fdfe00901 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -18,12 +18,15 @@
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
+import grpc
-from ._cas import CASRemoteSpec
+from ._cas import CASRemote, CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
-from ._exceptions import CASError, CASCacheError, SourceCacheError
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+ source_pb2, source_pb2_grpc
# Holds configuration for a remote used for the source cache.
@@ -38,6 +41,43 @@ class SourceCacheSpec(CASRemoteSpec):
pass
+class SourceRemote(CASRemote):
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.capabilities_service = None
+ self.source_service = None
+
+ def init(self):
+ if not self._initialized:
+ super().init()
+
+ self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+ # check that the service supports sources
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ response = self.capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+ # Check whether this remote supports the capabilities service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise SourceCacheError(
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise SourceCacheError(
+ "Remote initialisation failed: {}".format(e.details()))
+
+ if not response.source_capabilities:
+ raise SourceCacheError(
+ "Configured remote does not support source service")
+
+ # set up source service
+ self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+
+
# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
@@ -49,15 +89,20 @@ class SourceCache(BaseCache):
spec_name = "source_cache_specs"
spec_error = SourceCacheError
config_node_name = "source-caches"
+ remote_class = SourceRemote
def __init__(self, context):
super().__init__(context)
self._required_sources = set()
+ self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
+ os.makedirs(self.sourcerefdir, exist_ok=True)
- self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
+ self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source)
self.casquota.add_list_refs_callback(self.list_sources)
+ self.cas.add_reachable_directories_callback(self._reachable_directories)
+
# mark_required_sources()
#
# Mark sources that are required by the current run.
@@ -77,8 +122,8 @@ class SourceCache(BaseCache):
for source in sources:
ref = source._get_source_name()
try:
- self.cas.update_mtime(ref)
- except CASCacheError:
+ self._update_mtime(ref)
+ except SourceCacheError:
pass
# required_sources()
@@ -103,23 +148,19 @@ class SourceCache(BaseCache):
def unrequired_sources(self):
required_source_names = set(map(
lambda x: x._get_source_name(), self._required_sources))
- for (mtime, source) in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*"):
+ for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir):
if source not in required_source_names:
yield (mtime, source)
# list_sources()
#
- # Get list of all sources in the `cas/refs/heads/@sources/` folder
+ # Get list of all sources in the `source_protos/` folder
#
# Returns:
# ([str]): iterable over all source refs
#
def list_sources(self):
- return [ref for _, ref in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*")]
+ return [ref for _, ref in self._list_refs_mtimes(self.sourcerefdir)]
# contains()
#
@@ -134,7 +175,14 @@ class SourceCache(BaseCache):
#
def contains(self, source):
ref = source._get_source_name()
- return self.cas.contains(ref)
+ path = self._source_path(ref)
+
+ if not os.path.exists(path):
+ return False
+
+ # check files
+ source_proto = self._get_source(ref)
+ return self.cas.contains_directory(source_proto.files, with_files=True)
# commit()
#
@@ -162,7 +210,7 @@ class SourceCache(BaseCache):
else:
source._stage(vdir)
- self.cas.set_ref(ref, vdir._get_digest())
+ self._store_source(ref, vdir._get_digest())
# export()
#
@@ -175,13 +223,8 @@ class SourceCache(BaseCache):
# CASBasedDirectory
def export(self, source):
ref = source._get_source_name()
-
- try:
- digest = self.cas.resolve_ref(ref)
- except CASCacheError as e:
- raise SourceCacheError("Error exporting source: {}".format(e))
-
- return CasBasedDirectory(self.cas, digest=digest)
+ source = self._get_source(ref)
+ return CasBasedDirectory(self.cas, digest=source.files)
# pull()
#
@@ -204,13 +247,27 @@ class SourceCache(BaseCache):
try:
source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote):
- source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
- else:
- source.info("Remote ({}) does not have source {} cached".format(
+ # fetch source proto
+ response = self._pull_source(ref, remote)
+ if response is None:
+ source.info("Remote source service ({}) does not have source {} cached".format(
+ remote.spec.url, display_key))
+ continue
+
+ # Fetch source blobs
+ self.cas._fetch_directory(remote, response.files)
+ required_blobs = self.cas.required_blobs_for_directory(response.files)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+ if missing_blobs:
+ source.info("Remote cas ({}) does not have source {} cached".format(
remote.spec.url, display_key))
+ continue
+
+ source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
+ return True
+
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(
display_key, e)) from e
@@ -242,11 +299,105 @@ class SourceCache(BaseCache):
for remote in push_remotes:
remote.init()
source.status("Pushing source {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push([ref], remote):
- source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
- pushed = True
- else:
+
+ # check whether cache has files already
+ if self._pull_source(ref, remote) is not None:
source.info("Remote ({}) already has source {} cached"
.format(remote.spec.url, display_key))
+ continue
+
+ # push files to storage
+ source_proto = self._get_source(ref)
+ try:
+ self.cas._send_directory(remote, source_proto.files)
+ except CASRemoteError:
+ source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ if not self._push_source(ref, remote):
+ source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
return pushed
+
+ def _remove_source(self, ref, *, defer_prune=False):
+ return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)
+
+ def _store_source(self, ref, digest):
+ source_proto = source_pb2.Source()
+ source_proto.files.CopyFrom(digest)
+
+ self._store_proto(source_proto, ref)
+
+ def _store_proto(self, proto, ref):
+ path = self._source_path(ref)
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with utils.save_file_atomic(path, 'w+b') as f:
+ f.write(proto.SerializeToString())
+
+ def _get_source(self, ref):
+ path = self._source_path(ref)
+ source_proto = source_pb2.Source()
+ try:
+ with open(path, 'r+b') as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+ except FileNotFoundError as e:
+ raise SourceCacheError("Attempted to access unavailable source: {}"
+ .format(e)) from e
+
+ def _source_path(self, ref):
+ return os.path.join(self.sourcerefdir, ref)
+
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.sourcerefdir):
+ for source_file in files:
+ source = source_pb2.Source()
+ with open(os.path.join(root, source_file), 'r+b') as f:
+ source.ParseFromString(f.read())
+
+ yield source.files
+
+ def _update_mtime(self, ref):
+ try:
+ os.utime(self._source_path(ref))
+ except FileNotFoundError as e:
+ raise SourceCacheError("Couldn't find source: {}".format(ref)) from e
+
+ def _pull_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ request = source_pb2.GetSourceRequest()
+ request.cache_key = source_ref
+
+ response = remote.source_service.GetSource(request)
+
+ self._store_proto(response, source_ref)
+
+ return response
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError("Failed to pull source: {}".format(e.details()))
+ return None
+
+ def _push_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ source_proto = self._get_source(source_ref)
+
+ request = source_pb2.UpdateSourceRequest()
+ request.cache_key = source_ref
+ request.source.CopyFrom(source_proto)
+
+ return remote.source_service.UpdateSource(request)
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError("Failed to push source: {}".format(e.details()))
+ return None
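Pulling is now a two-step operation: fetch the Source proto, then fetch the blobs it references. A condensed sketch of the flow that `pull()` above implements (method names as in the diff; `remote` and `self.cas` are assumed to be initialised, and status messages are omitted):

    def pull_sketch(self, ref, remote):
        response = self._pull_source(ref, remote)       # Source proto, or None
        if response is None:
            return False                                # remote lacks the metadata

        # Fetch the directory tree, then confirm no blobs are still missing;
        # fetch_blobs() returns whatever it could not retrieve
        self.cas._fetch_directory(remote, response.files)
        required_blobs = self.cas.required_blobs_for_directory(response.files)
        missing_blobs = self.cas.local_missing_blobs(required_blobs)
        if self.cas.fetch_blobs(remote, missing_blobs):
            return False                                # remote CAS is incomplete
        return True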
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index b5c8f9a63..76c56fd1d 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -1063,8 +1063,7 @@ class Source(Plugin):
# Gives a ref path that points to where sources are kept in the CAS
def _get_source_name(self):
# @ is used to prevent conflicts with project names
- return "{}/{}/{}".format(
- '@sources',
+ return "{}/{}".format(
self.get_kind(),
self._key)
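With the `@sources` prefix dropped, a ref is simply `<kind>/<key>`, which maps directly onto the `source_protos/` directory tree. For illustration (the key below is a made-up cache key):

    # Ref format after this change, matching _get_source_name()
    kind = 'git'
    key = 'e3b0c44298fc1c149afbf4c8996fb924'  # made-up cache key
    ref = '{}/{}'.format(kind, key)
    # Stored at: <cachedir>/source_protos/git/<key>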
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index d42b6d1ba..13e5ab96e 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -140,8 +140,8 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, strict,
# Delete element sources
source_dir = os.path.join(project, 'cache', 'sources')
shutil.rmtree(source_dir)
- source_refs = os.path.join(project, 'cache', 'cas', 'refs', 'heads', '@sources')
- shutil.rmtree(source_refs)
+ source_protos = os.path.join(project, 'cache', 'source_protos')
+ shutil.rmtree(source_protos)
# Delete artifacts one by one and assert element states
for target in set(tracked):
diff --git a/tests/sourcecache/cache.py b/tests/sourcecache/cache.py
index 20faaa64e..793344ef0 100644
--- a/tests/sourcecache/cache.py
+++ b/tests/sourcecache/cache.py
@@ -39,10 +39,10 @@ def test_patch_sources_cached_1(cli, datafiles):
# as we have a local, patch, local config, the first local and patch should
# be cached together, and the last local on its own
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
- assert len(os.listdir(os.path.join(source_dir, 'local'))) == 2
+ assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
+ assert len(os.listdir(os.path.join(source_protos, 'local'))) == 2
@pytest.mark.datafiles(DATA_DIR)
@@ -53,9 +53,9 @@ def test_patch_sources_cached_2(cli, datafiles):
res.assert_success()
# As everything is before the patch it should all be cached together
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
+ assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
@pytest.mark.datafiles(DATA_DIR)
@@ -66,9 +66,9 @@ def test_sources_without_patch(cli, datafiles):
res.assert_success()
# No patches, so everything should be cached separately
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'local'))) == 3
+ assert len(os.listdir(os.path.join(source_protos, 'local'))) == 3
@pytest.mark.datafiles(DATA_DIR)
@@ -105,8 +105,8 @@ def test_source_cache_key(cli, datafiles):
res.assert_success()
# Should have one source ref
- patch_refs = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources', 'patch')
- assert len(os.listdir(patch_refs)) == 1
+ patch_protos = os.path.join(project_dir, 'cache', 'source_protos', 'patch')
+ assert len(os.listdir(patch_protos)) == 1
# modify hello-patch file and check tracking updates refs
with open(os.path.join(file_path, 'dev-files', 'usr', 'include', 'pony.h'), 'a') as f:
@@ -120,4 +120,4 @@ def test_source_cache_key(cli, datafiles):
res.assert_success()
# We should have a new source ref
- assert len(os.listdir(patch_refs)) == 2
+ assert len(os.listdir(patch_protos)) == 2
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 3fc9d96a6..de8587862 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -89,15 +89,22 @@ def test_source_fetch(cli, tmpdir, datafiles):
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) != []
+ # get root digest of source
+ sourcecache = context.sourcecache
+ digest = sourcecache.export(source)._get_digest()
+
# Move source in local cas to repo
shutil.rmtree(os.path.join(str(tmpdir), 'sourceshare', 'repo', 'cas'))
shutil.move(
+ os.path.join(str(tmpdir), 'cache', 'source_protos'),
+ os.path.join(str(tmpdir), 'sourceshare', 'repo'))
+ shutil.move(
os.path.join(str(tmpdir), 'cache', 'cas'),
os.path.join(str(tmpdir), 'sourceshare', 'repo'))
shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'sources'))
shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'artifacts'))
- digest = share.cas.resolve_ref(source._get_source_name())
+ # check the share has the object
assert share.has_object(digest)
state = cli.get_element_state(project_dir, 'fetch.bst')
@@ -163,7 +170,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
res = cli.run(project=project_dir, args=['source', 'fetch', 'fetch.bst'])
res.assert_success()
brief_key = source._get_brief_display_key()
- assert ("Remote ({}) does not have source {} cached"
+ assert ("Remote source service ({}) does not have source {} cached"
.format(share.repo, brief_key)) in res.stderr
assert ("SUCCESS Fetching from {}"
.format(repo.source_config(ref=ref)['url'])) in res.stderr
@@ -219,5 +226,5 @@ def test_pull_fail(cli, tmpdir, datafiles):
res = cli.run(project=project_dir, args=['build', 'push.bst'])
res.assert_main_error(ErrorDomain.STREAM, None)
res.assert_task_error(ErrorDomain.PLUGIN, None)
- assert "Remote ({}) does not have source {} cached".format(
+ assert "Remote source service ({}) does not have source {} cached".format(
share.repo, source._get_brief_display_key()) in res.stderr
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 6282b6e60..23f5f1ca1 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -95,7 +95,7 @@ def test_source_push(cli, tmpdir, datafiles):
assert sourcecache.contains(source)
# check that the remote CAS now has it
- digest = share.cas.resolve_ref(source._get_source_name())
+ digest = sourcecache.export(source)._get_digest()
assert share.has_object(digest)
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 9dc431bda..c15bed215 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -78,8 +78,7 @@ def test_source_staged(tmpdir, cli, datafiles):
assert sourcecache.contains(source)
# Extract the file and check it's the same as the one we imported
- ref = source._get_source_name()
- digest = cas.resolve_ref(ref)
+ digest = sourcecache.export(source)._get_digest()
extractdir = os.path.join(str(tmpdir), "extract")
cas.checkout(extractdir, digest)
dir1 = extractdir
@@ -108,6 +107,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
context.cachedir = cachedir
context.messenger.set_message_handler(dummy_message_handler)
cas = context.get_cascache()
+ sourcecache = context.sourcecache
res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"])
res.assert_success()
@@ -117,8 +117,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
assert element._source_cached()
# check that the directory structures are identical
- ref = source._get_source_name()
- digest = cas.resolve_ref(ref)
+ digest = sourcecache.export(source)._get_digest()
extractdir = os.path.join(str(tmpdir), "extract")
cas.checkout(extractdir, digest)
dir1 = extractdir
@@ -133,11 +132,11 @@ def test_staged_source_build(tmpdir, datafiles, cli):
project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
cachedir = os.path.join(str(tmpdir), 'cache')
element_path = 'elements'
- source_refs = os.path.join(str(tmpdir), 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(str(tmpdir), 'cache', 'source_protos')
source_dir = os.path.join(str(tmpdir), 'cache', 'sources')
cli.configure({
- 'cachedir': os.path.join(str(tmpdir), 'cache')
+ 'cachedir': cachedir
})
create_element_size('target.bst', project_dir, element_path, [], 10000)
@@ -181,7 +180,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
assert files == []
# Now remove the source refs and check the state
- shutil.rmtree(source_refs)
+ shutil.rmtree(source_protos)
cli.remove_artifact_from_cache(project_dir, 'target.bst')
states = cli.get_element_states(project_dir, ['target.bst'])
assert states['target.bst'] == 'fetch needed'
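Across the test changes, the pattern for resolving a source's digest shifts from CAS refs to the source cache front end. The new idiom, as used in the fetch, push, and staging tests above (context setup is assumed):

    # Replaces: digest = cas.resolve_ref(source._get_source_name())
    sourcecache = context.sourcecache
    digest = sourcecache.export(source)._get_digest()
    cas.checkout(extractdir, digest)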