-rw-r--r-- | src/buildstream/_sourcecache.py | 213
-rw-r--r-- | src/buildstream/source.py       |   3
-rw-r--r-- | tests/frontend/buildtrack.py    |   4
-rw-r--r-- | tests/sourcecache/cache.py      |  20
-rw-r--r-- | tests/sourcecache/fetch.py      |  13
-rw-r--r-- | tests/sourcecache/push.py       |   2
-rw-r--r-- | tests/sourcecache/staging.py    |  13
7 files changed, 212 insertions, 56 deletions
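This commit moves source cache references out of CAS refs (cas/refs/heads/@sources/...) and into one serialized protobuf file per source under source_protos/, and it teaches the source cache to speak a dedicated gRPC source service when pulling and pushing. As a minimal sketch of the new on-disk format (the cache path and ref name below are hypothetical), each stored file is a buildstream.v2 Source message whose files field is the root directory digest in the local CAS:

    # Sketch only, not part of the diff; the path and ref are hypothetical.
    import os
    from buildstream._protos.buildstream.v2 import source_pb2

    cachedir = os.path.expanduser('~/.cache/buildstream')
    proto = source_pb2.Source()
    with open(os.path.join(cachedir, 'source_protos', 'local', '0123abcd'), 'rb') as f:
        proto.ParseFromString(f.read())
    # `files` is a REAPI Digest naming the root directory of the staged source
    print(proto.files.hash, proto.files.size_bytes)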
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e08..fdfe00901 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -18,12 +18,15 @@
 #        Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
 #
 import os
+import grpc
 
-from ._cas import CASRemoteSpec
+from ._cas import CASRemote, CASRemoteSpec
 from .storage._casbaseddirectory import CasBasedDirectory
 from ._basecache import BaseCache
-from ._exceptions import CASError, CASCacheError, SourceCacheError
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
 from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+    source_pb2, source_pb2_grpc
 
 
 # Holds configuration for a remote used for the source cache.
@@ -38,6 +41,43 @@ class SourceCacheSpec(CASRemoteSpec):
     pass
 
 
+class SourceRemote(CASRemote):
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.capabilities_service = None
+        self.source_service = None
+
+    def init(self):
+        if not self._initialized:
+            super().init()
+
+            self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+            # check that the service supports sources
+            try:
+                request = buildstream_pb2.GetCapabilitiesRequest()
+                if self.instance_name:
+                    request.instance_name = self.instance_name
+
+                response = self.capabilities_service.GetCapabilities(request)
+            except grpc.RpcError as e:
+                # Check if this remote has the artifact service
+                if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+                    raise SourceCacheError(
+                        "Configured remote does not have the BuildStream "
+                        "capabilities service. Please check remote configuration.")
+                # Else raise exception with details
+                raise SourceCacheError(
+                    "Remote initialisation failed: {}".format(e.details()))
+
+            if not response.source_capabilities:
+                raise SourceCacheError(
+                    "Configured remote does not support source service")
+
+            # set up source service
+            self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+
+
 # Class that keeps config of remotes and deals with caching of sources.
 #
 # Args:
@@ -49,15 +89,20 @@ class SourceCache(BaseCache):
     spec_name = "source_cache_specs"
     spec_error = SourceCacheError
     config_node_name = "source-caches"
+    remote_class = SourceRemote
 
     def __init__(self, context):
         super().__init__(context)
 
         self._required_sources = set()
+        self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
+        os.makedirs(self.sourcerefdir, exist_ok=True)
 
-        self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
+        self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source)
         self.casquota.add_list_refs_callback(self.list_sources)
 
+        self.cas.add_reachable_directories_callback(self._reachable_directories)
+
     # mark_required_sources()
     #
     # Mark sources that are required by the current run.
@@ -77,8 +122,8 @@ class SourceCache(BaseCache):
         for source in sources:
             ref = source._get_source_name()
             try:
-                self.cas.update_mtime(ref)
-            except CASCacheError:
+                self._update_mtime(ref)
+            except SourceCacheError:
                 pass
 
     # required_sources()
@@ -103,23 +148,19 @@
     def unrequired_sources(self):
         required_source_names = set(map(
             lambda x: x._get_source_name(), self._required_sources))
-        for (mtime, source) in self._list_refs_mtimes(
-                os.path.join(self.cas.casdir, 'refs', 'heads'),
-                glob_expr="@sources/*"):
+        for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir):
             if source not in required_source_names:
                 yield (mtime, source)
 
     # list_sources()
     #
-    # Get list of all sources in the `cas/refs/heads/@sources/` folder
+    # Get list of all sources in the `sources_protos/` folder
     #
     # Returns:
     #     ([str]): iterable over all source refs
     #
     def list_sources(self):
-        return [ref for _, ref in self._list_refs_mtimes(
-            os.path.join(self.cas.casdir, 'refs', 'heads'),
-            glob_expr="@sources/*")]
+        return [ref for _, ref in self._list_refs_mtimes(self.sourcerefdir)]
 
     # contains()
     #
@@ -134,7 +175,14 @@
     #
     def contains(self, source):
         ref = source._get_source_name()
-        return self.cas.contains(ref)
+        path = self._source_path(ref)
+
+        if not os.path.exists(path):
+            return False
+
+        # check files
+        source_proto = self._get_source(ref)
+        return self.cas.contains_directory(source_proto.files, with_files=True)
 
     # commit()
     #
@@ -162,7 +210,7 @@
         else:
             source._stage(vdir)
 
-        self.cas.set_ref(ref, vdir._get_digest())
+        self._store_source(ref, vdir._get_digest())
 
     # export()
     #
@@ -175,13 +223,8 @@
     #     CASBasedDirectory
     def export(self, source):
         ref = source._get_source_name()
-
-        try:
-            digest = self.cas.resolve_ref(ref)
-        except CASCacheError as e:
-            raise SourceCacheError("Error exporting source: {}".format(e))
-
-        return CasBasedDirectory(self.cas, digest=digest)
+        source = self._get_source(ref)
+        return CasBasedDirectory(self.cas, digest=source.files)
 
     # pull()
     #
@@ -204,13 +247,27 @@
             try:
                 source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
 
-                if self.cas.pull(ref, remote):
-                    source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
-                    # no need to pull from additional remotes
-                    return True
-                else:
-                    source.info("Remote ({}) does not have source {} cached".format(
+                # fetch source proto
+                response = self._pull_source(ref, remote)
+                if response is None:
+                    source.info("Remote source service ({}) does not have source {} cached".format(
+                        remote.spec.url, display_key))
+                    continue
+
+                # Fetch source blobs
+                self.cas._fetch_directory(remote, response.files)
+                required_blobs = self.cas.required_blobs_for_directory(response.files)
+                missing_blobs = self.cas.local_missing_blobs(required_blobs)
+                missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+                if missing_blobs:
+                    source.info("Remote cas ({}) does not have source {} cached".format(
                         remote.spec.url, display_key))
+                    continue
+
+                source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
+                return True
+
             except CASError as e:
                 raise SourceCacheError("Failed to pull source {}: {}".format(
                     display_key, e)) from e
@@ -242,11 +299,105 @@
         for remote in push_remotes:
             remote.init()
             source.status("Pushing source {} -> {}".format(display_key, remote.spec.url))
-            if self.cas.push([ref], remote):
-                source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
-                pushed = True
-            else:
+
+            # check whether cache has files already
+            if self._pull_source(ref, remote) is not None:
                 source.info("Remote ({}) already has source {} cached"
                             .format(remote.spec.url, display_key))
+                continue
+
+            # push files to storage
+            source_proto = self._get_source(ref)
+            try:
+                self.cas._send_directory(remote, source_proto.files)
+            except CASRemoteError:
+                source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url))
+                continue
+
+            if not self._push_source(ref, remote):
+                source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url))
+                continue
+
+            source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
+            pushed = True
 
         return pushed
+
+    def _remove_source(self, ref, *, defer_prune=False):
+        return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)
+
+    def _store_source(self, ref, digest):
+        source_proto = source_pb2.Source()
+        source_proto.files.CopyFrom(digest)
+
+        self._store_proto(source_proto, ref)
+
+    def _store_proto(self, proto, ref):
+        path = self._source_path(ref)
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        with utils.save_file_atomic(path, 'w+b') as f:
+            f.write(proto.SerializeToString())
+
+    def _get_source(self, ref):
+        path = self._source_path(ref)
+        source_proto = source_pb2.Source()
+        try:
+            with open(path, 'r+b') as f:
+                source_proto.ParseFromString(f.read())
+                return source_proto
+        except FileNotFoundError as e:
+            raise SourceCacheError("Attempted to access unavailable source: {}"
+                                   .format(e)) from e
+
+    def _source_path(self, ref):
+        return os.path.join(self.sourcerefdir, ref)
+
+    def _reachable_directories(self):
+        for root, _, files in os.walk(self.sourcerefdir):
+            for source_file in files:
+                source = source_pb2.Source()
+                with open(os.path.join(root, source_file), 'r+b') as f:
+                    source.ParseFromString(f.read())
+
+                yield source.files
+
+    def _update_mtime(self, ref):
+        try:
+            os.utime(self._source_path(ref))
+        except FileNotFoundError as e:
+            raise SourceCacheError("Couldn't find source: {}".format(ref)) from e
+
+    def _pull_source(self, source_ref, remote):
+        try:
+            remote.init()
+
+            request = source_pb2.GetSourceRequest()
+            request.cache_key = source_ref
+
+            response = remote.source_service.GetSource(request)
+
+            self._store_proto(response, source_ref)
+
+            return response
+
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.NOT_FOUND:
+                raise SourceCacheError("Failed to pull source: {}".format(e.details()))
+            return None
+
+    def _push_source(self, source_ref, remote):
+        try:
+            remote.init()
+
+            source_proto = self._get_source(source_ref)
+
+            request = source_pb2.UpdateSourceRequest()
+            request.cache_key = source_ref
+            request.source.CopyFrom(source_proto)
+
+            return remote.source_service.UpdateSource(request)
+
+        except grpc.RpcError as e:
+            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+                raise SourceCacheError("Failed to push source: {}".format(e.details()))
+            return None
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index b5c8f9a63..76c56fd1d 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -1063,8 +1063,7 @@ class Source(Plugin):
 
     # Gives a ref path that points to where sources are kept in the CAS
     def _get_source_name(self):
         # @ is used to prevent conflicts with project names
-        return "{}/{}/{}".format(
-            '@sources',
+        return "{}/{}".format(
             self.get_kind(),
             self._key)
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index d42b6d1ba..13e5ab96e 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -140,8 +140,8 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, strict,
     # Delete element sources
     source_dir = os.path.join(project, 'cache', 'sources')
     shutil.rmtree(source_dir)
-    source_refs = os.path.join(project, 'cache', 'cas', 'refs', 'heads', '@sources')
-    shutil.rmtree(source_refs)
+    source_protos = os.path.join(project, 'cache', 'source_protos')
+    shutil.rmtree(source_protos)
 
     # Delete artifacts one by one and assert element states
     for target in set(tracked):
diff --git a/tests/sourcecache/cache.py b/tests/sourcecache/cache.py
index 20faaa64e..793344ef0 100644
--- a/tests/sourcecache/cache.py
+++ b/tests/sourcecache/cache.py
@@ -39,10 +39,10 @@ def test_patch_sources_cached_1(cli, datafiles):
 
     # as we have a local, patch, local config, the first local and patch should
     # be cached together, and the last local on it's own
-    source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+    source_protos = os.path.join(project_dir, 'cache', 'source_protos')
 
-    assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
-    assert len(os.listdir(os.path.join(source_dir, 'local'))) == 2
+    assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
+    assert len(os.listdir(os.path.join(source_protos, 'local'))) == 2
 
 
 @pytest.mark.datafiles(DATA_DIR)
@@ -53,9 +53,9 @@ def test_patch_sources_cached_2(cli, datafiles):
     res.assert_success()
 
     # As everything is before the patch it should all be cached together
-    source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+    source_protos = os.path.join(project_dir, 'cache', 'source_protos')
 
-    assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
+    assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
 
 
 @pytest.mark.datafiles(DATA_DIR)
@@ -66,9 +66,9 @@ def test_sources_without_patch(cli, datafiles):
     res.assert_success()
 
     # No patches so everything should be cached seperately
-    source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+    source_protos = os.path.join(project_dir, 'cache', 'source_protos')
 
-    assert len(os.listdir(os.path.join(source_dir, 'local'))) == 3
+    assert len(os.listdir(os.path.join(source_protos, 'local'))) == 3
 
 
 @pytest.mark.datafiles(DATA_DIR)
@@ -105,8 +105,8 @@ def test_source_cache_key(cli, datafiles):
     res.assert_success()
 
     # Should have one source ref
-    patch_refs = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources', 'patch')
-    assert len(os.listdir(patch_refs)) == 1
+    patch_protos = os.path.join(project_dir, 'cache', 'source_protos', 'patch')
+    assert len(os.listdir(patch_protos)) == 1
 
     # modify hello-patch file and check tracking updates refs
     with open(os.path.join(file_path, 'dev-files', 'usr', 'include', 'pony.h'), 'a') as f:
@@ -120,4 +120,4 @@ def test_source_cache_key(cli, datafiles):
     res.assert_success()
 
     # We should have a new source ref
-    assert len(os.listdir(patch_refs)) == 2
+    assert len(os.listdir(patch_protos)) == 2
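The directory counts asserted in these tests fall out of the new ref layout: Source._get_source_name() now returns "<kind>/<key>", and _store_proto() writes exactly one file per ref beneath source_protos/. A self-contained sketch of that round-trip (the base directory, ref, and digest are made up, and plain open() stands in for BuildStream's utils.save_file_atomic()):

    import os
    import tempfile
    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    from buildstream._protos.buildstream.v2 import source_pb2

    def store_source(basedir, ref, digest):
        # mirrors SourceCache._store_source() / _store_proto()
        proto = source_pb2.Source()
        proto.files.CopyFrom(digest)
        path = os.path.join(basedir, ref)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            f.write(proto.SerializeToString())

    def get_source(basedir, ref):
        # mirrors SourceCache._get_source()
        proto = source_pb2.Source()
        with open(os.path.join(basedir, ref), 'rb') as f:
            proto.ParseFromString(f.read())
        return proto

    basedir = os.path.join(tempfile.mkdtemp(), 'source_protos')
    digest = remote_execution_pb2.Digest(hash='a' * 64, size_bytes=42)  # dummy digest
    store_source(basedir, 'patch/deadbeef', digest)
    assert get_source(basedir, 'patch/deadbeef').files == digest
    assert os.listdir(os.path.join(basedir, 'patch')) == ['deadbeef']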
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 3fc9d96a6..de8587862 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -89,15 +89,22 @@ def test_source_fetch(cli, tmpdir, datafiles):
 
         assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) != []
 
+        # get root digest of source
+        sourcecache = context.sourcecache
+        digest = sourcecache.export(source)._get_digest()
+
         # Move source in local cas to repo
         shutil.rmtree(os.path.join(str(tmpdir), 'sourceshare', 'repo', 'cas'))
         shutil.move(
+            os.path.join(str(tmpdir), 'cache', 'source_protos'),
+            os.path.join(str(tmpdir), 'sourceshare', 'repo'))
+        shutil.move(
             os.path.join(str(tmpdir), 'cache', 'cas'),
             os.path.join(str(tmpdir), 'sourceshare', 'repo'))
         shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'sources'))
         shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'artifacts'))
 
-        digest = share.cas.resolve_ref(source._get_source_name())
+        # check the share has the object
         assert share.has_object(digest)
 
         state = cli.get_element_state(project_dir, 'fetch.bst')
@@ -163,7 +170,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
         res = cli.run(project=project_dir, args=['source', 'fetch', 'fetch.bst'])
         res.assert_success()
         brief_key = source._get_brief_display_key()
-        assert ("Remote ({}) does not have source {} cached"
+        assert ("Remote source service ({}) does not have source {} cached"
                 .format(share.repo, brief_key)) in res.stderr
         assert ("SUCCESS Fetching from {}"
                 .format(repo.source_config(ref=ref)['url'])) in res.stderr
@@ -219,5 +226,5 @@ def test_pull_fail(cli, tmpdir, datafiles):
         res = cli.run(project=project_dir, args=['build', 'push.bst'])
         res.assert_main_error(ErrorDomain.STREAM, None)
         res.assert_task_error(ErrorDomain.PLUGIN, None)
-        assert "Remote ({}) does not have source {} cached".format(
+        assert "Remote source service ({}) does not have source {} cached".format(
             share.repo, source._get_brief_display_key()) in res.stderr
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 6282b6e60..23f5f1ca1 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -95,7 +95,7 @@ def test_source_push(cli, tmpdir, datafiles):
 
         assert sourcecache.contains(source)
 
         # check that's the remote CAS now has it
-        digest = share.cas.resolve_ref(source._get_source_name())
+        digest = sourcecache.export(source)._get_digest()
         assert share.has_object(digest)
 
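With CAS refs gone, the tests above recover digests through sourcecache.export(source)._get_digest() rather than share.cas.resolve_ref(). The wire-level counterpart on the push side is UpdateSource; a standalone sketch of that call (endpoint, cache key, and digest are hypothetical, the flow mirrors SourceCache._push_source()):

    import grpc
    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    from buildstream._protos.buildstream.v2 import source_pb2, source_pb2_grpc

    channel = grpc.insecure_channel('storage.example.com:11001')  # hypothetical remote
    source_service = source_pb2_grpc.SourceServiceStub(channel)

    source_proto = source_pb2.Source()
    source_proto.files.CopyFrom(
        remote_execution_pb2.Digest(hash='b' * 64, size_bytes=1024))  # dummy digest

    request = source_pb2.UpdateSourceRequest()
    request.cache_key = 'git/0123abcd'  # hypothetical "<kind>/<key>" ref
    request.source.CopyFrom(source_proto)

    try:
        source_service.UpdateSource(request)
    except grpc.RpcError as e:
        # RESOURCE_EXHAUSTED means the remote is full; anything else is fatal,
        # matching the handling in SourceCache._push_source()
        if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
            raise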
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 9dc431bda..c15bed215 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -78,8 +78,7 @@ def test_source_staged(tmpdir, cli, datafiles):
     assert sourcecache.contains(source)
 
     # Extract the file and check it's the same as the one we imported
-    ref = source._get_source_name()
-    digest = cas.resolve_ref(ref)
+    digest = sourcecache.export(source)._get_digest()
     extractdir = os.path.join(str(tmpdir), "extract")
     cas.checkout(extractdir, digest)
     dir1 = extractdir
@@ -108,6 +107,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
     context.cachedir = cachedir
     context.messenger.set_message_handler(dummy_message_handler)
     cas = context.get_cascache()
+    sourcecache = context.sourcecache
 
     res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"])
     res.assert_success()
@@ -117,8 +117,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
     assert element._source_cached()
 
     # check that the directory structures are idetical
-    ref = source._get_source_name()
-    digest = cas.resolve_ref(ref)
+    digest = sourcecache.export(source)._get_digest()
     extractdir = os.path.join(str(tmpdir), "extract")
     cas.checkout(extractdir, digest)
     dir1 = extractdir
@@ -133,11 +132,11 @@ def test_staged_source_build(tmpdir, datafiles, cli):
     project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
     cachedir = os.path.join(str(tmpdir), 'cache')
     element_path = 'elements'
-    source_refs = os.path.join(str(tmpdir), 'cache', 'cas', 'refs', 'heads', '@sources')
+    source_protos = os.path.join(str(tmpdir), 'cache', 'source_protos')
     source_dir = os.path.join(str(tmpdir), 'cache', 'sources')
 
     cli.configure({
-        'cachedir': os.path.join(str(tmpdir), 'cache')
+        'cachedir': cachedir
     })
 
     create_element_size('target.bst', project_dir, element_path, [], 10000)
@@ -181,7 +180,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
     assert files == []
 
     # Now remove the source refs and check the state
-    shutil.rmtree(source_refs)
+    shutil.rmtree(source_protos)
     cli.remove_artifact_from_cache(project_dir, 'target.bst')
     states = cli.get_element_states(project_dir, ['target.bst'])
     assert states['target.bst'] == 'fetch needed'
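Cache expiry is the other consumer of the proto directory: unrequired_sources() yields eviction candidates ordered by mtime (kept fresh by _update_mtime()), while _reachable_directories() protects the digests of stored sources from CAS pruning. A rough sketch of the mtime ordering, assuming only that BaseCache._list_refs_mtimes() walks a directory holding one proto file per ref (the stand-in function and the path below are hypothetical):

    import os

    def refs_by_mtime(sourcerefdir):
        # stand-in for BaseCache._list_refs_mtimes(): yield (mtime, ref) pairs,
        # least recently used first, for the eviction callback to consume
        entries = []
        for root, _, files in os.walk(sourcerefdir):
            for name in files:
                path = os.path.join(root, name)
                entries.append((os.path.getmtime(path), os.path.relpath(path, sourcerefdir)))
        return sorted(entries)

    for mtime, ref in refs_by_mtime('/path/to/cache/source_protos'):
        print(mtime, ref)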