summaryrefslogtreecommitdiff
path: root/src/buildstream/_sourcecache.py
diff options
context:
space:
mode:
authorRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2019-06-24 15:56:31 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-08 10:40:28 +0000
commitbb2cf18be0aef7d6e394a0c6ff6d83eac737c60b (patch)
tree2b9714484fe27645211efaa37065220465cf3b79 /src/buildstream/_sourcecache.py
parentd493682609f8f96ed127f4083bad42fa2fabb250 (diff)
downloadbuildstream-bb2cf18be0aef7d6e394a0c6ff6d83eac737c60b.tar.gz
Source cache uses new proto methods
This changes the location of source refs from `cache/cas/refs` to `cache/source_protos` and fixes tests that looked in the old location. Source cache now uses the new protocol buffer and gRPC methods defined, for storing locally, and pushing and pulling to remotes. Part of #1038
Diffstat (limited to 'src/buildstream/_sourcecache.py')
-rw-r--r--src/buildstream/_sourcecache.py213
1 files changed, 182 insertions, 31 deletions
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e08..fdfe00901 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -18,12 +18,15 @@
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
+import grpc
-from ._cas import CASRemoteSpec
+from ._cas import CASRemote, CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
-from ._exceptions import CASError, CASCacheError, SourceCacheError
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+ source_pb2, source_pb2_grpc
# Holds configuration for a remote used for the source cache.
@@ -38,6 +41,43 @@ class SourceCacheSpec(CASRemoteSpec):
pass
+class SourceRemote(CASRemote):
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.capabilities_service = None
+ self.source_service = None
+
+ def init(self):
+ if not self._initialized:
+ super().init()
+
+ self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+ # check that the service supports sources
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ response = self.capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+ # Check if this remote has the source service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise SourceCacheError(
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise SourceCacheError(
+ "Remote initialisation failed: {}".format(e.details()))
+
+ if not response.source_capabilities:
+ raise SourceCacheError(
+ "Configured remote does not support source service")
+
+ # set up source service
+ self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+
+
# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
@@ -49,15 +89,20 @@ class SourceCache(BaseCache):
spec_name = "source_cache_specs"
spec_error = SourceCacheError
config_node_name = "source-caches"
+ remote_class = SourceRemote
def __init__(self, context):
super().__init__(context)
self._required_sources = set()
+ self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
+ os.makedirs(self.sourcerefdir, exist_ok=True)
- self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
+ self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source)
self.casquota.add_list_refs_callback(self.list_sources)
+ self.cas.add_reachable_directories_callback(self._reachable_directories)
+
# mark_required_sources()
#
# Mark sources that are required by the current run.
@@ -77,8 +122,8 @@ class SourceCache(BaseCache):
for source in sources:
ref = source._get_source_name()
try:
- self.cas.update_mtime(ref)
- except CASCacheError:
+ self._update_mtime(ref)
+ except SourceCacheError:
pass
# required_sources()
@@ -103,23 +148,19 @@ class SourceCache(BaseCache):
def unrequired_sources(self):
required_source_names = set(map(
lambda x: x._get_source_name(), self._required_sources))
- for (mtime, source) in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*"):
+ for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir):
if source not in required_source_names:
yield (mtime, source)
# list_sources()
#
- # Get list of all sources in the `cas/refs/heads/@sources/` folder
+ # Get list of all sources in the `source_protos/` folder
#
# Returns:
# ([str]): iterable over all source refs
#
def list_sources(self):
- return [ref for _, ref in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*")]
+ return [ref for _, ref in self._list_refs_mtimes(self.sourcerefdir)]
# contains()
#
@@ -134,7 +175,14 @@ class SourceCache(BaseCache):
#
def contains(self, source):
ref = source._get_source_name()
- return self.cas.contains(ref)
+ path = self._source_path(ref)
+
+ if not os.path.exists(path):
+ return False
+
+ # check files
+ source_proto = self._get_source(ref)
+ return self.cas.contains_directory(source_proto.files, with_files=True)
# commit()
#
@@ -162,7 +210,7 @@ class SourceCache(BaseCache):
else:
source._stage(vdir)
- self.cas.set_ref(ref, vdir._get_digest())
+ self._store_source(ref, vdir._get_digest())
# export()
#
@@ -175,13 +223,8 @@ class SourceCache(BaseCache):
# CASBasedDirectory
def export(self, source):
ref = source._get_source_name()
-
- try:
- digest = self.cas.resolve_ref(ref)
- except CASCacheError as e:
- raise SourceCacheError("Error exporting source: {}".format(e))
-
- return CasBasedDirectory(self.cas, digest=digest)
+ source = self._get_source(ref)
+ return CasBasedDirectory(self.cas, digest=source.files)
# pull()
#
@@ -204,13 +247,27 @@ class SourceCache(BaseCache):
try:
source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote):
- source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
- else:
- source.info("Remote ({}) does not have source {} cached".format(
+ # fetch source proto
+ response = self._pull_source(ref, remote)
+ if response is None:
+ source.info("Remote source service ({}) does not have source {} cached".format(
+ remote.spec.url, display_key))
+ continue
+
+ # Fetch source blobs
+ self.cas._fetch_directory(remote, response.files)
+ required_blobs = self.cas.required_blobs_for_directory(response.files)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+ if missing_blobs:
+ source.info("Remote cas ({}) does not have source {} cached".format(
remote.spec.url, display_key))
+ continue
+
+ source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
+ return True
+
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(
display_key, e)) from e
@@ -242,11 +299,105 @@ class SourceCache(BaseCache):
for remote in push_remotes:
remote.init()
source.status("Pushing source {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push([ref], remote):
- source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
- pushed = True
- else:
+
+ # check whether cache has files already
+ if self._pull_source(ref, remote) is not None:
source.info("Remote ({}) already has source {} cached"
.format(remote.spec.url, display_key))
+ continue
+
+ # push files to storage
+ source_proto = self._get_source(ref)
+ try:
+ self.cas._send_directory(remote, source_proto.files)
+ except CASRemoteError:
+ source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ if not self._push_source(ref, remote):
+ source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
return pushed
+
+ def _remove_source(self, ref, *, defer_prune=False):
+ return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)
+
+ def _store_source(self, ref, digest):
+ source_proto = source_pb2.Source()
+ source_proto.files.CopyFrom(digest)
+
+ self._store_proto(source_proto, ref)
+
+ def _store_proto(self, proto, ref):
+ path = self._source_path(ref)
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with utils.save_file_atomic(path, 'w+b') as f:
+ f.write(proto.SerializeToString())
+
+ def _get_source(self, ref):
+ path = self._source_path(ref)
+ source_proto = source_pb2.Source()
+ try:
+ with open(path, 'r+b') as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+ except FileNotFoundError as e:
+ raise SourceCacheError("Attempted to access unavailable source: {}"
+ .format(e)) from e
+
+ def _source_path(self, ref):
+ return os.path.join(self.sourcerefdir, ref)
+
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.sourcerefdir):
+ for source_file in files:
+ source = source_pb2.Source()
+ with open(os.path.join(root, source_file), 'r+b') as f:
+ source.ParseFromString(f.read())
+
+ yield source.files
+
+ def _update_mtime(self, ref):
+ try:
+ os.utime(self._source_path(ref))
+ except FileNotFoundError as e:
+ raise SourceCacheError("Couldn't find source: {}".format(ref)) from e
+
+ def _pull_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ request = source_pb2.GetSourceRequest()
+ request.cache_key = source_ref
+
+ response = remote.source_service.GetSource(request)
+
+ self._store_proto(response, source_ref)
+
+ return response
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError("Failed to pull source: {}".format(e.details()))
+ return None
+
+ def _push_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ source_proto = self._get_source(source_ref)
+
+ request = source_pb2.UpdateSourceRequest()
+ request.cache_key = source_ref
+ request.source.CopyFrom(source_proto)
+
+ return remote.source_service.UpdateSource(request)
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError("Failed to push source: {}".format(e.details()))
+ return None