summaryrefslogtreecommitdiff
path: root/src/buildstream/_sourcecache.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_sourcecache.py')
-rw-r--r--src/buildstream/_sourcecache.py129
1 files changed, 25 insertions, 104 deletions
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index dcde0b426..fd75be34d 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2019 Bloomberg Finance LP
+# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -20,98 +20,14 @@
import os
import grpc
-from ._remote import BaseRemote
from ._cas.casremote import BlobNotFound
from .storage._casbaseddirectory import CasBasedDirectory
-from ._basecache import BaseCache
-from ._exceptions import CASError, CASRemoteError, SourceCacheError, RemoteError
+from ._assetcache import AssetCache
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
-from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, source_pb2, source_pb2_grpc
+from ._protos.buildstream.v2 import source_pb2
-
-class SourceRemote(BaseRemote):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.source_service = None
-
- def close(self):
- self.source_service = None
- super().close()
-
- def _configure_protocols(self):
- # set up source service
- self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
-
- # _check():
- #
- # Check if this remote provides everything required for the
- # particular kind of remote. This is expected to be called as part
- # of check()
- #
- # Raises:
- # RemoteError: If the upstream has a problem
- #
- def _check(self):
- capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
-
- # check that the service supports sources
- try:
- request = buildstream_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = capabilities_service.GetCapabilities(request)
- except grpc.RpcError as e:
- # Check if this remote has the artifact service
- if e.code() == grpc.StatusCode.UNIMPLEMENTED:
- raise RemoteError(
- "Configured remote does not have the BuildStream "
- "capabilities service. Please check remote configuration."
- )
- raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))
-
- if not response.source_capabilities:
- raise RemoteError("Configured remote does not support source service")
-
- if self.spec.push and not response.source_capabilities.allow_updates:
- raise RemoteError("Source server does not allow push")
-
- # get_source():
- #
- # Get a source proto for a given source_ref from the remote.
- #
- # Args:
- # source_ref (str): The source ref of the source to pull.
- #
- # Returns:
- # (Source): The source proto
- #
- # Raises:
- # grpc.RpcError: If something goes wrong during the request.
- #
- def get_source(self, source_ref):
- request = source_pb2.GetSourceRequest()
- request.cache_key = source_ref
- return self.source_service.GetSource(request)
-
- # update_source():
- #
- # Update the source on the remote.
- #
- # Args:
- # source_ref (str): The source ref of the source to update.
- # source (Source): The proto to update with.
- #
- # Returns:
- # (bool): Whether the update was successful.
- #
- # Raises:
- # grpc.RpcError: If something goes wrong during the request.
- #
- def update_source(self, source_ref, source):
- request = source_pb2.UpdateSourceRequest()
- request.cache_key = source_ref
- request.source.CopyFrom(source)
- return self.source_service.UpdateSource(request)
+REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"
# Class that keeps config of remotes and deals with caching of sources.
@@ -119,12 +35,10 @@ class SourceRemote(BaseRemote):
# Args:
# context (Context): The Buildstream context
#
-class SourceCache(BaseCache):
+class SourceCache(AssetCache):
spec_name = "source_cache_specs"
- spec_error = SourceCacheError
config_node_name = "source-caches"
- index_remote_class = SourceRemote
def __init__(self, context):
super().__init__(context)
@@ -214,15 +128,15 @@ class SourceCache(BaseCache):
index_remotes = self._index_remotes[project]
storage_remotes = self._storage_remotes[project]
- # First fetch the source proto so we know what to pull
- source_proto = None
+ # First fetch the source directory digest so we know what to pull
+ source_digest = None
for remote in index_remotes:
try:
remote.init()
source.status("Pulling source {} <- {}".format(display_key, remote))
- source_proto = self._pull_source(ref, remote)
- if source_proto is None:
+ source_digest = self._pull_source(ref, remote)
+ if source_digest is None:
source.info(
"Remote source service ({}) does not have source {} cached".format(remote, display_key)
)
@@ -230,7 +144,7 @@ class SourceCache(BaseCache):
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e)) from e
- if not source_proto:
+ if not source_digest:
return False
for remote in storage_remotes:
@@ -239,8 +153,8 @@ class SourceCache(BaseCache):
source.status("Pulling data for source {} <- {}".format(display_key, remote))
# Fetch source blobs
- self.cas._fetch_directory(remote, source_proto.files)
- required_blobs = self.cas.required_blobs_for_directory(source_proto.files)
+ self.cas._fetch_directory(remote, source_digest)
+ required_blobs = self.cas.required_blobs_for_directory(source_digest)
missing_blobs = self.cas.local_missing_blobs(required_blobs)
self.cas.fetch_blobs(remote, missing_blobs)
@@ -337,11 +251,15 @@ class SourceCache(BaseCache):
return os.path.join(self._basedir, ref)
def _pull_source(self, source_ref, remote):
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
+
try:
remote.init()
- response = remote.get_source(source_ref)
- self._store_proto(response, source_ref)
- return response
+ response = remote.fetch_directory([uri])
+ if not response:
+ return None
+ self._store_source(source_ref, response.root_directory_digest)
+ return response.root_directory_digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
@@ -349,12 +267,15 @@ class SourceCache(BaseCache):
return None
def _push_source(self, source_ref, remote):
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
+
try:
remote.init()
source_proto = self._get_source(source_ref)
- return remote.update_source(source_ref, source_proto)
+ remote.push_directory([uri], source_proto.files)
+ return True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
- return None
+ return False