summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2020-07-01 12:58:11 +0200
committerbst-marge-bot <marge-bot@buildstream.build>2020-08-13 09:24:43 +0000
commitfce20bc02396071f3e77b64f45f16c8a2049571c (patch)
tree9d4dd422433cef2cbc7b65aee5ac63eee15d9ba6
parentfa255ad0629bd7f5977342ce8e3a7ddb3ac2166f (diff)
downloadbuildstream-fce20bc02396071f3e77b64f45f16c8a2049571c.tar.gz
_sourcecache.py: Use AssetRemote
This migrates the source cache from the BuildStream Source protocol to the Remote Asset API.
-rw-r--r--src/buildstream/_sourcecache.py127
-rw-r--r--tests/testutils/artifactshare.py34
2 files changed, 48 insertions, 113 deletions
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index 6ba7ec782..a05344de4 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -1,5 +1,5 @@
#
-# Copyright (C) 2019 Bloomberg Finance LP
+# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -20,98 +20,14 @@
import os
import grpc
-from ._remote import BaseRemote
from ._cas.casremote import BlobNotFound
from .storage._casbaseddirectory import CasBasedDirectory
-from ._assetcache import AssetCache
-from ._exceptions import CASError, CASRemoteError, SourceCacheError, RemoteError
+from ._assetcache import AssetCache, AssetRemote
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
-from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, source_pb2, source_pb2_grpc
+from ._protos.buildstream.v2 import source_pb2
-
-class SourceRemote(BaseRemote):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.source_service = None
-
- def close(self):
- self.source_service = None
- super().close()
-
- def _configure_protocols(self):
- # set up source service
- self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
-
- # _check():
- #
- # Check if this remote provides everything required for the
- # particular kind of remote. This is expected to be called as part
- # of check()
- #
- # Raises:
- # RemoteError: If the upstream has a problem
- #
- def _check(self):
- capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
-
- # check that the service supports sources
- try:
- request = buildstream_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = capabilities_service.GetCapabilities(request)
- except grpc.RpcError as e:
- # Check if this remote has the artifact service
- if e.code() == grpc.StatusCode.UNIMPLEMENTED:
- raise RemoteError(
- "Configured remote does not have the BuildStream "
- "capabilities service. Please check remote configuration."
- )
- raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))
-
- if not response.source_capabilities:
- raise RemoteError("Configured remote does not support source service")
-
- if self.spec.push and not response.source_capabilities.allow_updates:
- raise RemoteError("Source server does not allow push")
-
- # get_source():
- #
- # Get a source proto for a given source_ref from the remote.
- #
- # Args:
- # source_ref (str): The source ref of the source to pull.
- #
- # Returns:
- # (Source): The source proto
- #
- # Raises:
- # grpc.RpcError: If something goes wrong during the request.
- #
- def get_source(self, source_ref):
- request = source_pb2.GetSourceRequest()
- request.cache_key = source_ref
- return self.source_service.GetSource(request)
-
- # update_source():
- #
- # Update the source on the remote.
- #
- # Args:
- # source_ref (str): The source ref of the source to update.
- # source (Source): The proto to update with.
- #
- # Returns:
- # (bool): Whether the update was successful.
- #
- # Raises:
- # grpc.RpcError: If something goes wrong during the request.
- #
- def update_source(self, source_ref, source):
- request = source_pb2.UpdateSourceRequest()
- request.cache_key = source_ref
- request.source.CopyFrom(source)
- return self.source_service.UpdateSource(request)
+REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"
# Class that keeps config of remotes and deals with caching of sources.
@@ -123,7 +39,7 @@ class SourceCache(AssetCache):
spec_name = "source_cache_specs"
config_node_name = "source-caches"
- index_remote_class = SourceRemote
+ index_remote_class = AssetRemote
def __init__(self, context):
super().__init__(context)
@@ -213,15 +129,15 @@ class SourceCache(AssetCache):
index_remotes = self._index_remotes[project]
storage_remotes = self._storage_remotes[project]
- # First fetch the source proto so we know what to pull
- source_proto = None
+ # First fetch the source directory digest so we know what to pull
+ source_digest = None
for remote in index_remotes:
try:
remote.init()
source.status("Pulling source {} <- {}".format(display_key, remote))
- source_proto = self._pull_source(ref, remote)
- if source_proto is None:
+ source_digest = self._pull_source(ref, remote)
+ if source_digest is None:
source.info(
"Remote source service ({}) does not have source {} cached".format(remote, display_key)
)
@@ -229,7 +145,7 @@ class SourceCache(AssetCache):
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e)) from e
- if not source_proto:
+ if not source_digest:
return False
for remote in storage_remotes:
@@ -238,8 +154,8 @@ class SourceCache(AssetCache):
source.status("Pulling data for source {} <- {}".format(display_key, remote))
# Fetch source blobs
- self.cas._fetch_directory(remote, source_proto.files)
- required_blobs = self.cas.required_blobs_for_directory(source_proto.files)
+ self.cas._fetch_directory(remote, source_digest)
+ required_blobs = self.cas.required_blobs_for_directory(source_digest)
missing_blobs = self.cas.local_missing_blobs(required_blobs)
self.cas.fetch_blobs(remote, missing_blobs)
@@ -336,11 +252,15 @@ class SourceCache(AssetCache):
return os.path.join(self._basedir, ref)
def _pull_source(self, source_ref, remote):
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
+
try:
remote.init()
- response = remote.get_source(source_ref)
- self._store_proto(response, source_ref)
- return response
+ response = remote.fetch_directory([uri])
+ if not response:
+ return None
+ self._store_source(source_ref, response.root_directory_digest)
+ return response.root_directory_digest
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
@@ -348,12 +268,15 @@ class SourceCache(AssetCache):
return None
def _push_source(self, source_ref, remote):
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref)
+
try:
remote.init()
source_proto = self._get_source(source_ref)
- return remote.update_source(source_ref, source_proto)
+ remote.push_directory([uri], source_proto.files)
+ return True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details()))
- return None
+ return False
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 07def5c86..bd9c97c61 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -15,10 +15,11 @@ from buildstream._cas.casserver import create_server
from buildstream._exceptions import CASError
from buildstream._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from buildstream._protos.buildstream.v2 import artifact_pb2, source_pb2
+from buildstream._protos.buildstream.v2 import artifact_pb2
from buildstream._protos.google.rpc import code_pb2
REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"
+REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}"
class BaseArtifactShare:
@@ -123,8 +124,6 @@ class ArtifactShare(BaseArtifactShare):
#
self.repodir = os.path.join(self.directory, "repo")
os.makedirs(self.repodir)
- self.sourcedir = os.path.join(self.repodir, "source_protos")
- os.makedirs(self.sourcedir)
logdir = os.path.join(self.directory, "logs") if casd else None
@@ -181,16 +180,29 @@ class ArtifactShare(BaseArtifactShare):
channel.close()
def get_source_proto(self, source_name):
- source_proto = source_pb2.Source()
- source_path = os.path.join(self.sourcedir, source_name)
-
+ url = urlparse(self.repo)
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
try:
- with open(source_path, "rb") as f:
- source_proto.ParseFromString(f.read())
- except FileNotFoundError:
- return None
+ fetch_service = remote_asset_pb2_grpc.FetchStub(channel)
+
+ uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_name)
+
+ request = remote_asset_pb2.FetchDirectoryRequest()
+ request.uris.append(uri)
+
+ try:
+ response = fetch_service.FetchDirectory(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ return None
+ raise
- return source_proto
+ if response.status.code != code_pb2.OK:
+ return None
+
+ return response.root_directory_digest
+ finally:
+ channel.close()
def get_cas_files(self, artifact_proto_digest):