Diffstat (limited to 'src/buildstream/_assetcache.py')
-rw-r--r--  src/buildstream/_assetcache.py  661
1 files changed, 661 insertions, 0 deletions
diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py
new file mode 100644
index 000000000..68f7fd732
--- /dev/null
+++ b/src/buildstream/_assetcache.py
@@ -0,0 +1,661 @@
+# Copyright (C) 2019-2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
+#
+import os
+from fnmatch import fnmatch
+from itertools import chain
+from typing import TYPE_CHECKING
+import grpc
+
+from . import utils
+from ._cas import CASRemote
+from ._message import Message, MessageType
+from ._exceptions import AssetCacheError, LoadError, LoadErrorReason, RemoteError
+from ._remote import BaseRemote, RemoteSpec, RemoteType
+from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
+from ._protos.google.rpc import code_pb2
+
+
+if TYPE_CHECKING:
+ from typing import Optional, Type
+ from ._exceptions import BstError
+
+
+class AssetRemote(BaseRemote):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.fetch_service = None
+ self.push_service = None
+
+ def close(self):
+ self.fetch_service = None
+ self.push_service = None
+ super().close()
+
+ def _configure_protocols(self):
+ # set up remote asset stubs
+ self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
+ self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)
+
+ # _check():
+ #
+ # Check if this remote provides everything required for the
+ # particular kind of remote. This is expected to be called as part
+ # of check()
+ #
+ # Raises:
+ # RemoteError: If the upstream has a problem
+ #
+ def _check(self):
+ request = remote_asset_pb2.FetchBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ try:
+ self.fetch_service.FetchBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+ # Expected error as the request doesn't specify any URIs.
+ pass
+ elif e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise RemoteError(
+ "Configured remote does not implement the Remote Asset "
+ "Fetch service. Please check remote configuration."
+ )
+ else:
+ raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))
+
+ if self.spec.push:
+ request = remote_asset_pb2.PushBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ try:
+ self.push_service.PushBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+ # Expected error as the request doesn't specify any URIs.
+ pass
+ elif e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise RemoteError(
+ "Configured remote does not implement the Remote Asset "
+ "Push service. Please check remote configuration."
+ )
+ else:
+ raise RemoteError(
+ "Remote initialisation failed with status {}: {}".format(e.code().name, e.details())
+ )
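+
+    # Note on the probe requests above: a conformant server rejects them
+    # with INVALID_ARGUMENT because no URIs were supplied, which confirms
+    # that the Fetch and Push services exist without mutating any state.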
+
+ # fetch_blob():
+ #
+ # Resolve URIs to a CAS blob digest.
+ #
+ # Args:
+ # uris (list of str): The URIs to resolve. Multiple URIs should represent
+ # the same content available at different locations.
+ # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
+ # content to fetch.
+ #
+    # Returns:
+ # (FetchBlobResponse): The asset server response or None if the resource
+ # is not available.
+ #
+ # Raises:
+ # AssetCacheError: If the upstream has a problem
+ #
+ def fetch_blob(self, uris, *, qualifiers=None):
+ request = remote_asset_pb2.FetchBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ request.uris.extend(uris)
+ if qualifiers:
+ request.qualifiers.extend(qualifiers)
+
+ try:
+ response = self.fetch_service.FetchBlob(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ return None
+
+ raise AssetCacheError("FetchBlob failed with status {}: {}".format(e.code().name, e.details())) from e
+
+ if response.status.code == code_pb2.NOT_FOUND:
+ return None
+
+ if response.status.code != code_pb2.OK:
+ raise AssetCacheError("FetchBlob failed with response status {}".format(response.status.code))
+
+ return response
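+
+    # A minimal usage sketch (hypothetical URI; "remote" stands for an
+    # already-initialised AssetRemote, and "blob_digest" is illustrative):
+    #
+    #   response = remote.fetch_blob(["https://example.com/foo.tar.gz"])
+    #   if response is None:
+    #       ...  # the asset server does not know this URI
+    #   else:
+    #       blob_digest = response.blob_digest  # CAS Digest of the content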
+
+ # fetch_directory():
+ #
+ # Resolve URIs to a CAS Directory digest.
+ #
+ # Args:
+ # uris (list of str): The URIs to resolve. Multiple URIs should represent
+ # the same content available at different locations.
+ # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
+ # content to fetch.
+ #
+    # Returns:
+ # (FetchDirectoryResponse): The asset server response or None if the resource
+ # is not available.
+ #
+ # Raises:
+ # AssetCacheError: If the upstream has a problem
+ #
+ def fetch_directory(self, uris, *, qualifiers=None):
+ request = remote_asset_pb2.FetchDirectoryRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ request.uris.extend(uris)
+ if qualifiers:
+ request.qualifiers.extend(qualifiers)
+
+ try:
+ response = self.fetch_service.FetchDirectory(request)
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.NOT_FOUND:
+ return None
+
+ raise AssetCacheError("FetchDirectory failed with status {}: {}".format(e.code().name, e.details())) from e
+
+ if response.status.code == code_pb2.NOT_FOUND:
+ return None
+
+ if response.status.code != code_pb2.OK:
+ raise AssetCacheError("FetchDirectory failed with response status {}".format(response.status.code))
+
+ return response
+
+ # push_blob():
+ #
+ # Associate a CAS blob digest to URIs.
+ #
+ # Args:
+ # uris (list of str): The URIs to associate with the blob digest.
+ # blob_digest (Digest): The CAS blob to associate.
+ # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
+ # content that is being pushed.
+    #    references_blobs (list of Digest): Referenced blobs that must not expire
+    #                           before this association expires.
+    #    references_directories (list of Digest): Referenced directories that must not
+    #                           expire before this association expires.
+ #
+ # Raises:
+ # AssetCacheError: If the upstream has a problem
+ #
+ def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None):
+ request = remote_asset_pb2.PushBlobRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ request.uris.extend(uris)
+ request.blob_digest.CopyFrom(blob_digest)
+ if qualifiers:
+ request.qualifiers.extend(qualifiers)
+ if references_blobs:
+ request.references_blobs.extend(references_blobs)
+ if references_directories:
+ request.references_directories.extend(references_directories)
+
+ try:
+ self.push_service.PushBlob(request)
+ except grpc.RpcError as e:
+ raise AssetCacheError("PushBlob failed with status {}: {}".format(e.code().name, e.details())) from e
+
+ # push_directory():
+ #
+ # Associate a CAS Directory digest to URIs.
+ #
+ # Args:
+    #    uris (list of str): The URIs to associate with the directory digest.
+    #    directory_digest (Digest): The CAS Directory to associate.
+ # qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
+ # content that is being pushed.
+    #    references_blobs (list of Digest): Referenced blobs that must not expire
+    #                           before this association expires.
+    #    references_directories (list of Digest): Referenced directories that must not
+    #                           expire before this association expires.
+ #
+ # Raises:
+ # AssetCacheError: If the upstream has a problem
+ #
+ def push_directory(
+ self, uris, directory_digest, *, qualifiers=None, references_blobs=None, references_directories=None
+ ):
+ request = remote_asset_pb2.PushDirectoryRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+ request.uris.extend(uris)
+ request.root_directory_digest.CopyFrom(directory_digest)
+ if qualifiers:
+ request.qualifiers.extend(qualifiers)
+ if references_blobs:
+ request.references_blobs.extend(references_blobs)
+ if references_directories:
+ request.references_directories.extend(references_directories)
+
+ try:
+ self.push_service.PushDirectory(request)
+ except grpc.RpcError as e:
+ raise AssetCacheError("PushDirectory failed with status {}: {}".format(e.code().name, e.details())) from e
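+
+    # A minimal usage sketch (hypothetical URI; "digest" must already refer
+    # to a Directory present in CAS):
+    #
+    #   remote.push_directory(["https://example.com/foo.tar.gz"], digest)
+    #
+    # A later fetch_directory() with the same URI can then resolve this
+    # digest without consulting the upstream location again.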
+
+
+# Base Asset Cache for Caches to derive from
+#
+class AssetCache:
+
+    # These are never used directly on the base class and are expected to
+    # be set by subclasses; defining them here appeases pylint to some degree
+ spec_name = None # type: str
+ config_node_name = None # type: str
+
+ def __init__(self, context):
+ self.context = context
+ self.cas = context.get_cascache()
+
+ self._remotes_setup = False # Check to prevent double-setup of remotes
+ # Per-project list of Remote instances.
+ self._storage_remotes = {}
+ self._index_remotes = {}
+
+ self.global_remote_specs = []
+ self.project_remote_specs = {}
+
+ self._has_fetch_remotes = False
+ self._has_push_remotes = False
+
+ self._basedir = None
+
+ # close_grpc_channels():
+ #
+ # Close open gRPC channels.
+ #
+ def close_grpc_channels(self):
+ # Close all remotes and their gRPC channels
+ for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
+ for remote in project_remotes:
+ remote.close()
+
+ # release_resources():
+ #
+ # Release resources used by AssetCache.
+ #
+ def release_resources(self):
+ self.close_grpc_channels()
+
+ # specs_from_config_node()
+ #
+    # Parses the configuration of remote asset caches from a config block.
+ #
+ # Args:
+ # config_node (dict): The config block, which may contain a key defined by cls.config_node_name
+ # basedir (str): The base directory for relative paths
+ #
+ # Returns:
+ # A list of RemoteSpec instances.
+ #
+ # Raises:
+    #    LoadError: If the config block contains invalid data.
+ #
+ @classmethod
+ def specs_from_config_node(cls, config_node, basedir=None):
+ cache_specs = []
+
+ try:
+ artifacts = [config_node.get_mapping(cls.config_node_name)]
+ except LoadError:
+ try:
+ artifacts = config_node.get_sequence(cls.config_node_name, default=[])
+ except LoadError:
+ provenance = config_node.get_node(cls.config_node_name).get_provenance()
+                raise LoadError(
+ "{}: '{}' must be a single remote mapping, or a list of mappings".format(
+ provenance, cls.config_node_name
+ ),
+                    LoadErrorReason.INVALID_DATA,
+ )
+
+ for spec_node in artifacts:
+ cache_specs.append(RemoteSpec.new_from_config_node(spec_node))
+
+ return cache_specs
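+
+    # For illustration, assuming a subclass with config_node_name set to
+    # "artifacts", both of these (hypothetical) config forms are accepted:
+    #
+    #   artifacts:
+    #     url: https://cache.example.com:11001
+    #     push: true
+    #
+    #   artifacts:
+    #   - url: https://cache1.example.com:11001
+    #   - url: https://cache2.example.com:11001
+    #     push: true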
+
+ # _configured_remote_cache_specs():
+ #
+ # Return the list of configured remotes for a given project, in priority
+ # order. This takes into account the user and project configuration.
+ #
+ # Args:
+ # context (Context): The BuildStream context
+ # project (Project): The BuildStream project
+ #
+ # Returns:
+ # A list of RemoteSpec instances describing the remote caches.
+ #
+ @classmethod
+ def _configured_remote_cache_specs(cls, context, project):
+ project_overrides = context.get_overrides(project.name)
+ project_extra_specs = cls.specs_from_config_node(project_overrides)
+
+ project_specs = getattr(project, cls.spec_name)
+ context_specs = getattr(context, cls.spec_name)
+
+ return list(utils._deduplicate(project_extra_specs + project_specs + context_specs))
+
+ # setup_remotes():
+ #
+ # Sets up which remotes to use
+ #
+ # Args:
+ # use_config (bool): Whether to use project configuration
+ # remote_url (str): Remote cache URL
+ #
+ # This requires that all of the projects which are to be processed in the session
+ # have already been loaded and are observable in the Context.
+ #
+ def setup_remotes(self, *, use_config=False, remote_url=None):
+
+ # Ensure we do not double-initialise since this can be expensive
+ if self._remotes_setup:
+ return
+
+ self._remotes_setup = True
+
+ # Initialize remote caches. We allow the commandline to override
+ # the user config in some cases (for example `bst artifact push --remote=...`).
+ has_remote_caches = False
+ if remote_url:
+ self._set_remotes([RemoteSpec(remote_url, push=True)])
+ has_remote_caches = True
+ if use_config:
+ for project in self.context.get_projects():
+ caches = self._configured_remote_cache_specs(self.context, project)
+ if caches: # caches is a list of RemoteSpec instances
+ self._set_remotes(caches, project=project)
+ has_remote_caches = True
+ if has_remote_caches:
+ self._initialize_remotes()
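+
+    # A minimal lifecycle sketch (hypothetical concrete subclass):
+    #
+    #   cache = MyAssetCache(context)         # subclass of AssetCache
+    #   cache.setup_remotes(use_config=True)  # resolve user/project config
+    #   if cache.has_push_remotes():
+    #       ...                               # safe to attempt pushes
+    #   cache.release_resources()             # close gRPC channels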
+
+ # Notify remotes that forking is disabled
+ def notify_fork_disabled(self):
+ for project in self._index_remotes:
+ for remote in self._index_remotes[project]:
+ remote.notify_fork_disabled()
+ for project in self._storage_remotes:
+ for remote in self._storage_remotes[project]:
+ remote.notify_fork_disabled()
+
+ # initialize_remotes():
+ #
+ # This will contact each remote cache.
+ #
+ # Args:
+ # on_failure (callable): Called if we fail to contact one of the caches.
+ #
+ def initialize_remotes(self, *, on_failure=None):
+ index_remotes, storage_remotes = self._create_remote_instances(on_failure=on_failure)
+
+ # Assign remote instances to their respective projects
+ for project in self.context.get_projects():
+ # Get the list of specs that should be considered for this
+ # project
+ remote_specs = self.global_remote_specs.copy()
+ if project in self.project_remote_specs:
+ remote_specs.extend(self.project_remote_specs[project])
+
+ # De-duplicate the list
+ remote_specs = list(utils._deduplicate(remote_specs))
+
+ def get_remotes(remote_list, remote_specs):
+ for remote_spec in remote_specs:
+ # If a remote_spec didn't make it into the remotes
+ # dict, that means we can't access it, and it has been
+ # disabled for this session.
+ if remote_spec not in remote_list:
+ continue
+
+ yield remote_list[remote_spec]
+
+ self._index_remotes[project] = list(get_remotes(index_remotes, remote_specs))
+ self._storage_remotes[project] = list(get_remotes(storage_remotes, remote_specs))
+
+ # has_fetch_remotes():
+ #
+ # Check whether any remote repositories are available for fetching.
+ #
+ # Args:
+ # plugin (Plugin): The Plugin to check
+ #
+ # Returns: True if any remote repositories are configured, False otherwise
+ #
+ def has_fetch_remotes(self, *, plugin=None):
+ if not self._has_fetch_remotes:
+ # No project has fetch remotes
+ return False
+ elif plugin is None:
+ # At least one (sub)project has fetch remotes
+ return True
+ else:
+ # Check whether the specified element's project has fetch remotes
+ index_remotes = self._index_remotes[plugin._get_project()]
+ storage_remotes = self._storage_remotes[plugin._get_project()]
+            return bool(index_remotes and storage_remotes)
+
+ # has_push_remotes():
+ #
+ # Check whether any remote repositories are available for pushing.
+ #
+ # Args:
+    #    plugin (Plugin): The Plugin to check
+ #
+ # Returns: True if any remote repository is configured, False otherwise
+ #
+ def has_push_remotes(self, *, plugin=None):
+ if not self._has_push_remotes:
+ # No project has push remotes
+ return False
+ elif plugin is None:
+ # At least one (sub)project has push remotes
+ return True
+ else:
+ # Check whether the specified element's project has push remotes
+ index_remotes = self._index_remotes[plugin._get_project()]
+ storage_remotes = self._storage_remotes[plugin._get_project()]
+ return any(remote.spec.push for remote in index_remotes) and any(
+ remote.spec.push for remote in storage_remotes
+ )
+
+ ################################################
+ # Local Private Methods #
+ ################################################
+
+ # _create_remote_instances():
+ #
+ # Create the global set of Remote instances, including
+ # project-specific and global instances, ensuring that all of them
+ # are accessible.
+ #
+ # Args:
+ # on_failure (Callable[[Remote,Exception],None]):
+    #        What to do when a remote doesn't respond.
+ #
+ # Returns:
+ # (Dict[RemoteSpec, AssetRemote], Dict[RemoteSpec, CASRemote]) -
+ # The created remote instances, index first, storage last.
+ #
+ def _create_remote_instances(self, *, on_failure=None):
+ # Create a flat list of all remote specs, global or
+ # project-specific
+ remote_specs = self.global_remote_specs.copy()
+ for project in self.project_remote_specs:
+ remote_specs.extend(self.project_remote_specs[project])
+
+ # By de-duplicating it after we flattened the list, we ensure
+ # that we never instantiate the same remote twice. This
+ # de-duplication also preserves their order.
+ remote_specs = list(utils._deduplicate(remote_specs))
+
+ # Now let's create a dict of this, indexed by their specs, so
+ # that we can later assign them to the right projects.
+ index_remotes = {}
+ storage_remotes = {}
+ for remote_spec in remote_specs:
+ try:
+ index, storage = self._instantiate_remote(remote_spec)
+ except RemoteError as err:
+ if on_failure:
+ on_failure(remote_spec, str(err))
+ continue
+
+ raise
+
+            # Finally, we can store the remotes. Note that
+            # NamedTuples are hashable, so we can use them as pretty
+            # low-overhead keys.
+ if index:
+ index_remotes[remote_spec] = index
+ if storage:
+ storage_remotes[remote_spec] = storage
+
+        self._has_fetch_remotes = bool(storage_remotes and index_remotes)
+ self._has_push_remotes = any(spec.push for spec in storage_remotes) and any(
+ spec.push for spec in index_remotes
+ )
+
+ return index_remotes, storage_remotes
+
+ # _instantiate_remote()
+ #
+ # Instantiate a remote given its spec, asserting that it is
+ # reachable - this may produce two remote instances (a storage and
+ # an index remote as specified by the class variables).
+ #
+ # Args:
+ #
+ # remote_spec (RemoteSpec): The spec of the remote to
+ # instantiate.
+ #
+ # Returns:
+ #
+ # (Tuple[Remote|None, Remote|None]) - The remotes, index remote
+ # first, storage remote second. One must always be specified,
+ # the other may be None.
+ #
+ def _instantiate_remote(self, remote_spec):
+ # Our remotes can be index, storage or both. In either case,
+ # we need to use a different type of Remote for our calls, so
+ # we create two objects here
+ index = None
+ storage = None
+ if remote_spec.type in [RemoteType.INDEX, RemoteType.ALL]:
+ index = AssetRemote(remote_spec) # pylint: disable=not-callable
+ index.check()
+ if remote_spec.type in [RemoteType.STORAGE, RemoteType.ALL]:
+ storage = CASRemote(remote_spec, self.cas)
+ storage.check()
+
+ return (index, storage)
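+
+    # For example, a spec of type RemoteType.ALL yields both an AssetRemote
+    # (index) and a CASRemote (storage), while a RemoteType.INDEX spec
+    # yields (index, None).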
+
+ # _message()
+ #
+ # Local message propagator
+ #
+    def _message(self, message_type, message, **kwargs):
+        self.context.messenger.message(Message(message_type, message, **kwargs))
+
+ # _set_remotes():
+ #
+ # Set the list of remote caches. If project is None, the global list of
+ # remote caches will be set, which is used by all projects. If a project is
+ # specified, the per-project list of remote caches will be set.
+ #
+ # Args:
+    #    remote_specs (list): List of RemoteSpec instances, in priority order.
+ # project (Project): The Project instance for project-specific remotes
+ def _set_remotes(self, remote_specs, *, project=None):
+ if project is None:
+ # global remotes
+ self.global_remote_specs = remote_specs
+ else:
+ self.project_remote_specs[project] = remote_specs
+
+ # _initialize_remotes()
+ #
+    # An internal wrapper which calls initialize_remotes() and
+    # takes care of messaging
+ #
+ def _initialize_remotes(self):
+ def remote_failed(remote, error):
+ self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(remote.url, error))
+
+ with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
+ self.initialize_remotes(on_failure=remote_failed)
+
+ # _list_refs_mtimes()
+ #
+ # List refs in a directory, given a base path. Also returns the
+ # associated mtimes
+ #
+ # Args:
+ # base_path (str): Base path to traverse over
+ # glob_expr (str|None): Optional glob expression to match against files
+ #
+ # Returns:
+    #    (iter of (mtime, ref) tuples): iterator over the mtime and relative path of each ref
+ #
+ def _list_refs_mtimes(self, base_path, *, glob_expr=None):
+ path = base_path
+ if glob_expr is not None:
+ globdir = os.path.dirname(glob_expr)
+ if not any(c in "*?[" for c in globdir):
+ # path prefix contains no globbing characters so
+ # append the glob to optimise the os.walk()
+ path = os.path.join(base_path, globdir)
+
+ for root, _, files in os.walk(path):
+ for filename in files:
+ ref_path = os.path.join(root, filename)
+ relative_path = os.path.relpath(ref_path, base_path) # Relative to refs head
+ if not glob_expr or fnmatch(relative_path, glob_expr):
+ # Obtain the mtime (the time a file was last modified)
+ yield (os.path.getmtime(ref_path), relative_path)
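+
+    # For example (hypothetical ref layout "<project>/<element>/<key>"
+    # under self._basedir):
+    #
+    #   for mtime, ref in self._list_refs_mtimes(self._basedir,
+    #                                            glob_expr="myproject/*"):
+    #       ...
+    #
+    # Since the directory part "myproject" contains no glob characters,
+    # the walk starts at os.path.join(base_path, "myproject") instead of
+    # traversing the whole base directory.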
+
+ # _remove_ref()
+ #
+ # Removes a ref.
+ #
+ # This also takes care of pruning away directories which can
+ # be removed after having removed the given ref.
+ #
+ # Args:
+ # ref (str): The ref to remove
+ #
+ # Raises:
+    #    (AssetCacheError): If the ref didn't exist, or a system error
+ # occurred while removing it
+ #
+ def _remove_ref(self, ref):
+ try:
+ utils._remove_path_with_parents(self._basedir, ref)
+ except FileNotFoundError as e:
+ raise AssetCacheError("Could not find ref '{}'".format(ref)) from e
+ except OSError as e:
+ raise AssetCacheError("System error while removing ref '{}': {}".format(ref, e)) from e