#
# Copyright (C) 2017-2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Maat
import os
import grpc
from ._basecache import BaseCache
from ._exceptions import ArtifactError, CASError, CASCacheError, CASRemoteError
from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
artifact_pb2, artifact_pb2_grpc
from ._remote import BaseRemote
from .storage._casbaseddirectory import CasBasedDirectory
from ._artifact import Artifact
from . import utils
# ArtifactRemote():
#
# Facilitates communication with the BuildStream-specific part of
# artifact remotes.
#
class ArtifactRemote(BaseRemote):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.artifact_service = None
def close(self):
self.artifact_service = None
super().close()
# _configure_protocols():
#
# Configure the protocols used by this remote as part of the
# remote initialization. Note that this should only be used in
# Remote.init(), and is expected to fail when called by itself.
#
def _configure_protocols(self):
# Set up artifact stub
self.artifact_service = artifact_pb2_grpc.ArtifactServiceStub(self.channel)
# _check():
#
# Check if this remote provides everything required for the
# particular kind of remote. This is expected to be called as part
# of check(), and must be called in a non-main process.
#
# Returns:
# (str|None): An error message, or None if the remote passed the check.
#
def _check(self):
capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
# Check whether the server supports newer proto based artifact.
try:
request = buildstream_pb2.GetCapabilitiesRequest()
if self.instance_name:
request.instance_name = self.instance_name
response = capabilities_service.GetCapabilities(request)
except grpc.RpcError as e:
# Check whether this remote is missing the capabilities service entirely
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
return ("Configured remote does not have the BuildStream "
"capabilities service. Please check remote configuration.")
# Otherwise, return the error details
return "Remote initialisation failed: {}".format(e.details())
if not response.artifact_capabilities:
return "Configured remote does not support artifact service"
if self.spec.push and not response.artifact_capabilities.allow_updates:
return 'Artifact server does not allow push'
return None
# get_artifact():
#
# Get an artifact proto for a given cache key from the remote.
#
# Args:
# cache_key (str): The artifact cache key. NOTE: This "key"
# is actually the ref/name and its name in
# the protocol is inaccurate. You have been warned.
#
# Returns:
# (Artifact): The artifact proto
#
# Raises:
# grpc.RpcError: If something goes wrong during the request.
#
def get_artifact(self, cache_key):
artifact_request = artifact_pb2.GetArtifactRequest()
artifact_request.cache_key = cache_key
return self.artifact_service.GetArtifact(artifact_request)
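#
# Illustrative usage (hypothetical caller, not part of this module):
# fetch an artifact proto and treat NOT_FOUND as "not on this remote",
# mirroring what _pull_artifact_proto() below does:
#
#     try:
#         proto = remote.get_artifact(element.get_artifact_name(key=key))
#     except grpc.RpcError as e:
#         if e.code() == grpc.StatusCode.NOT_FOUND:
#             proto = None  # the remote simply doesn't have this artifact
#         else:
#             raise
#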
# update_artifact():
#
# Update an artifact with the given cache key on the remote with
# the given proto.
#
# Args:
# cache_key (str): The artifact cache key of the artifact to update.
# artifact (ArtifactProto): The artifact proto to send.
#
# Raises:
# grpc.RpcError: If something goes wrong during the request.
#
def update_artifact(self, cache_key, artifact):
update_request = artifact_pb2.UpdateArtifactRequest()
update_request.cache_key = cache_key
update_request.artifact.CopyFrom(artifact)
self.artifact_service.UpdateArtifact(update_request)
# An ArtifactCache manages artifacts.
#
# Args:
# context (Context): The BuildStream context
#
class ArtifactCache(BaseCache):
spec_name = "artifact_cache_specs"
spec_error = ArtifactError
config_node_name = "artifacts"
index_remote_class = ArtifactRemote
def __init__(self, context):
super().__init__(context)
# create artifact directory
self.artifactdir = context.artifactdir
os.makedirs(self.artifactdir, exist_ok=True)
def update_mtime(self, ref):
try:
os.utime(os.path.join(self.artifactdir, ref))
except FileNotFoundError as e:
raise ArtifactError("Couldn't find artifact: {}".format(ref)) from e
# preflight():
#
# Preflight check.
#
def preflight(self):
self.cas.preflight()
# contains():
#
# Check whether the artifact for the specified Element is already available
# in the local artifact cache.
#
# Args:
# element (Element): The Element to check
# key (str): The cache key to use
#
# Returns:
# (bool): True if the artifact is in the cache, False otherwise
#
def contains(self, element, key):
ref = element.get_artifact_name(key)
return os.path.exists(os.path.join(self.artifactdir, ref))
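#
# A minimal sketch of the lookup this performs (hypothetical ref value):
# the local cache is a directory tree of artifact protos keyed by ref, so
# presence reduces to a plain file existence check:
#
#     ref = element.get_artifact_name(key)      # e.g. "project/element/<key>"
#     os.path.exists(os.path.join(artifactcache.artifactdir, ref))
#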
# list_artifacts():
#
# List artifacts in this cache in LRU order.
#
# Args:
# glob (str): An optional glob expression; only artifacts matching it are listed
#
# Returns:
# ([str]): A list of artifact names, in LRU order
#
def list_artifacts(self, *, glob=None):
return [ref for _, ref in sorted(list(self._list_refs_mtimes(self.artifactdir, glob_expr=glob)))]
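#
# Illustrative sketch, assuming _list_refs_mtimes() yields (mtime, ref)
# pairs (an assumption for illustration); sorting those pairs is what
# produces the least-recently-used-first ordering:
#
#     pairs = [(200.0, "proj/b/key2"), (100.0, "proj/a/key1")]
#     [ref for _, ref in sorted(pairs)]    # -> ["proj/a/key1", "proj/b/key2"]
#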
# remove():
#
# Removes the artifact for the specified ref from the local
# artifact cache.
#
# Args:
# ref (str): The name of the artifact to remove (as
# generated by `Element.get_artifact_name`)
#
def remove(self, ref):
try:
self.cas.remove(ref, basedir=self.artifactdir)
except CASCacheError as e:
raise ArtifactError("{}".format(e)) from e
# diff():
#
# Return a list of files that have been added or modified between
# the artifacts described by key_a and key_b. This expects the
# provided keys to be strong cache keys.
#
# Args:
# element (Element): The element whose artifacts to compare
# key_a (str): The first artifact strong key
# key_b (str): The second artifact strong key
#
def diff(self, element, key_a, key_b):
context = self.context
artifact_a = Artifact(element, context, strong_key=key_a)
artifact_b = Artifact(element, context, strong_key=key_b)
digest_a = artifact_a._get_proto().files
digest_b = artifact_b._get_proto().files
added = []
removed = []
modified = []
self.cas.diff_trees(digest_a, digest_b, added=added, removed=removed, modified=modified)
return modified, removed, added
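#
# Illustrative usage (hypothetical caller): note that the values are
# returned as (modified, removed, added), not in the order the lists are
# declared above:
#
#     modified, removed, added = artifactcache.diff(element, key_a, key_b)
#     for path in modified:
#         print("changed:", path)
#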
# push():
#
# Push committed artifact to remote repository.
#
# Args:
# element (Element): The Element whose artifact is to be pushed
# artifact (Artifact): The artifact being pushed
#
# Returns:
# (bool): True if any remote was updated, False if no pushes were required
#
# Raises:
# (ArtifactError): if there was an error
#
def push(self, element, artifact):
project = element._get_project()
display_key = element._get_brief_display_key()
index_remotes = [r for r in self._index_remotes[project] if r.push]
storage_remotes = [r for r in self._storage_remotes[project] if r.push]
pushed = False
# First push our files to all storage remotes, so that they
# can perform file checks on their end
for remote in storage_remotes:
remote.init()
element.status("Pushing data from artifact {} -> {}".format(display_key, remote))
if self._push_artifact_blobs(artifact, remote):
element.info("Pushed data from artifact {} -> {}".format(display_key, remote))
else:
element.info("Remote ({}) already has all data of artifact {} cached".format(
remote, element._get_brief_display_key()
))
for remote in index_remotes:
remote.init()
element.status("Pushing artifact {} -> {}".format(display_key, remote))
if self._push_artifact_proto(element, artifact, remote):
element.info("Pushed artifact {} -> {}".format(display_key, remote))
pushed = True
else:
element.info("Remote ({}) already has artifact {} cached".format(
remote, element._get_brief_display_key()
))
return pushed
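#
# Illustrative caller sketch (hypothetical, not part of this module):
# given an Artifact instance for the element obtained elsewhere, push()
# reports whether any index remote actually needed updating and raises
# ArtifactError on hard failures:
#
#     if artifactcache.push(element, artifact):
#         print("at least one remote was updated")
#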
# pull():
#
# Pull artifact from one of the configured remote repositories.
#
# Args:
# element (Element): The Element whose artifact is to be fetched
# key (str): The cache key to use
# pull_buildtrees (bool): Whether to pull buildtrees or not
#
# Returns:
# (bool): True if pull was successful, False if artifact was not available
#
def pull(self, element, key, *, pull_buildtrees=False):
artifact = None
display_key = key[:self.context.log_key_length]
project = element._get_project()
errors = []
# Start by pulling our artifact proto, so that we know which
# blobs to pull
for remote in self._index_remotes[project]:
remote.init()
try:
element.status("Pulling artifact {} <- {}".format(display_key, remote))
artifact = self._pull_artifact_proto(element, key, remote)
if artifact:
element.info("Pulled artifact {} <- {}".format(display_key, remote))
break
else:
element.info("Remote ({}) does not have artifact {} cached".format(
remote, display_key
))
except CASError as e:
element.warn("Could not pull from remote {}: {}".format(remote, e))
errors.append(e)
if errors and not artifact:
raise ArtifactError("Failed to pull artifact {}".format(display_key),
detail="\n".join(str(e) for e in errors))
# If we don't have the artifact proto, we don't know which blobs
# to fetch, so there is nothing to pull
if not artifact:
return False
errors = []
# If we do, we can pull it!
for remote in self._storage_remotes[project]:
remote.init()
try:
element.status("Pulling data for artifact {} <- {}".format(display_key, remote))
if self._pull_artifact_storage(element, artifact, remote, pull_buildtrees=pull_buildtrees):
element.info("Pulled data for artifact {} <- {}".format(display_key, remote))
return True
element.info("Remote ({}) does not have artifact {} cached".format(
remote, display_key
))
except CASError as e:
element.warn("Could not pull from remote {}: {}".format(remote, e))
errors.append(e)
if errors:
raise ArtifactError("Failed to pull artifact {}".format(display_key),
detail="\n".join(str(e) for e in errors))
return False
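#
# Illustrative caller sketch (hypothetical): pull() first fetches the
# artifact proto from an index remote, then the blobs from a storage
# remote, returning False if no configured remote had the artifact:
#
#     if artifactcache.pull(element, key, pull_buildtrees=False):
#         print("artifact is now available locally")
#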
# pull_tree():
#
# Pull a single Tree rather than an artifact.
# Does not update local refs.
#
# Args:
# project (Project): The current project
# digest (Digest): The digest of the tree
#
def pull_tree(self, project, digest):
for remote in self._storage_remotes[project]:
digest = self.cas.pull_tree(remote, digest)
if digest:
# no need to pull from additional remotes
return digest
return None
# push_message():
#
# Push the given protobuf message to all remotes.
#
# Args:
# project (Project): The current project
# message (Message): A protobuf message to push.
#
# Raises:
# (ArtifactError): if there was an error
#
def push_message(self, project, message):
if self._has_push_remotes:
push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
else:
push_remotes = []
if not push_remotes:
raise ArtifactError("push_message was called, but no remote artifact " +
"servers are configured as push remotes.")
for remote in push_remotes:
message_digest = remote.push_message(message)
return message_digest
# link_key():
#
# Add a key for an existing artifact.
#
# Args:
# element (Element): The Element whose artifact is to be linked
# oldkey (str): An existing cache key for the artifact
# newkey (str): A new cache key for the artifact
#
def link_key(self, element, oldkey, newkey):
oldref = element.get_artifact_name(oldkey)
newref = element.get_artifact_name(newkey)
if not os.path.exists(os.path.join(self.artifactdir, newref)):
os.link(os.path.join(self.artifactdir, oldref),
os.path.join(self.artifactdir, newref))
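#
# A minimal sketch of the effect (hypothetical keys): after linking, both
# refs are hard links to the same proto file on disk:
#
#     artifactcache.link_key(element, strong_key, weak_key)
#     old = os.path.join(artifactcache.artifactdir, element.get_artifact_name(strong_key))
#     new = os.path.join(artifactcache.artifactdir, element.get_artifact_name(weak_key))
#     assert os.path.samefile(old, new)
#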
# get_artifact_logs():
#
# Get the logs of an existing artifact
#
# Args:
# ref (str): The ref of the artifact
#
# Returns:
# logsdir (CasBasedDirectory): A CasBasedDirectory containing the artifact's logs
#
def get_artifact_logs(self, ref):
cache_id = self.cas.resolve_ref(ref, update_mtime=True)
vdir = CasBasedDirectory(self.cas, digest=cache_id).descend('logs')
return vdir
# fetch_missing_blobs():
#
# Fetch missing blobs from configured remote repositories.
#
# Args:
# project (Project): The current project
# missing_blobs (list): The Digests of the blobs to fetch
#
def fetch_missing_blobs(self, project, missing_blobs):
for remote in self._index_remotes[project]:
if not missing_blobs:
break
remote.init()
# fetch_blobs() will return the blobs that are still missing
missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
if missing_blobs:
raise ArtifactError("Blobs not found on configured artifact servers")
# find_missing_blobs():
#
# Find missing blobs from configured push remote repositories.
#
# Args:
# project (Project): The current project
# missing_blobs (list): The Digests of the blobs to check
#
# Returns:
# (list): The Digests of the blobs missing on at least one push remote
#
def find_missing_blobs(self, project, missing_blobs):
if not missing_blobs:
return []
push_remotes = [r for r in self._storage_remotes[project] if r.spec.push]
remote_missing_blobs_list = []
for remote in push_remotes:
remote.init()
remote_missing_blobs = self.cas.remote_missing_blobs(remote, missing_blobs)
for blob in remote_missing_blobs:
if blob not in remote_missing_blobs_list:
remote_missing_blobs_list.append(blob)
return remote_missing_blobs_list
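#
# Illustrative sketch of the semantics (hypothetical digests): the result
# is the union of per-remote misses, i.e. a blob is reported if *any*
# push remote lacks it:
#
#     remote A is missing {blob_1}, remote B is missing {blob_1, blob_2}
#     find_missing_blobs(project, [blob_1, blob_2, blob_3]) -> [blob_1, blob_2]
#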
# check_remotes_for_element()
#
# Check if the element is available in any of the remotes
#
# Args:
# element (Element): The element to check
#
# Returns:
# (bool): True if the element is available remotely
#
def check_remotes_for_element(self, element):
# If there are no remotes
if not self._index_remotes:
return False
project = element._get_project()
ref = element.get_artifact_name()
for remote in self._index_remotes[project]:
remote.init()
if self._query_remote(ref, remote):
return True
return False
################################################
# Local Private Methods #
################################################
# _reachable_directories()
#
# Returns:
# (iter): Iterator over directory digests available from artifacts.
#
def _reachable_directories(self):
for root, _, files in os.walk(self.artifactdir):
for artifact_file in files:
artifact = artifact_pb2.Artifact()
with open(os.path.join(root, artifact_file), 'r+b') as f:
artifact.ParseFromString(f.read())
if str(artifact.files):
yield artifact.files
if str(artifact.buildtree):
yield artifact.buildtree
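#
# Illustrative sketch (standalone, hypothetical path): each file under
# artifactdir is a serialized Artifact proto, and `str(digest)` acts as
# an emptiness test because an unset Digest sub-message stringifies to "":
#
#     artifact = artifact_pb2.Artifact()
#     with open("/path/to/artifactdir/project/element/key", "rb") as f:
#         artifact.ParseFromString(f.read())
#     if str(artifact.files):
#         print(artifact.files.hash, artifact.files.size_bytes)
#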
# _reachable_digests()
#
# Returns:
# (iter): Iterator over single file digests in artifacts
#
def _reachable_digests(self):
for root, _, files in os.walk(self.artifactdir):
for artifact_file in files:
artifact = artifact_pb2.Artifact()
with open(os.path.join(root, artifact_file), 'r+b') as f:
artifact.ParseFromString(f.read())
if str(artifact.public_data):
yield artifact.public_data
for log_file in artifact.logs:
yield log_file.digest
# _push_artifact_blobs()
#
# Push the blobs that make up an artifact to the remote server.
#
# Args:
# artifact (Artifact): The artifact whose blobs to push.
# remote (CASRemote): The remote to push the blobs to.
#
# Returns:
# (bool): True if we uploaded anything, False otherwise.
#
# Raises:
# ArtifactError: If we fail to push blobs (*unless* they're
# already there or we run out of space on the server).
#
def _push_artifact_blobs(self, artifact, remote):
artifact_proto = artifact._get_proto()
try:
if str(artifact_proto.files):
self.cas._send_directory(remote, artifact_proto.files)
if str(artifact_proto.buildtree):
try:
self.cas._send_directory(remote, artifact_proto.buildtree)
except FileNotFoundError:
pass
digests = []
if str(artifact_proto.public_data):
digests.append(artifact_proto.public_data)
for log_file in artifact_proto.logs:
digests.append(log_file.digest)
self.cas.send_blobs(remote, digests)
except CASRemoteError as cas_error:
if cas_error.reason != "cache-too-full":
raise ArtifactError("Failed to push artifact blobs: {}".format(cas_error))
return False
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
raise ArtifactError("Failed to push artifact blobs: {}".format(e.details()))
return False
return True
# _push_artifact_proto()
#
# Pushes the artifact proto to remote.
#
# Args:
# element (Element): The element
# artifact (Artifact): The related artifact being pushed
# remote (ArtifactRemote): Remote to push to
#
# Returns:
# (bool): Whether we pushed the artifact.
#
# Raises:
# ArtifactError: If the push fails for any reason except the
# artifact already existing.
#
def _push_artifact_proto(self, element, artifact, remote):
artifact_proto = artifact._get_proto()
keys = list(utils._deduplicate([artifact_proto.strong_key, artifact_proto.weak_key]))
# Check whether the artifact is on the server
for key in keys:
try:
remote.get_artifact(element.get_artifact_name(key=key))
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise ArtifactError("Error checking artifact cache: {}"
.format(e.details()))
else:
return False
# If not, we send the artifact proto
for key in keys:
try:
remote.update_artifact(element.get_artifact_name(key=key), artifact_proto)
except grpc.RpcError as e:
raise ArtifactError("Failed to push artifact: {}".format(e.details()))
return True
# _pull_artifact_storage():
#
# Pull artifact blobs from the given remote.
#
# Args:
# element (Element): element to pull
# artifact (Artifact): The artifact proto whose blobs to pull
# remote (CASRemote): remote to pull from
# pull_buildtrees (bool): whether to pull buildtrees or not
#
# Returns:
# (bool): True if we pulled any blobs.
#
# Raises:
# ArtifactError: If the pull failed for any reason except the
# blobs not existing on the server.
#
def _pull_artifact_storage(self, element, artifact, remote, pull_buildtrees=False):
def __pull_digest(digest):
self.cas._fetch_directory(remote, digest)
required_blobs = self.cas.required_blobs_for_directory(digest)
missing_blobs = self.cas.local_missing_blobs(required_blobs)
if missing_blobs:
self.cas.fetch_blobs(remote, missing_blobs)
try:
if str(artifact.files):
__pull_digest(artifact.files)
if pull_buildtrees and str(artifact.buildtree):
__pull_digest(artifact.buildtree)
digests = []
if str(artifact.public_data):
digests.append(artifact.public_data)
for log_digest in artifact.logs:
digests.append(log_digest.digest)
self.cas.fetch_blobs(remote, digests)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
return False
return True
# _pull_artifact_proto():
#
# Pull an artifact proto from a remote server.
#
# Args:
# element (Element): The element whose artifact to pull.
# key (str): The specific key for the artifact to pull.
# remote (ArtifactRemote): The remote to pull from.
#
# Returns:
# (Artifact|None): The artifact proto, or None if the server
# doesn't have it.
#
# Raises:
# ArtifactError: If the pull fails.
#
def _pull_artifact_proto(self, element, key, remote):
artifact_name = element.get_artifact_name(key=key)
try:
artifact = remote.get_artifact(artifact_name)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise ArtifactError("Failed to pull artifact: {}".format(e.details()))
return None
# Write the artifact proto to cache
artifact_path = os.path.join(self.artifactdir, artifact_name)
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
with utils.save_file_atomic(artifact_path, mode='wb') as f:
f.write(artifact.SerializeToString())
return artifact
# _query_remote()
#
# Args:
# ref (str): The artifact ref
# remote (ArtifactRemote): The remote we want to check
#
# Returns:
# (bool): True if the ref exists in the remote, False otherwise.
#
def _query_remote(self, ref, remote):
request = artifact_pb2.GetArtifactRequest()
request.cache_key = ref
try:
remote.artifact_service.GetArtifact(request)
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND:
raise ArtifactError("Error when querying: {}".format(e.details()))
return False
return True