author    Jürg Billeter <j@bitron.ch>  2020-06-24 13:06:09 +0200
committer bst-marge-bot <marge-bot@buildstream.build>  2020-08-13 09:24:43 +0000
commit    fa255ad0629bd7f5977342ce8e3a7ddb3ac2166f (patch)
tree      a3d1f3cc7d08e97aa835f11686f40e58357735f7
parent    91d0298af468394604bdcc039eb811b93d1b39c6 (diff)
download  buildstream-fa255ad0629bd7f5977342ce8e3a7ddb3ac2166f.tar.gz
_artifactcache.py: Use AssetRemote
This migrates the artifact cache from the BuildStream Artifact protocol
to the Remote Asset API.

Co-authored-by: Sander Striker <s.striker@striker.nl>
-rw-r--r--  src/buildstream/_artifactcache.py | 255
-rw-r--r--  tests/testutils/artifactshare.py  |  49
2 files changed, 117 insertions, 187 deletions
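
For orientation before the diff: this commit replaces the bespoke ArtifactService
GetArtifact/UpdateArtifact RPCs with the standard Remote Asset API, addressing each
artifact by a URN derived from its name. Below is a minimal sketch of the new fetch
flow, mirroring the code in the diff (the endpoint and artifact name are
illustrative, not part of the change):

import grpc

from buildstream._protos.build.bazel.remote.asset.v1 import (
    remote_asset_pb2,
    remote_asset_pb2_grpc,
)
from buildstream._protos.google.rpc import code_pb2

REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"

def fetch_artifact_digest(endpoint, artifact_name):
    # Map the artifact name to its URN, exactly as _artifactcache.py now does.
    uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)

    channel = grpc.insecure_channel(endpoint)  # e.g. "localhost:50052" (illustrative)
    try:
        fetch_service = remote_asset_pb2_grpc.FetchStub(channel)

        request = remote_asset_pb2.FetchBlobRequest()
        request.uris.append(uri)

        try:
            response = fetch_service.FetchBlob(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None  # the server has never seen this artifact
            raise

        if response.status.code != code_pb2.OK:
            return None

        # Digest of the serialized Artifact proto, to be fetched from CAS.
        return response.blob_digest
    finally:
        channel.close()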
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index ad70fad94..932db93ff 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -1,6 +1,6 @@
#
# Copyright (C) 2017-2018 Codethink Limited
-# Copyright (C) 2019 Bloomberg Finance LP
+# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -21,112 +21,14 @@
import os
import grpc
-from ._assetcache import AssetCache
+from ._assetcache import AssetCache, AssetRemote
from ._cas.casremote import BlobNotFound
-from ._exceptions import ArtifactError, AssetCacheError, CASError, CASRemoteError, RemoteError
-from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, artifact_pb2, artifact_pb2_grpc
+from ._exceptions import ArtifactError, AssetCacheError, CASError, CASRemoteError
+from ._protos.buildstream.v2 import artifact_pb2
-from ._remote import BaseRemote
from . import utils
-
-# ArtifactRemote():
-#
-# Facilitates communication with the BuildStream-specific part of
-# artifact remotes.
-#
-class ArtifactRemote(BaseRemote):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.artifact_service = None
-
- def close(self):
- self.artifact_service = None
- super().close()
-
- # _configure_protocols():
- #
- # Configure the protocols used by this remote as part of the
- # remote initialization; Note that this should only be used in
- # Remote.init(), and is expected to fail when called by itself.
- #
- def _configure_protocols(self):
- # Set up artifact stub
- self.artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
-
- # _check():
- #
- # Check if this remote provides everything required for the
- # particular kind of remote. This is expected to be called as part
- # of check()
- #
- # Raises:
- # RemoteError: If the upstream has a problem
- #
- def _check(self):
- capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
-
- # Check whether the server supports newer proto based artifact.
- try:
- request = buildstream_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = capabilities_service.GetCapabilities(request)
- except grpc.RpcError as e:
- # Check if this remote has the artifact service
- if e.code() == grpc.StatusCode.UNIMPLEMENTED:
- raise RemoteError(
- "Configured remote does not have the BuildStream "
- "capabilities service. Please check remote configuration."
- )
- # Else raise exception with details
- raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))
-
- if not response.artifact_capabilities:
- raise RemoteError("Configured remote does not support artifact service")
-
- if self.spec.push and not response.artifact_capabilities.allow_updates:
- raise RemoteError("Artifact server does not allow push")
-
- # get_artifact():
- #
- # Get an artifact proto for a given cache key from the remote.
- #
- # Args:
- # cache_key (str): The artifact cache key. NOTE: This "key"
- # is actually the ref/name and its name in
- # the protocol is inaccurate. You have been warned.
- #
- # Returns:
- # (Artifact): The artifact proto
- #
- # Raises:
- # grpc.RpcError: If something goes wrong during the request.
- #
- def get_artifact(self, cache_key):
- artifact_request = artifact_pb2.GetArtifactRequest()
- artifact_request.cache_key = cache_key
-
- return self.artifact_service.GetArtifact(artifact_request)
-
- # update_artifact():
- #
- # Update an artifact with the given cache key on the remote with
- # the given proto.
- #
- # Args:
- # cache_key (str): The artifact cache key of the artifact to update.
- # artifact (ArtifactProto): The artifact proto to send.
- #
- # Raises:
- # grpc.RpcError: If something goes wrong during the request.
- #
- def update_artifact(self, cache_key, artifact):
- update_request = artifact_pb2.UpdateArtifactRequest()
- update_request.cache_key = cache_key
- update_request.artifact.CopyFrom(artifact)
-
- self.artifact_service.UpdateArtifact(update_request)
+REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"
# An ArtifactCache manages artifacts.
@@ -138,7 +40,7 @@ class ArtifactCache(AssetCache):
spec_name = "artifact_cache_specs"
config_node_name = "artifacts"
- index_remote_class = ArtifactRemote
+ index_remote_class = AssetRemote
def __init__(self, context):
super().__init__(context)
@@ -225,14 +127,18 @@ class ArtifactCache(AssetCache):
index_remotes = [r for r in self._index_remotes[project] if r.push]
storage_remotes = [r for r in self._storage_remotes[project] if r.push]
+ artifact_proto = artifact._get_proto()
+ artifact_digest = self.cas.add_object(buffer=artifact_proto.SerializeToString())
+
pushed = False
+
# First push our files to all storage remotes, so that they
# can perform file checks on their end
for remote in storage_remotes:
remote.init()
element.status("Pushing data from artifact {} -> {}".format(display_key, remote))
- if self._push_artifact_blobs(artifact, remote):
+ if self._push_artifact_blobs(artifact, artifact_digest, remote):
element.info("Pushed data from artifact {} -> {}".format(display_key, remote))
else:
element.info(
@@ -245,7 +151,7 @@ class ArtifactCache(AssetCache):
remote.init()
element.status("Pushing artifact {} -> {}".format(display_key, remote))
- if self._push_artifact_proto(element, artifact, remote):
+ if self._push_artifact_proto(element, artifact, artifact_digest, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote))
pushed = True
else:
@@ -268,10 +174,13 @@ class ArtifactCache(AssetCache):
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, pull_buildtrees=False):
- artifact = None
+ artifact_digest = None
display_key = key[: self.context.log_key_length]
project = element._get_project()
+ artifact_name = element.get_artifact_name(key=key)
+ uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)
+
errors = []
# Start by pulling our artifact proto, so that we know which
# blobs to pull
@@ -279,23 +188,24 @@ class ArtifactCache(AssetCache):
remote.init()
try:
element.status("Pulling artifact {} <- {}".format(display_key, remote))
- artifact = self._pull_artifact_proto(element, key, remote)
- if artifact:
+ response = remote.fetch_blob([uri])
+ if response:
+ artifact_digest = response.blob_digest
break
element.info("Remote ({}) does not have artifact {} cached".format(remote, display_key))
- except CASError as e:
+ except AssetCacheError as e:
element.warn("Could not pull from remote {}: {}".format(remote, e))
errors.append(e)
- if errors and not artifact:
+ if errors and not artifact_digest:
raise ArtifactError(
"Failed to pull artifact {}".format(display_key), detail="\n".join(str(e) for e in errors)
)
# If we don't have an artifact, we can't exactly pull our
# artifact
- if not artifact:
+ if not artifact_digest:
return False
errors = []
@@ -305,7 +215,7 @@ class ArtifactCache(AssetCache):
try:
element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
- if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
+ if self._pull_artifact_storage(element, key, artifact_digest, remote, pull_buildtrees=pull_buildtrees):
element.info("Pulled artifact {} <- {}".format(display_key, remote))
return True
@@ -483,7 +393,7 @@ class ArtifactCache(AssetCache):
# ArtifactError: If we fail to push blobs (*unless* they're
# already there or we run out of space on the server).
#
- def _push_artifact_blobs(self, artifact, remote):
+ def _push_artifact_blobs(self, artifact, artifact_digest, remote):
artifact_proto = artifact._get_proto()
try:
@@ -496,7 +406,8 @@ class ArtifactCache(AssetCache):
except FileNotFoundError:
pass
- digests = []
+ digests = [artifact_digest]
+
if str(artifact_proto.public_data):
digests.append(artifact_proto.public_data)
@@ -525,7 +436,7 @@ class ArtifactCache(AssetCache):
# Args:
# element (Element): The element
# artifact (Artifact): The related artifact being pushed
- # remote (ArtifactRemote): Remote to push to
+ # remote (AssetRemote): Remote to push to
#
# Returns:
# (bool): Whether we pushed the artifact.
@@ -534,33 +445,46 @@ class ArtifactCache(AssetCache):
# ArtifactError: If the push fails for any reason except the
# artifact already existing.
#
- def _push_artifact_proto(self, element, artifact, remote):
+ def _push_artifact_proto(self, element, artifact, artifact_digest, remote):
artifact_proto = artifact._get_proto()
keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+ artifact_names = [element.get_artifact_name(key=key) for key in keys]
+ uris = [REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name) for artifact_name in artifact_names]
- pushed = False
+ try:
+ response = remote.fetch_blob(uris)
+ # Skip push if artifact is already on the server
+ if response and response.blob_digest == artifact_digest:
+ return False
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError(
+ "Error checking artifact cache with status {}: {}".format(e.code().name, e.details())
+ )
- for key in keys:
- try:
- remote_artifact = remote.get_artifact(element.get_artifact_name(key=key))
- # Skip push if artifact is already on the server
- if remote_artifact == artifact_proto:
- continue
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError(
- "Error checking artifact cache with status {}: {}".format(e.code().name, e.details())
- )
+ referenced_directories = []
+ if artifact_proto.files:
+ referenced_directories.append(artifact_proto.files)
+ if artifact_proto.buildtree:
+ referenced_directories.append(artifact_proto.buildtree)
+ if artifact_proto.sources:
+ referenced_directories.append(artifact_proto.sources)
- try:
- remote.update_artifact(element.get_artifact_name(key=key), artifact_proto)
- pushed = True
- except grpc.RpcError as e:
- raise ArtifactError("Failed to push artifact with status {}: {}".format(e.code().name, e.details()))
+ referenced_blobs = [log_file.digest for log_file in artifact_proto.logs]
- return pushed
+ try:
+ remote.push_blob(
+ uris,
+ artifact_digest,
+ references_blobs=referenced_blobs,
+ references_directories=referenced_directories,
+ )
+ except grpc.RpcError as e:
+ raise ArtifactError("Failed to push artifact with status {}: {}".format(e.code().name, e.details()))
+
+ return True
# _pull_artifact_storage():
#
@@ -579,7 +503,7 @@ class ArtifactCache(AssetCache):
# ArtifactError: If the pull failed for any reason except the
# blobs not existing on the server.
#
- def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False):
+ def _pull_artifact_storage(self, element, key, artifact_digest, remote, pull_buildtrees=False):
def __pull_digest(digest):
self.cas._fetch_directory(remote, digest)
required_blobs = self.cas.required_blobs_for_directory(digest)
@@ -587,7 +511,21 @@ class ArtifactCache(AssetCache):
if missing_blobs:
self.cas.fetch_blobs(remote, missing_blobs)
+ artifact_name = element.get_artifact_name(key=key)
+
try:
+ # Fetch and parse artifact proto
+ self.cas.fetch_blobs(remote, [artifact_digest])
+ artifact = artifact_pb2.Artifact()
+ with open(self.cas.objpath(artifact_digest), "rb") as f:
+ artifact.ParseFromString(f.read())
+
+ # Write the artifact proto to cache
+ artifact_path = os.path.join(self._basedir, artifact_name)
+ os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
+ with utils.save_file_atomic(artifact_path, mode="wb") as f:
+ f.write(artifact.SerializeToString())
+
if str(artifact.files):
__pull_digest(artifact.files)
@@ -609,57 +547,22 @@ class ArtifactCache(AssetCache):
return True
- # _pull_artifact_proto():
- #
- # Pull an artifact proto from a remote server.
- #
- # Args:
- # element (Element): The element whose artifact to pull.
- # key (str): The specific key for the artifact to pull.
- # remote (ArtifactRemote): The remote to pull from.
- #
- # Returns:
- # (Artifact|None): The artifact proto, or None if the server
- # doesn't have it.
- #
- # Raises:
- # ArtifactError: If the pull fails.
- #
- def _pull_artifact_proto(self, element, key, remote):
- artifact_name = element.get_artifact_name(key=key)
-
- try:
- artifact = remote.get_artifact(artifact_name)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Failed to pull artifact with status {}: {}".format(e.code().name, e.details()))
- return None
-
- # Write the artifact proto to cache
- artifact_path = os.path.join(self._basedir, artifact_name)
- os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
- with utils.save_file_atomic(artifact_path, mode="wb") as f:
- f.write(artifact.SerializeToString())
-
- return artifact
-
# _query_remote()
#
# Args:
# ref (str): The artifact ref
- # remote (ArtifactRemote): The remote we want to check
+ # remote (AssetRemote): The remote we want to check
#
# Returns:
# (bool): True if the ref exists in the remote, False otherwise.
#
def _query_remote(self, ref, remote):
- request = artifact_pb2.GetArtifactRequest()
- request.cache_key = ref
+ uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(ref)
+
try:
- remote.artifact_service.GetArtifact(request)
+ response = remote.fetch_blob([uri])
+ return bool(response)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise ArtifactError("Error when querying with status {}: {}".format(e.code().name, e.details()))
return False
-
- return True
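
The push side above goes through BuildStream's AssetRemote wrapper
(remote.push_blob(...)). As a hedged sketch of what that call amounts to at the
protocol level, assuming the vendored remote_asset.proto stubs (the helper name
below is hypothetical):

from buildstream._protos.build.bazel.remote.asset.v1 import (
    remote_asset_pb2,
    remote_asset_pb2_grpc,
)

def push_artifact_proto_raw(channel, uris, artifact_digest, log_digests, dir_digests):
    push_service = remote_asset_pb2_grpc.PushStub(channel)

    request = remote_asset_pb2.PushBlobRequest()
    # One request registers the proto digest under every URN (strong and weak key).
    request.uris.extend(uris)
    request.blob_digest.CopyFrom(artifact_digest)
    # Referenced blobs and directories (logs, files, buildtree, sources) tell
    # the server what must stay alive in CAS alongside the proto itself.
    request.references_blobs.extend(log_digests)
    request.references_directories.extend(dir_digests)

    push_service.PushBlob(request)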
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index e471d7989..07def5c86 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -6,14 +6,19 @@ from collections import namedtuple
from contextlib import ExitStack, contextmanager
from concurrent import futures
from multiprocessing import Process, Queue
+from urllib.parse import urlparse
import grpc
from buildstream._cas import CASCache
from buildstream._cas.casserver import create_server
from buildstream._exceptions import CASError
+from buildstream._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream._protos.buildstream.v2 import artifact_pb2, source_pb2
+from buildstream._protos.google.rpc import code_pb2
+
+REMOTE_ASSET_ARTIFACT_URN_TEMPLATE = "urn:fdc:buildstream.build:2020:artifact:{}"
class BaseArtifactShare:
@@ -118,8 +123,6 @@ class ArtifactShare(BaseArtifactShare):
#
self.repodir = os.path.join(self.directory, "repo")
os.makedirs(self.repodir)
- self.artifactdir = os.path.join(self.repodir, "artifacts", "refs")
- os.makedirs(self.artifactdir)
self.sourcedir = os.path.join(self.repodir, "source_protos")
os.makedirs(self.sourcedir)
@@ -153,16 +156,29 @@ class ArtifactShare(BaseArtifactShare):
return os.path.exists(object_path)
def get_artifact_proto(self, artifact_name):
- artifact_proto = artifact_pb2.Artifact()
- artifact_path = os.path.join(self.artifactdir, artifact_name)
-
+ url = urlparse(self.repo)
+ channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
try:
- with open(artifact_path, "rb") as f:
- artifact_proto.ParseFromString(f.read())
- except FileNotFoundError:
- return None
+ fetch_service = remote_asset_pb2_grpc.FetchStub(channel)
+
+ uri = REMOTE_ASSET_ARTIFACT_URN_TEMPLATE.format(artifact_name)
+
+ request = remote_asset_pb2.FetchBlobRequest()
+ request.uris.append(uri)
+
+ try:
+ response = fetch_service.FetchBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ return None
+ raise
+
+ if response.status.code != code_pb2.OK:
+ return None
- return artifact_proto
+ return response.blob_digest
+ finally:
+ channel.close()
def get_source_proto(self, source_name):
source_proto = source_pb2.Source()
@@ -176,7 +192,7 @@ class ArtifactShare(BaseArtifactShare):
return source_proto
- def get_cas_files(self, artifact_proto):
+ def get_cas_files(self, artifact_proto_digest):
reachable = set()
@@ -184,6 +200,17 @@ class ArtifactShare(BaseArtifactShare):
self.cas._reachable_refs_dir(reachable, digest, update_mtime=False, check_exists=True)
try:
+ artifact_proto_path = self.cas.objpath(artifact_proto_digest)
+ if not os.path.exists(artifact_proto_path):
+ return None
+
+ artifact_proto = artifact_pb2.Artifact()
+ try:
+ with open(artifact_proto_path, "rb") as f:
+ artifact_proto.ParseFromString(f.read())
+ except FileNotFoundError:
+ return None
+
if str(artifact_proto.files):
reachable_dir(artifact_proto.files)
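
A hedged usage sketch for the updated test helpers: get_artifact_proto() now
returns the digest of the Artifact proto in CAS (despite its name), and that
digest is what get_cas_files() expects. The fixture factory and artifact name
below are assumptions drawn from the surrounding test suite, not part of this diff:

from tests.testutils import create_artifact_share  # assumed import path

with create_artifact_share("/tmp/artifactshare") as share:
    digest = share.get_artifact_proto("test/target/0123abcd")  # illustrative name
    if digest is not None:
        files = share.get_cas_files(digest)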