# Copyright (C) 2019-2020 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
#        Raoul Hidalgo Charman
#
import os
import re
from itertools import chain
from typing import TYPE_CHECKING

import grpc

from . import utils
from . import _yaml
from ._cas import CASRemote
from ._exceptions import AssetCacheError, LoadError, RemoteError
from ._remote import BaseRemote, RemoteSpec, RemoteType
from ._protos.build.bazel.remote.asset.v1 import remote_asset_pb2, remote_asset_pb2_grpc
from ._protos.google.rpc import code_pb2

if TYPE_CHECKING:
    from typing import Optional, Type

    from ._exceptions import BstError


class AssetRemote(BaseRemote):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fetch_service = None
        self.push_service = None

    def close(self):
        self.fetch_service = None
        self.push_service = None
        super().close()

    def _configure_protocols(self):
        # Set up remote asset stubs
        self.fetch_service = remote_asset_pb2_grpc.FetchStub(self.channel)
        self.push_service = remote_asset_pb2_grpc.PushStub(self.channel)

    # _check():
    #
    # Check if this remote provides everything required for the
    # particular kind of remote. This is expected to be called as part
    # of check().
    #
    # Raises:
    #     RemoteError: If the upstream has a problem
    #
    def _check(self):
        request = remote_asset_pb2.FetchBlobRequest()
        if self.instance_name:
            request.instance_name = self.instance_name

        try:
            self.fetch_service.FetchBlob(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
                # Expected error as the request doesn't specify any URIs.
                pass
            elif e.code() == grpc.StatusCode.UNIMPLEMENTED:
                raise RemoteError(
                    "Configured remote does not implement the Remote Asset "
                    "Fetch service. Please check remote configuration."
                )
            else:
                raise RemoteError("Remote initialisation failed with status {}: {}".format(e.code().name, e.details()))

        if self.spec.push:
            request = remote_asset_pb2.PushBlobRequest()
            if self.instance_name:
                request.instance_name = self.instance_name

            try:
                self.push_service.PushBlob(request)
            except grpc.RpcError as e:
                if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
                    # Expected error as the request doesn't specify any URIs.
                    pass
                elif e.code() == grpc.StatusCode.UNIMPLEMENTED:
                    raise RemoteError(
                        "Configured remote does not implement the Remote Asset "
                        "Push service. Please check remote configuration."
                    )
                else:
                    raise RemoteError(
                        "Remote initialisation failed with status {}: {}".format(e.code().name, e.details())
                    )
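    # Illustrative sketch (placeholder URL, not part of this module): an
    # AssetRemote is typically constructed from a RemoteSpec and probed with
    # check(), which is expected to surface the RemoteError raised by _check()
    # above when the Remote Asset API is missing or misconfigured.
    #
    #     spec = RemoteSpec("https://cache.example.com:11001", push=True)
    #     remote = AssetRemote(spec)
    #     remote.check()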
    # fetch_blob():
    #
    # Resolve URIs to a CAS blob digest.
    #
    # Args:
    #    uris (list of str): The URIs to resolve. Multiple URIs should represent
    #                        the same content available at different locations.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content to fetch.
    #
    # Returns:
    #    (FetchBlobResponse): The asset server response, or None if the resource
    #                         is not available.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def fetch_blob(self, uris, *, qualifiers=None):
        request = remote_asset_pb2.FetchBlobRequest()
        if self.instance_name:
            request.instance_name = self.instance_name
        request.uris.extend(uris)
        if qualifiers:
            request.qualifiers.extend(qualifiers)

        try:
            response = self.fetch_service.FetchBlob(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None

            raise AssetCacheError("FetchBlob failed with status {}: {}".format(e.code().name, e.details())) from e

        if response.status.code == code_pb2.NOT_FOUND:
            return None

        if response.status.code != code_pb2.OK:
            raise AssetCacheError("FetchBlob failed with response status {}".format(response.status.code))

        return response

    # fetch_directory():
    #
    # Resolve URIs to a CAS Directory digest.
    #
    # Args:
    #    uris (list of str): The URIs to resolve. Multiple URIs should represent
    #                        the same content available at different locations.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content to fetch.
    #
    # Returns:
    #    (FetchDirectoryResponse): The asset server response, or None if the resource
    #                              is not available.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def fetch_directory(self, uris, *, qualifiers=None):
        request = remote_asset_pb2.FetchDirectoryRequest()
        if self.instance_name:
            request.instance_name = self.instance_name
        request.uris.extend(uris)
        if qualifiers:
            request.qualifiers.extend(qualifiers)

        try:
            response = self.fetch_service.FetchDirectory(request)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.NOT_FOUND:
                return None

            raise AssetCacheError("FetchDirectory failed with status {}: {}".format(e.code().name, e.details())) from e

        if response.status.code == code_pb2.NOT_FOUND:
            return None

        if response.status.code != code_pb2.OK:
            raise AssetCacheError("FetchDirectory failed with response status {}".format(response.status.code))

        return response

    # push_blob():
    #
    # Associate a CAS blob digest to URIs.
    #
    # Args:
    #    uris (list of str): The URIs to associate with the blob digest.
    #    blob_digest (Digest): The CAS blob to associate.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content that is being pushed.
    #    references_blobs (list of Digest): Referenced blobs that must not expire
    #                                       before this association expires.
    #    references_directories (list of Digest): Referenced directories that must not
    #                                             expire before this association expires.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def push_blob(self, uris, blob_digest, *, qualifiers=None, references_blobs=None, references_directories=None):
        request = remote_asset_pb2.PushBlobRequest()
        if self.instance_name:
            request.instance_name = self.instance_name
        request.uris.extend(uris)
        request.blob_digest.CopyFrom(blob_digest)
        if qualifiers:
            request.qualifiers.extend(qualifiers)
        if references_blobs:
            request.references_blobs.extend(references_blobs)
        if references_directories:
            request.references_directories.extend(references_directories)

        try:
            self.push_service.PushBlob(request)
        except grpc.RpcError as e:
            raise AssetCacheError("PushBlob failed with status {}: {}".format(e.code().name, e.details())) from e
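    # Illustrative sketch (placeholder names and URI, not part of this module):
    # a typical round trip first asks the remote to resolve a URI and, when the
    # remote does not know it yet, pushes the locally cached digest. The
    # blob_digest field is part of the Remote Asset FetchBlobResponse message.
    #
    #     uri = "https://example.com/files/foo.tar.gz"
    #     response = remote.fetch_blob([uri])
    #     if response is None:
    #         remote.push_blob([uri], digest)
    #     else:
    #         digest = response.blob_digest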
    # push_directory():
    #
    # Associate a CAS Directory digest to URIs.
    #
    # Args:
    #    uris (list of str): The URIs to associate with the directory digest.
    #    directory_digest (Digest): The CAS Directory to associate.
    #    qualifiers (list of Qualifier): Optional qualifiers sub-specifying the
    #                                    content that is being pushed.
    #    references_blobs (list of Digest): Referenced blobs that must not expire
    #                                       before this association expires.
    #    references_directories (list of Digest): Referenced directories that must not
    #                                             expire before this association expires.
    #
    # Raises:
    #     AssetCacheError: If the upstream has a problem
    #
    def push_directory(
        self, uris, directory_digest, *, qualifiers=None, references_blobs=None, references_directories=None
    ):
        request = remote_asset_pb2.PushDirectoryRequest()
        if self.instance_name:
            request.instance_name = self.instance_name
        request.uris.extend(uris)
        request.root_directory_digest.CopyFrom(directory_digest)
        if qualifiers:
            request.qualifiers.extend(qualifiers)
        if references_blobs:
            request.references_blobs.extend(references_blobs)
        if references_directories:
            request.references_directories.extend(references_directories)

        try:
            self.push_service.PushDirectory(request)
        except grpc.RpcError as e:
            raise AssetCacheError("PushDirectory failed with status {}: {}".format(e.code().name, e.details())) from e


# Base Asset Cache for Caches to derive from
#
class AssetCache:
    # None of these should ever be called in the base class, but this appeases
    # pylint to some degree
    spec_name = None  # type: str
    config_node_name = None  # type: str

    def __init__(self, context):
        self.context = context
        self.cas = context.get_cascache()

        self._remotes_setup = False  # Check to prevent double-setup of remotes
        # Per-project list of Remote instances.
        self._storage_remotes = {}
        self._index_remotes = {}

        self.global_remote_specs = []
        self.project_remote_specs = {}

        self._has_fetch_remotes = False
        self._has_push_remotes = False

        self._basedir = None

    # close_grpc_channels():
    #
    # Close open gRPC channels.
    #
    def close_grpc_channels(self):
        # Close all remotes and their gRPC channels
        for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
            for remote in project_remotes:
                remote.close()

    # release_resources():
    #
    # Release resources used by AssetCache.
    #
    def release_resources(self):
        self.close_grpc_channels()

    # specs_from_config_node()
    #
    # Parses the configuration of remote artifact caches from a config block.
    #
    # Args:
    #   config_node (dict): The config block, which may contain a key defined by cls.config_node_name
    #   basedir (str): The base directory for relative paths
    #
    # Returns:
    #   A list of RemoteSpec instances.
    #
    # Raises:
    #   LoadError, if the config block contains invalid keys.
    #
    @classmethod
    def specs_from_config_node(cls, config_node, basedir=None):
        cache_specs = []

        try:
            artifacts = [config_node.get_mapping(cls.config_node_name)]
        except LoadError:
            try:
                artifacts = config_node.get_sequence(cls.config_node_name, default=[])
            except LoadError:
                provenance = config_node.get_node(cls.config_node_name).get_provenance()
                raise _yaml.LoadError(
                    "{}: '{}' must be a single remote mapping, or a list of mappings".format(
                        provenance, cls.config_node_name
                    ),
                    _yaml.LoadErrorReason.INVALID_DATA,
                )

        for spec_node in artifacts:
            cache_specs.append(RemoteSpec.new_from_config_node(spec_node))

        return cache_specs
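    # For illustration only (the block name comes from cls.config_node_name,
    # here assumed to be "artifacts", and the exact key set is defined by
    # RemoteSpec.new_from_config_node): the config block may be either a single
    # remote mapping or a list of mappings, e.g.
    #
    #     artifacts:
    #       url: https://cache.example.com:11001
    #       push: true
    #
    # or:
    #
    #     artifacts:
    #     - url: https://cache1.example.com:11001
    #       push: true
    #     - url: https://cache2.example.com:11001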
    # _configured_remote_cache_specs():
    #
    # Return the list of configured remotes for a given project, in priority
    # order. This takes into account the user and project configuration.
    #
    # Args:
    #     context (Context): The BuildStream context
    #     project (Project): The BuildStream project
    #
    # Returns:
    #     A list of RemoteSpec instances describing the remote caches.
    #
    @classmethod
    def _configured_remote_cache_specs(cls, context, project):
        project_overrides = context.get_overrides(project.name)
        project_extra_specs = cls.specs_from_config_node(project_overrides)

        project_specs = getattr(project, cls.spec_name)
        context_specs = getattr(context, cls.spec_name)

        return list(utils._deduplicate(project_extra_specs + project_specs + context_specs))

    # setup_remotes():
    #
    # Sets up which remotes to use
    #
    # Args:
    #    use_config (bool): Whether to use project configuration
    #    remote_url (str): Remote cache URL
    #
    # This requires that all of the projects which are to be processed in the session
    # have already been loaded and are observable in the Context.
    #
    def setup_remotes(self, *, use_config=False, remote_url=None):
        # Ensure we do not double-initialise since this can be expensive
        if self._remotes_setup:
            return

        self._remotes_setup = True

        # Initialize remote caches. We allow the commandline to override
        # the user config in some cases (for example `bst artifact push --remote=...`).
        has_remote_caches = False
        if remote_url:
            self._set_remotes([RemoteSpec(remote_url, push=True)])
            has_remote_caches = True
        if use_config:
            for project in self.context.get_projects():
                caches = self._configured_remote_cache_specs(self.context, project)
                if caches:  # caches is a list of RemoteSpec instances
                    self._set_remotes(caches, project=project)
                    has_remote_caches = True
        if has_remote_caches:
            self._initialize_remotes()

    # Notify remotes that forking is disabled
    def notify_fork_disabled(self):
        for project in self._index_remotes:
            for remote in self._index_remotes[project]:
                remote.notify_fork_disabled()
        for project in self._storage_remotes:
            for remote in self._storage_remotes[project]:
                remote.notify_fork_disabled()

    # initialize_remotes():
    #
    # This will contact each remote cache.
    #
    # Args:
    #     on_failure (callable): Called if we fail to contact one of the caches.
    #
    def initialize_remotes(self, *, on_failure=None):
        index_remotes, storage_remotes = self._create_remote_instances(on_failure=on_failure)

        # Assign remote instances to their respective projects
        for project in self.context.get_projects():
            # Get the list of specs that should be considered for this
            # project
            remote_specs = self.global_remote_specs.copy()
            if project in self.project_remote_specs:
                remote_specs.extend(self.project_remote_specs[project])

            # De-duplicate the list
            remote_specs = list(utils._deduplicate(remote_specs))

            def get_remotes(remote_list, remote_specs):
                for remote_spec in remote_specs:
                    # If a remote_spec didn't make it into the remotes
                    # dict, that means we can't access it, and it has been
                    # disabled for this session.
                    if remote_spec not in remote_list:
                        continue

                    yield remote_list[remote_spec]

            self._index_remotes[project] = list(get_remotes(index_remotes, remote_specs))
            self._storage_remotes[project] = list(get_remotes(storage_remotes, remote_specs))
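    # Illustrative sketch (placeholder names, not part of this module): callers
    # can contact the configured caches and be told about unreachable ones
    # through the optional on_failure callback, which receives the failing
    # RemoteSpec and an error string.
    #
    #     def _on_remote_failure(remote_spec, error):
    #         print("Skipping remote {}: {}".format(remote_spec.url, error))
    #
    #     cache.initialize_remotes(on_failure=_on_remote_failure)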
    # has_fetch_remotes():
    #
    # Check whether any remote repositories are available for fetching.
    #
    # Args:
    #     plugin (Plugin): The Plugin to check
    #
    # Returns: True if any remote repositories are configured, False otherwise
    #
    def has_fetch_remotes(self, *, plugin=None):
        if not self._has_fetch_remotes:
            # No project has fetch remotes
            return False
        elif plugin is None:
            # At least one (sub)project has fetch remotes
            return True
        else:
            # Check whether the specified element's project has fetch remotes
            index_remotes = self._index_remotes[plugin._get_project()]
            storage_remotes = self._storage_remotes[plugin._get_project()]
            return index_remotes and storage_remotes

    # has_push_remotes():
    #
    # Check whether any remote repositories are available for pushing.
    #
    # Args:
    #     plugin (Plugin): The Plugin to check
    #
    # Returns: True if any remote repository is configured, False otherwise
    #
    def has_push_remotes(self, *, plugin=None):
        if not self._has_push_remotes:
            # No project has push remotes
            return False
        elif plugin is None:
            # At least one (sub)project has push remotes
            return True
        else:
            # Check whether the specified element's project has push remotes
            index_remotes = self._index_remotes[plugin._get_project()]
            storage_remotes = self._storage_remotes[plugin._get_project()]
            return any(remote.spec.push for remote in index_remotes) and any(
                remote.spec.push for remote in storage_remotes
            )

    ################################################
    #             Local Private Methods            #
    ################################################

    # _create_remote_instances():
    #
    # Create the global set of Remote instances, including
    # project-specific and global instances, ensuring that all of them
    # are accessible.
    #
    # Args:
    #    on_failure (Callable[[Remote,Exception],None]):
    #        What to do when a remote doesn't respond.
    #
    # Returns:
    #    (Dict[RemoteSpec, AssetRemote], Dict[RemoteSpec, CASRemote]) -
    #        The created remote instances, index first, storage last.
    #
    def _create_remote_instances(self, *, on_failure=None):
        # Create a flat list of all remote specs, global or
        # project-specific
        remote_specs = self.global_remote_specs.copy()
        for project in self.project_remote_specs:
            remote_specs.extend(self.project_remote_specs[project])

        # By de-duplicating it after we flattened the list, we ensure
        # that we never instantiate the same remote twice. This
        # de-duplication also preserves their order.
        remote_specs = list(utils._deduplicate(remote_specs))

        # Now let's create a dict of this, indexed by their specs, so
        # that we can later assign them to the right projects.
        index_remotes = {}
        storage_remotes = {}
        for remote_spec in remote_specs:
            try:
                index, storage = self._instantiate_remote(remote_spec)
            except RemoteError as err:
                if on_failure:
                    on_failure(remote_spec, str(err))
                    continue

                raise

            # Finally, we can instantiate the remote. Note that
            # NamedTuples are hashable, so we can use them as pretty
            # low-overhead keys.
            if index:
                index_remotes[remote_spec] = index
            if storage:
                storage_remotes[remote_spec] = storage

        self._has_fetch_remotes = storage_remotes and index_remotes
        self._has_push_remotes = any(spec.push for spec in storage_remotes) and any(
            spec.push for spec in index_remotes
        )

        return index_remotes, storage_remotes
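    # Illustrative sketch (placeholder names, not part of this module): the two
    # dicts returned above are keyed by RemoteSpec, so the resolved remotes can
    # be walked per spec:
    #
    #     index_remotes, storage_remotes = cache._create_remote_instances()
    #     for spec, asset_remote in index_remotes.items():
    #         ...  # spec.push, asset_remote.fetch_blob(...), etc.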
    # _instantiate_remote()
    #
    # Instantiate a remote given its spec, asserting that it is
    # reachable - this may produce two remote instances (a storage and
    # an index remote as specified by the class variables).
    #
    # Args:
    #    remote_spec (RemoteSpec): The spec of the remote to instantiate.
    #
    # Returns:
    #    (Tuple[Remote|None, Remote|None]) - The remotes, index remote
    #        first, storage remote second. One must always be specified,
    #        the other may be None.
    #
    def _instantiate_remote(self, remote_spec):
        # Our remotes can be index, storage or both. In either case,
        # we need to use a different type of Remote for our calls, so
        # we create two objects here
        index = None
        storage = None
        if remote_spec.type in [RemoteType.INDEX, RemoteType.ALL]:
            index = AssetRemote(remote_spec)  # pylint: disable=not-callable
            index.check()
        if remote_spec.type in [RemoteType.STORAGE, RemoteType.ALL]:
            storage = CASRemote(remote_spec, self.cas)
            storage.check()

        return (index, storage)

    # _set_remotes():
    #
    # Set the list of remote caches. If project is None, the global list of
    # remote caches will be set, which is used by all projects. If a project is
    # specified, the per-project list of remote caches will be set.
    #
    # Args:
    #     remote_specs (list): List of RemoteSpec instances, in priority order.
    #     project (Project): The Project instance for project-specific remotes
    #
    def _set_remotes(self, remote_specs, *, project=None):
        if project is None:
            # global remotes
            self.global_remote_specs = remote_specs
        else:
            self.project_remote_specs[project] = remote_specs

    # _initialize_remotes()
    #
    # An internal wrapper which calls the abstract method and
    # takes care of messaging
    #
    def _initialize_remotes(self):
        def remote_failed(remote, error):
            self.context.messenger.warn("Failed to initialize remote {}: {}".format(remote.url, error))

        with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
            self.initialize_remotes(on_failure=remote_failed)

    # _list_refs_mtimes()
    #
    # List refs in a directory, given a base path. Also returns the
    # associated mtimes
    #
    # Args:
    #    base_path (str): Base path to traverse over
    #    glob_expr (str|None): Optional glob expression to match against files
    #
    # Returns:
    #     (iter (mtime, filename)): iterator of tuples of mtime and refs
    #
    def _list_refs_mtimes(self, base_path, *, glob_expr=None):
        path = base_path
        if glob_expr is not None:
            globdir = os.path.dirname(glob_expr)
            if not any(c in "*?[" for c in globdir):
                # path prefix contains no globbing characters so
                # append the glob to optimise the os.walk()
                path = os.path.join(base_path, globdir)

        regexer = None
        if glob_expr:
            expression = utils._glob2re(glob_expr)
            regexer = re.compile(expression)

        for root, _, files in os.walk(path):
            for filename in files:
                ref_path = os.path.join(root, filename)
                relative_path = os.path.relpath(ref_path, base_path)  # Relative to refs head
                if regexer is None or regexer.match(relative_path):
                    # Obtain the mtime (the time a file was last modified)
                    yield (os.path.getmtime(ref_path), relative_path)

    # _remove_ref()
    #
    # Removes a ref.
    #
    # This also takes care of pruning away directories which can
    # be removed after having removed the given ref.
    #
    # Args:
    #    ref (str): The ref to remove
    #
    # Raises:
    #    (AssetCacheError): If the ref didn't exist, or a system error
    #                       occurred while removing it
    #
    def _remove_ref(self, ref):
        try:
            utils._remove_path_with_parents(self._basedir, ref)
        except FileNotFoundError as e:
            raise AssetCacheError("Could not find ref '{}'".format(ref)) from e
        except OSError as e:
            raise AssetCacheError("System error while removing ref '{}': {}".format(ref, e)) from e
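
# Illustrative sketch (hypothetical names, not part of this module): concrete
# caches derive from AssetCache and fill in the two class attributes so that
# specs_from_config_node() and _configured_remote_cache_specs() know where to
# look in the user and project configuration, e.g.
#
#     class SourceCache(AssetCache):
#         spec_name = "source_cache_specs"
#         config_node_name = "source-caches"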