author     Tristan Maat <tristan.maat@codethink.co.uk>   2019-08-22 17:48:34 +0100
committer  Tristan Maat <tristan.maat@codethink.co.uk>   2019-09-06 15:55:10 +0100
commit     47a3f93d9795be6af849c112d4180f0ad50ca23b (patch)
tree       2d65dd2c24d9d6bd6795f0680811cf95ae3803e4 /src/buildstream/_artifactcache.py
parent     e71621510de7c55aae4855f8bbb64eb2755346a8 (diff)
download   buildstream-47a3f93d9795be6af849c112d4180f0ad50ca23b.tar.gz
Allow splitting artifact caches
This is now split into storage/index remotes, where the former is expected to be a CASRemote and the latter a BuildStream-specific remote with the extensions required to store BuildStream artifact protos.
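In outline, push becomes a two-phase operation under this split. A condensed sketch of the new flow (simplified from ArtifactCache.push() in the diff below; status messages and error handling elided, not the verbatim implementation):

    # Condensed sketch of ArtifactCache.push() as of this commit.
    def push(self, element, artifact):
        project = element._get_project()
        pushed = False

        # Phase 1: send the artifact's blobs to the storage remotes
        # (plain CASRemotes), so the data exists before anything
        # references it.
        for remote in [r for r in self._storage_remotes[project] if r.push]:
            remote.init()
            self._push_artifact_blobs(artifact, remote)

        # Phase 2: publish the artifact proto to the index remotes
        # (ArtifactRemotes), making the artifact discoverable by ref.
        for remote in [r for r in self._index_remotes[project] if r.push]:
            remote.init()
            if self._push_artifact_proto(element, artifact, remote):
                pushed = True

        return pushed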
Diffstat (limited to 'src/buildstream/_artifactcache.py')
-rw-r--r--   src/buildstream/_artifactcache.py   373
1 file changed, 266 insertions(+), 107 deletions(-)
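For readers unfamiliar with the artifact service, the new index-remote API introduced below reduces to two small RPC wrappers. A minimal standalone sketch (illustrative only, mirroring ArtifactRemote.get_artifact()/update_artifact() from the diff; `channel` is assumed to be an open grpc.Channel, and the import path assumes an installed buildstream package):

    from buildstream._protos.buildstream.v2 import artifact_pb2, artifact_pb2_grpc

    # Illustrative sketch, not part of the commit: fetch an artifact
    # proto by its ref. Despite the field name, cache_key carries the
    # artifact ref/name (as the docstrings below warn).
    def get_artifact(channel, name):
        request = artifact_pb2.GetArtifactRequest()
        request.cache_key = name
        return artifact_pb2_grpc.ArtifactServiceStub(channel).GetArtifact(request)

    # Publish (or overwrite) an artifact proto under the given ref.
    def update_artifact(channel, name, artifact):
        request = artifact_pb2.UpdateArtifactRequest()
        request.cache_key = name
        request.artifact.CopyFrom(artifact)
        artifact_pb2_grpc.ArtifactServiceStub(channel).UpdateArtifact(request)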
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index 73047d376..0e2eb1091 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -25,48 +25,89 @@ from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError
from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc
-from ._cas import CASRemote
+from ._remote import BaseRemote
from .storage._casbaseddirectory import CasBasedDirectory
from ._artifact import Artifact
from . import utils
+# ArtifactRemote():
+#
+# Facilitates communication with the BuildStream-specific part of
+# artifact remotes.
+#
+class ArtifactRemote(BaseRemote):
+ # _configure_protocols():
+ #
+ # Configure the protocols used by this remote as part of the
+ # remote initialization. Note that this should only be used in
+ # Remote.init(), and is expected to fail when called on its own.
+ #
+ def _configure_protocols(self):
+ # Add artifact stub
+ capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+ # Check whether the server supports the newer proto-based artifact service.
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ response = capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+ # Check if this remote has the artifact service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise ArtifactError(
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise ArtifactError(
+ "Remote initialisation failed: {}".format(e.details()))
-# ArtifactRemote extends CASRemote to check during initialisation that there is
-# an artifact service
-class ArtifactRemote(CASRemote):
- def __init__(self, *args):
- super().__init__(*args)
- self.capabilities_service = None
+ if not response.artifact_capabilities:
+ raise ArtifactError(
+ "Configured remote does not support artifact service")
- def init(self):
- if not self._initialized:
- # do default initialisation
- super().init()
+ # get_artifact():
+ #
+ # Get an artifact proto for a given cache key from the remote.
+ #
+ # Args:
+ # cache_key (str): The artifact cache key. NOTE: This "key"
+ # is actually the ref/name and its name in
+ # the protocol is inaccurate. You have been warned.
+ #
+ # Returns:
+ # (Artifact): The artifact proto
+ #
+ # Raises:
+ # grpc.RpcError: If something goes wrong during the request.
+ #
+ def get_artifact(self, cache_key):
+ artifact_request = artifact_pb2.GetArtifactRequest()
+ artifact_request.cache_key = cache_key
- # Add artifact stub
- self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
+ return artifact_service.GetArtifact(artifact_request)
- # Check whether the server supports newer proto based artifact.
- try:
- request = buildstream_pb2.GetCapabilitiesRequest()
- if self.instance_name:
- request.instance_name = self.instance_name
- response = self.capabilities_service.GetCapabilities(request)
- except grpc.RpcError as e:
- # Check if this remote has the artifact service
- if e.code() == grpc.StatusCode.UNIMPLEMENTED:
- raise ArtifactError(
- "Configured remote does not have the BuildStream "
- "capabilities service. Please check remote configuration.")
- # Else raise exception with details
- raise ArtifactError(
- "Remote initialisation failed: {}".format(e.details()))
+ # update_artifact():
+ #
+ # Update an artifact with the given cache key on the remote with
+ # the given proto.
+ #
+ # Args:
+ # cache_key (str): The artifact cache key of the artifact to update.
+ # artifact (ArtifactProto): The artifact proto to send.
+ #
+ # Raises:
+ # grpc.RpcError: If something goes wrong during the request.
+ #
+ def update_artifact(self, cache_key, artifact):
+ update_request = artifact_pb2.UpdateArtifactRequest()
+ update_request.cache_key = cache_key
+ update_request.artifact.CopyFrom(artifact)
- if not response.artifact_capabilities:
- raise ArtifactError(
- "Configured remote does not support artifact service")
+ artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
+ artifact_service.UpdateArtifact(update_request)
# An ArtifactCache manages artifacts.
@@ -79,7 +120,7 @@ class ArtifactCache(BaseCache):
spec_name = "artifact_cache_specs"
spec_error = ArtifactError
config_node_name = "artifacts"
- remote_class = ArtifactRemote
+ index_remote_class = ArtifactRemote
def __init__(self, context):
super().__init__(context)
@@ -187,22 +228,35 @@ class ArtifactCache(BaseCache):
#
def push(self, element, artifact):
project = element._get_project()
+ display_key = element._get_brief_display_key()
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
+ index_remotes = [r for r in self._index_remotes[project] if r.push]
+ storage_remotes = [r for r in self._storage_remotes[project] if r.push]
pushed = False
+ # First push our files to all storage remotes, so that they
+ # can perform file checks on their end
+ for remote in storage_remotes:
+ remote.init()
+ element.status("Pushing data from artifact {} -> {}".format(display_key, remote))
- for remote in push_remotes:
+ if self._push_artifact_blobs(artifact, remote):
+ element.info("Pushed data from artifact {} -> {}".format(display_key, remote))
+ else:
+ element.info("Remote ({}) already has all data of artifact {} cached".format(
+ remote, display_key
+ ))
+
+ for remote in index_remotes:
remote.init()
- display_key = element._get_brief_display_key()
- element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
+ element.status("Pushing artifact {} -> {}".format(display_key, remote))
- if self._push_artifact(element, artifact, remote):
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+ if self._push_artifact_proto(element, artifact, remote):
+ element.info("Pushed artifact {} -> {}".format(display_key, remote))
pushed = True
else:
element.info("Remote ({}) already has artifact {} cached".format(
- remote.spec.url, element._get_brief_display_key()
+ remote, display_key
))
return pushed
@@ -220,26 +274,59 @@ class ArtifactCache(BaseCache):
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, pull_buildtrees=False):
+ artifact = None
display_key = key[:self.context.log_key_length]
project = element._get_project()
- for remote in self._remotes[project]:
+ errors = []
+ # Start by pulling our artifact proto, so that we know which
+ # blobs to pull
+ for remote in self._index_remotes[project]:
remote.init()
try:
- element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
-
- if self._pull_artifact(element, key, remote, pull_buildtrees=pull_buildtrees):
- element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
+ element.status("Pulling artifact {} <- {}".format(display_key, remote))
+ artifact = self._pull_artifact_proto(element, key, remote)
+ if artifact:
+ element.info("Pulled artifact {} <- {}".format(display_key, remote))
+ break
else:
element.info("Remote ({}) does not have artifact {} cached".format(
- remote.spec.url, display_key
+ remote, display_key
))
+ except CASError as e:
+ element.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors and not artifact:
+ raise ArtifactError("Failed to pull artifact {}".format(display_key),
+ detail="\n".join(str(e) for e in errors))
+
+ # If we don't have an artifact proto, there is nothing
+ # more we can pull
+ if not artifact:
+ return False
+
+ errors = []
+ # If we do, we can pull it!
+ for remote in self._storage_remotes[project]:
+ remote.init()
+ try:
+ element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
+ if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
+ element.info("Pulled data for artifact {} <- {}".format(display_key, remote))
+ return True
+
+ element.info("Remote ({}) does not have artifact {} cached".format(
+ remote, display_key
+ ))
except CASError as e:
- raise ArtifactError("Failed to pull artifact {}: {}".format(
- display_key, e)) from e
+ element.warn("Could not pull from remote {}: {}".format(remote, e))
+ errors.append(e)
+
+ if errors:
+ raise ArtifactError("Failed to pull artifact {}".format(display_key),
+ detail="\n".join(str(e) for e in errors))
return False
@@ -253,7 +340,7 @@ class ArtifactCache(BaseCache):
# digest (Digest): The digest of the tree
#
def pull_tree(self, project, digest):
- for remote in self._remotes[project]:
+ for remote in self._storage_remotes[project]:
digest = self.cas.pull_tree(remote, digest)
if digest:
@@ -276,7 +363,7 @@ class ArtifactCache(BaseCache):
def push_message(self, project, message):
if self._has_push_remotes:
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
+ push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
else:
push_remotes = []
@@ -330,7 +417,7 @@ class ArtifactCache(BaseCache):
# missing_blobs (list): The Digests of the blobs to fetch
#
def fetch_missing_blobs(self, project, missing_blobs):
- for remote in self._remotes[project]:
+ for remote in self._index_remotes[project]:
if not missing_blobs:
break
@@ -357,7 +444,7 @@ class ArtifactCache(BaseCache):
if not missing_blobs:
return []
- push_remotes = [r for r in self._remotes[project] if r.spec.push]
+ push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
remote_missing_blobs_list = []
@@ -384,12 +471,12 @@ class ArtifactCache(BaseCache):
#
def check_remotes_for_element(self, element):
# If there are no remotes
- if not self._remotes:
+ if not self._index_remotes:
return False
project = element._get_project()
ref = element.get_artifact_name()
- for remote in self._remotes[project]:
+ for remote in self._index_remotes[project]:
remote.init()
if self._query_remote(ref, remote):
@@ -401,40 +488,59 @@ class ArtifactCache(BaseCache):
# Local Private Methods #
################################################
- # _push_artifact()
+ # _reachable_directories()
#
- # Pushes relevant directories and then artifact proto to remote.
+ # Returns:
+ # (iter): Iterator over directory digests available from artifacts.
#
- # Args:
- # element (Element): The element
- # artifact (Artifact): The related artifact being pushed
- # remote (CASRemote): Remote to push to
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
+
+ if str(artifact.files):
+ yield artifact.files
+
+ if str(artifact.buildtree):
+ yield artifact.buildtree
+
+ # _reachable_digests()
#
# Returns:
- # (bool): whether the push was successful
+ # (iter): Iterator over single file digests in artifacts.
#
- def _push_artifact(self, element, artifact, remote):
+ def _reachable_digests(self):
+ for root, _, files in os.walk(self.artifactdir):
+ for artifact_file in files:
+ artifact = artifact_pb2.Artifact()
+ with open(os.path.join(root, artifact_file), 'r+b') as f:
+ artifact.ParseFromString(f.read())
- artifact_proto = artifact._get_proto()
+ if str(artifact.public_data):
+ yield artifact.public_data
- keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+ for log_file in artifact.logs:
+ yield log_file.digest
- # Check whether the artifact is on the server
- present = False
- for key in keys:
- get_artifact = artifact_pb2.GetArtifactRequest()
- get_artifact.cache_key = element.get_artifact_name(key)
- try:
- artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
- artifact_service.GetArtifact(get_artifact)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Error checking artifact cache: {}"
- .format(e.details()))
- else:
- present = True
- if present:
- return False
+ # _push_artifact_blobs()
+ #
+ # Push the blobs that make up an artifact to the remote server.
+ #
+ # Args:
+ # artifact (Artifact): The artifact whose blobs to push.
+ # remote (CASRemote): The remote to push the blobs to.
+ #
+ # Returns:
+ # (bool): True if we uploaded anything, False otherwise.
+ #
+ # Raises:
+ # ArtifactError: If we fail to push blobs (*unless* they're
+ # already there or we run out of space on the server).
+ #
+ def _push_artifact_blobs(self, artifact, remote):
+ artifact_proto = artifact._get_proto()
try:
self.cas._send_directory(remote, artifact_proto.files)
@@ -463,33 +569,68 @@ class ArtifactCache(BaseCache):
raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
return False
- # finally need to send the artifact proto
+ return True
+
+ # _push_artifact_proto()
+ #
+ # Pushes the artifact proto to remote.
+ #
+ # Args:
+ # element (Element): The element
+ # artifact (Artifact): The related artifact being pushed
+ # remote (ArtifactRemote): Remote to push to
+ #
+ # Returns:
+ # (bool): Whether we pushed the artifact.
+ #
+ # Raises:
+ # ArtifactError: If the push fails for any reason except the
+ # artifact already existing.
+ #
+ def _push_artifact_proto(self, element, artifact, remote):
+
+ artifact_proto = artifact._get_proto()
+
+ keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
+
+ # Check whether the artifact is on the server
for key in keys:
- update_artifact = artifact_pb2.UpdateArtifactRequest()
- update_artifact.cache_key = element.get_artifact_name(key)
- update_artifact.artifact.CopyFrom(artifact_proto)
+ try:
+ remote.get_artifact(element.get_artifact_name(key=key))
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Error checking artifact cache: {}"
+ .format(e.details()))
+ else:
+ return False
+ # If not, we send the artifact proto
+ for key in keys:
try:
- artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
- artifact_service.UpdateArtifact(update_artifact)
+ remote.update_artifact(element.get_artifact_name(key=key), artifact_proto)
except grpc.RpcError as e:
raise ArtifactError("Failed to push artifact: {}".format(e.details()))
return True
- # _pull_artifact()
+ # _pull_artifact_storage():
+ #
+ # Pull artifact blobs from the given remote.
#
# Args:
- # element (Element): element to pull
- # key (str): specific key of element to pull
- # remote (CASRemote): remote to pull from
- # pull_buildtree (bool): whether to pull buildtrees or not
+ # element (Element): element to pull
+ # artifact (Artifact): The artifact proto whose blobs to pull
+ # remote (CASRemote): remote to pull from
+ # pull_buildtrees (bool): whether to pull buildtrees or not
#
# Returns:
- # (bool): whether the pull was successful
+ # (bool): True if we pulled any blobs.
#
- def _pull_artifact(self, element, key, remote, pull_buildtrees=False):
-
+ # Raises:
+ # ArtifactError: If the pull failed for any reason except the
+ # blobs not existing on the server.
+ #
+ def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False):
def __pull_digest(digest):
self.cas._fetch_directory(remote, digest)
required_blobs = self.cas.required_blobs_for_directory(digest)
@@ -497,16 +638,6 @@ class ArtifactCache(BaseCache):
if missing_blobs:
self.cas.fetch_blobs(remote, missing_blobs)
- request = artifact_pb2.GetArtifactRequest()
- request.cache_key = element.get_artifact_name(key=key)
- try:
- artifact_service = artifact_pb2_grpc.ArtifactServiceStub(remote.channel)
- artifact = artifact_service.GetArtifact(request)
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
- return False
-
try:
if str(artifact.files):
__pull_digest(artifact.files)
@@ -527,13 +658,41 @@ class ArtifactCache(BaseCache):
raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
return False
+ return True
+
+ # _pull_artifact_proto():
+ #
+ # Pull an artifact proto from a remote server.
+ #
+ # Args:
+ # element (Element): The element whose artifact to pull.
+ # key (str): The specific key for the artifact to pull.
+ # remote (ArtifactRemote): The remote to pull from.
+ #
+ # Returns:
+ # (Artifact|None): The artifact proto, or None if the server
+ # doesn't have it.
+ #
+ # Raises:
+ # ArtifactError: If the pull fails.
+ #
+ def _pull_artifact_proto(self, element, key, remote):
+ artifact_name = element.get_artifact_name(key=key)
+
+ try:
+ artifact = remote.get_artifact(artifact_name)
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
+ return None
+
# Write the artifact proto to cache
- artifact_path = os.path.join(self.artifactdir, request.cache_key)
+ artifact_path = os.path.join(self.artifactdir, artifact_name)
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
with utils.save_file_atomic(artifact_path, mode='wb') as f:
f.write(artifact.SerializeToString())
- return True
+ return artifact
# _query_remote()
#