diff options
-rw-r--r-- | src/buildstream/_sourcecache.py | 127 | ||||
-rw-r--r-- | tests/testutils/artifactshare.py | 34 |
2 files changed, 48 insertions, 113 deletions
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index 6ba7ec782..a05344de4 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2019 Bloomberg Finance LP +# Copyright (C) 2019-2020 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -20,98 +20,14 @@ import os import grpc -from ._remote import BaseRemote from ._cas.casremote import BlobNotFound from .storage._casbaseddirectory import CasBasedDirectory -from ._assetcache import AssetCache -from ._exceptions import CASError, CASRemoteError, SourceCacheError, RemoteError +from ._assetcache import AssetCache, AssetRemote +from ._exceptions import CASError, CASRemoteError, SourceCacheError from . import utils -from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, source_pb2, source_pb2_grpc +from ._protos.buildstream.v2 import source_pb2 - -class SourceRemote(BaseRemote): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.source_service = None - - def close(self): - self.source_service = None - super().close() - - def _configure_protocols(self): - # set up source service - self.source_service = source_pb2_grpc.SourceServiceStub(self.channel) - - # _check(): - # - # Check if this remote provides everything required for the - # particular kind of remote. This is expected to be called as part - # of check() - # - # Raises: - # RemoteError: If the upstream has a problem - # - def _check(self): - capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel) - - # check that the service supports sources - try: - request = buildstream_pb2.GetCapabilitiesRequest() - if self.instance_name: - request.instance_name = self.instance_name - response = capabilities_service.GetCapabilities(request) - except grpc.RpcError as e: - # Check if this remote has the artifact service - if e.code() == grpc.StatusCode.UNIMPLEMENTED: - raise RemoteError( - "Configured remote does not have the BuildStream " - "capabilities service. Please check remote configuration." - ) - raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details())) - - if not response.source_capabilities: - raise RemoteError("Configured remote does not support source service") - - if self.spec.push and not response.source_capabilities.allow_updates: - raise RemoteError("Source server does not allow push") - - # get_source(): - # - # Get a source proto for a given source_ref from the remote. - # - # Args: - # source_ref (str): The source ref of the source to pull. - # - # Returns: - # (Source): The source proto - # - # Raises: - # grpc.RpcError: If something goes wrong during the request. - # - def get_source(self, source_ref): - request = source_pb2.GetSourceRequest() - request.cache_key = source_ref - return self.source_service.GetSource(request) - - # update_source(): - # - # Update the source on the remote. - # - # Args: - # source_ref (str): The source ref of the source to update. - # source (Source): The proto to update with. - # - # Returns: - # (bool): Whether the update was successful. - # - # Raises: - # grpc.RpcError: If something goes wrong during the request. - # - def update_source(self, source_ref, source): - request = source_pb2.UpdateSourceRequest() - request.cache_key = source_ref - request.source.CopyFrom(source) - return self.source_service.UpdateSource(request) +REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}" # Class that keeps config of remotes and deals with caching of sources. @@ -123,7 +39,7 @@ class SourceCache(AssetCache): spec_name = "source_cache_specs" config_node_name = "source-caches" - index_remote_class = SourceRemote + index_remote_class = AssetRemote def __init__(self, context): super().__init__(context) @@ -213,15 +129,15 @@ class SourceCache(AssetCache): index_remotes = self._index_remotes[project] storage_remotes = self._storage_remotes[project] - # First fetch the source proto so we know what to pull - source_proto = None + # First fetch the source directory digest so we know what to pull + source_digest = None for remote in index_remotes: try: remote.init() source.status("Pulling source {} <- {}".format(display_key, remote)) - source_proto = self._pull_source(ref, remote) - if source_proto is None: + source_digest = self._pull_source(ref, remote) + if source_digest is None: source.info( "Remote source service ({}) does not have source {} cached".format(remote, display_key) ) @@ -229,7 +145,7 @@ class SourceCache(AssetCache): except CASError as e: raise SourceCacheError("Failed to pull source {}: {}".format(display_key, e)) from e - if not source_proto: + if not source_digest: return False for remote in storage_remotes: @@ -238,8 +154,8 @@ class SourceCache(AssetCache): source.status("Pulling data for source {} <- {}".format(display_key, remote)) # Fetch source blobs - self.cas._fetch_directory(remote, source_proto.files) - required_blobs = self.cas.required_blobs_for_directory(source_proto.files) + self.cas._fetch_directory(remote, source_digest) + required_blobs = self.cas.required_blobs_for_directory(source_digest) missing_blobs = self.cas.local_missing_blobs(required_blobs) self.cas.fetch_blobs(remote, missing_blobs) @@ -336,11 +252,15 @@ class SourceCache(AssetCache): return os.path.join(self._basedir, ref) def _pull_source(self, source_ref, remote): + uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) + try: remote.init() - response = remote.get_source(source_ref) - self._store_proto(response, source_ref) - return response + response = remote.fetch_directory([uri]) + if not response: + return None + self._store_source(source_ref, response.root_directory_digest) + return response.root_directory_digest except grpc.RpcError as e: if e.code() != grpc.StatusCode.NOT_FOUND: @@ -348,12 +268,15 @@ class SourceCache(AssetCache): return None def _push_source(self, source_ref, remote): + uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_ref) + try: remote.init() source_proto = self._get_source(source_ref) - return remote.update_source(source_ref, source_proto) + remote.push_directory([uri], source_proto.files) + return True except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: raise SourceCacheError("Failed to push source with status {}: {}".format(e.code().name, e.details())) - return None + return False diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py index 07def5c86..bd9c97c61 100644 --- a/tests/testutils/artifactshare.py +++ b/tests/testutils/artifactshare.py @@ -15,10 +15,11 @@ from buildstream._cas.casserver import create_server from buildstream._exceptions import CASError from buildstream._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 -from buildstream._protos.buildstream.v2 import artifact_pb2, source_pb2 +from buildstream._protos.buildstream.v2 import artifact_pb2 from buildstream._protos.google.rpc import code_pb2 REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}" +REMOTE_ASSET_SOURCE_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:source:{}" class BaseArtifactShare: @@ -123,8 +124,6 @@ class ArtifactShare(BaseArtifactShare): # self.repodir = os.path.join(self.directory, "repo") os.makedirs(self.repodir) - self.sourcedir = os.path.join(self.repodir, "source_protos") - os.makedirs(self.sourcedir) logdir = os.path.join(self.directory, "logs") if casd else None @@ -181,16 +180,29 @@ class ArtifactShare(BaseArtifactShare): channel.close() def get_source_proto(self, source_name): - source_proto = source_pb2.Source() - source_path = os.path.join(self.sourcedir, source_name) - + url = urlparse(self.repo) + channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) try: - with open(source_path, "rb") as f: - source_proto.ParseFromString(f.read()) - except FileNotFoundError: - return None + fetch_service = remote_asset_pb2_grpc.FetchStub(channel) + + uri = REMOTE_ASSET_SOURCE_URN_TEMPLATE.format(source_name) + + request = remote_asset_pb2.FetchDirectoryRequest() + request.uris.append(uri) + + try: + response = fetch_service.FetchDirectory(request) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.NOT_FOUND: + return None + raise - return source_proto + if response.status.code != code_pb2.OK: + return None + + return response.root_directory_digest + finally: + channel.close() def get_cas_files(self, artifact_proto_digest): |