#
#  Copyright (C) 2019 Bloomberg Finance LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Raoul Hidalgo Charman
#
import os

import grpc

from ._remote import BaseRemote
from ._cas.casremote import BlobNotFound
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
    source_pb2, source_pb2_grpc


# SourceRemote():
#
# A remote endpoint that speaks the BuildStream source service,
# used to fetch and publish source protos by their cache key.
#
class SourceRemote(BaseRemote):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # gRPC stub for the source service; created lazily by
        # _configure_protocols() once the channel is up.
        self.source_service = None

    def close(self):
        # Drop the stub before tearing down the underlying channel
        self.source_service = None
        super().close()

    def _configure_protocols(self):
        # Instantiate the source service stub on the open channel
        self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)

    # _check():
    #
    # Check if this remote provides everything required for the
    # particular kind of remote. This is expected to be called as part
    # of check(), and must be called in a non-main process.
    #
    # Returns:
    #     (str|None): An error message, or None if no error message.
    #
    def _check(self):
        # Build the capabilities request up front
        request = buildstream_pb2.GetCapabilitiesRequest()
        if self.instance_name:
            request.instance_name = self.instance_name

        # Query the capabilities service to verify source support
        capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
        try:
            response = capabilities_service.GetCapabilities(request)
        except grpc.RpcError as e:
            # UNIMPLEMENTED means the endpoint exists but does not
            # expose the BuildStream capabilities service at all.
            if e.code() == grpc.StatusCode.UNIMPLEMENTED:
                return ("Configured remote does not have the BuildStream "
                        "capabilities service. Please check remote configuration.")
            # Any other RPC failure: report the details
            return "Remote initialisation failed: {}".format(e.details())

        if not response.source_capabilities:
            return "Configured remote does not support source service"

        # A push-configured remote must also accept updates
        if self.spec.push and not response.source_capabilities.allow_updates:
            return 'Source server does not allow push'

        return None

    # get_source():
    #
    # Get a source proto for a given source_ref from the remote.
    #
    # Args:
    #    source_ref (str): The source ref of the source to pull.
    #
    # Returns:
    #    (Source): The source proto
    #
    # Raises:
    #    grpc.RpcError: If something goes wrong during the request.
    #
    def get_source(self, source_ref):
        request = source_pb2.GetSourceRequest()
        request.cache_key = source_ref
        return self.source_service.GetSource(request)

    # update_source():
    #
    # Update the source on the remote.
    #
    # Args:
    #    source_ref (str): The source ref of the source to update.
    #    source (Source): The proto to update with.
    #
    # Returns:
    #    (bool): Whether the update was successful.
    #
    # Raises:
    #    grpc.RpcError: If something goes wrong during the request.
    #
    def update_source(self, source_ref, source):
        request = source_pb2.UpdateSourceRequest()
        request.cache_key = source_ref
        request.source.CopyFrom(source)
        return self.source_service.UpdateSource(request)


# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
#    context (Context): The Buildstream context
#
class SourceCache(BaseCache):

    spec_name = "source_cache_specs"
    spec_error = SourceCacheError
    config_node_name = "source-caches"
    index_remote_class = SourceRemote

    def __init__(self, context):
        super().__init__(context)

        # Directory holding one serialized Source proto per source ref
        self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
        os.makedirs(self.sourcerefdir, exist_ok=True)

    # list_sources()
    #
    # Get list of all sources in the `sources_protos/` folder
    #
    # Returns:
    #     ([str]): iterable over all source refs
    #
    def list_sources(self):
        return [ref for _, ref in self._list_refs_mtimes(self.sourcerefdir)]

    # contains()
    #
    # Given a source, gets the ref name and checks whether the local CAS
    # contains it.
    #
    # Args:
    #    source (Source): Source to check
    #
    # Returns:
    #    (bool): whether the CAS contains this source or not
    #
    def contains(self, source):
        ref = source._get_source_name()
        path = self._source_path(ref)

        if not os.path.exists(path):
            return False

        # check files
        source_proto = self._get_source(ref)
        return self.cas.contains_directory(source_proto.files, with_files=True)

    # commit()
    #
    # Given a source along with previous sources, it stages and commits these
    # to the local CAS. This is done due to some types of sources being
    # dependent on previous sources, such as the patch source.
    #
    # Args:
    #    source: last source
    #    previous_sources: rest of the sources.
    #
    def commit(self, source, previous_sources):
        ref = source._get_source_name()

        # Stage all previous sources into a virtual directory first,
        # since e.g. patch sources apply on top of earlier ones.
        vdir = CasBasedDirectory(self.cas)
        for previous_source in previous_sources:
            vdir.import_files(self.export(previous_source))

        if not source.BST_STAGE_VIRTUAL_DIRECTORY:
            # Source wants a real filesystem directory: export, stage,
            # then re-import (hardlinking where possible).
            with utils._tempdir(dir=self.context.tmpdir, prefix='staging-temp') as tmpdir:
                if not vdir.is_empty():
                    vdir.export_files(tmpdir)
                source._stage(tmpdir)
                vdir.import_files(tmpdir, can_link=True)
        else:
            source._stage(vdir)

        self._store_source(ref, vdir._get_digest())

    # export()
    #
    # Exports a source in the CAS to a virtual directory
    #
    # Args:
    #    source (Source): source we want to export
    #
    # Returns:
    #    CASBasedDirectory
    #
    def export(self, source):
        ref = source._get_source_name()
        source = self._get_source(ref)
        return CasBasedDirectory(self.cas, digest=source.files)

    # pull()
    #
    # Attempts to pull sources from configure remote source caches.
    #
    # Args:
    #    source (Source): The source we want to fetch
    #
    # Returns:
    #    (bool): True if pull successful, False if not
    #
    def pull(self, source):
        ref = source._get_source_name()
        project = source._get_project()
        display_key = source._get_brief_display_key()

        index_remotes = self._index_remotes[project]
        storage_remotes = self._storage_remotes[project]

        # First fetch the source proto so we know what to pull
        source_proto = None
        for remote in index_remotes:
            try:
                remote.init()
                source.status("Pulling source {} <- {}".format(display_key, remote))

                source_proto = self._pull_source(ref, remote)
                if source_proto is None:
                    source.info("Remote source service ({}) does not have source {} cached".format(
                        remote, display_key))
                    continue

                # Stop at the first remote which has the proto; without this
                # break, a subsequent remote lacking the source would reset
                # source_proto to None and the pull would wrongly fail.
                break
            except CASError as e:
                raise SourceCacheError("Failed to pull source {}: {}".format(
                    display_key, e)) from e

        if not source_proto:
            return False

        for remote in storage_remotes:
            try:
                remote.init()
                source.status("Pulling data for source {} <- {}".format(display_key, remote))

                # Fetch source blobs
                self.cas._fetch_directory(remote, source_proto.files)
                required_blobs = self.cas.required_blobs_for_directory(source_proto.files)
                missing_blobs = self.cas.local_missing_blobs(required_blobs)
                missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)

                if missing_blobs:
                    source.info("Remote cas ({}) does not have source {} cached".format(
                        remote, display_key))
                    continue

                source.info("Pulled source {} <- {}".format(display_key, remote))
                return True
            except BlobNotFound as e:
                # Not all blobs are available on this remote
                source.info("Remote cas ({}) does not have blob {} cached".format(remote, e.blob))
                continue
            except CASError as e:
                raise SourceCacheError("Failed to pull source {}: {}".format(
                    display_key, e)) from e

        return False

    # push()
    #
    # Push a source to configured remote source caches
    #
    # Args:
    #    source (Source): source to push
    #
    # Returns:
    #    (Bool): whether it pushed to a remote source cache
    #
    def push(self, source):
        ref = source._get_source_name()
        project = source._get_project()

        index_remotes = []
        storage_remotes = []

        # find configured push remotes for this source
        if self._has_push_remotes:
            index_remotes = [r for r in self._index_remotes[project] if r.push]
            storage_remotes = [r for r in self._storage_remotes[project] if r.push]

        pushed_storage = False
        pushed_index = False

        display_key = source._get_brief_display_key()

        # Push the file data first, then the index entries which
        # reference it, so a remote never indexes missing blobs.
        for remote in storage_remotes:
            remote.init()
            source.status("Pushing data for source {} -> {}".format(display_key, remote))

            source_proto = self._get_source(ref)
            try:
                self.cas._send_directory(remote, source_proto.files)
                pushed_storage = True
            except CASRemoteError:
                source.info("Failed to push source files {} -> {}".format(display_key, remote))
                continue

        for remote in index_remotes:
            remote.init()
            source.status("Pushing source {} -> {}".format(display_key, remote))

            # check whether cache has files already
            if self._pull_source(ref, remote) is not None:
                source.info("Remote ({}) already has source {} cached"
                            .format(remote, display_key))
                continue

            if not self._push_source(ref, remote):
                source.info("Failed to push source metadata {} -> {}".format(display_key, remote))
                continue

            source.info("Pushed source {} -> {}".format(display_key, remote))
            pushed_index = True

        return pushed_index and pushed_storage

    # _remove_source()
    #
    # Remove a source ref from the local cache, delegating to the CAS.
    #
    def _remove_source(self, ref, *, defer_prune=False):
        return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)

    # _store_source()
    #
    # Serialize and store a Source proto pointing at the given files digest.
    #
    def _store_source(self, ref, digest):
        source_proto = source_pb2.Source()
        source_proto.files.CopyFrom(digest)
        self._store_proto(source_proto, ref)

    # _store_proto()
    #
    # Atomically write a serialized proto to the source ref path.
    #
    def _store_proto(self, proto, ref):
        path = self._source_path(ref)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with utils.save_file_atomic(path, 'w+b') as f:
            f.write(proto.SerializeToString())

    # _get_source()
    #
    # Load the Source proto stored for the given ref.
    #
    # Raises:
    #    SourceCacheError: if no proto is stored for the ref.
    #
    def _get_source(self, ref):
        path = self._source_path(ref)
        source_proto = source_pb2.Source()
        try:
            # Read-only access is sufficient here
            with open(path, 'rb') as f:
                source_proto.ParseFromString(f.read())
                return source_proto
        except FileNotFoundError as e:
            raise SourceCacheError("Attempted to access unavailable source: {}"
                                   .format(e)) from e

    # _source_path()
    #
    # Full filesystem path of the stored proto for a source ref.
    #
    def _source_path(self, ref):
        return os.path.join(self.sourcerefdir, ref)

    # _reachable_directories()
    #
    # Yield the files digest of every stored source proto, used to
    # determine which CAS directories are still reachable.
    #
    def _reachable_directories(self):
        for root, _, files in os.walk(self.sourcerefdir):
            for source_file in files:
                source = source_pb2.Source()
                # Read-only access is sufficient here
                with open(os.path.join(root, source_file), 'rb') as f:
                    source.ParseFromString(f.read())

                yield source.files

    # _update_mtime()
    #
    # Touch the stored proto so LRU-style expiry sees it as recently used.
    #
    def _update_mtime(self, ref):
        try:
            os.utime(self._source_path(ref))
        except FileNotFoundError as e:
            raise SourceCacheError("Couldn't find source: {}".format(ref)) from e

    # _pull_source()
    #
    # Fetch a source proto from a remote and store it locally.
    #
    # Returns:
    #    (Source|None): the proto, or None if the remote doesn't have it.
    #
    def _pull_source(self, source_ref, remote):
        try:
            remote.init()
            response = remote.get_source(source_ref)
            self._store_proto(response, source_ref)
            return response
        except grpc.RpcError as e:
            # NOT_FOUND is an expected outcome, everything else is an error
            if e.code() != grpc.StatusCode.NOT_FOUND:
                raise SourceCacheError("Failed to pull source: {}".format(e.details()))
            return None

    # _push_source()
    #
    # Push the locally stored source proto to a remote.
    #
    # Returns:
    #    (bool|None): result of the update, or None if the remote is full.
    #
    def _push_source(self, source_ref, remote):
        try:
            remote.init()
            source_proto = self._get_source(source_ref)
            return remote.update_source(source_ref, source_proto)
        except grpc.RpcError as e:
            # RESOURCE_EXHAUSTED (remote out of space) is tolerated,
            # everything else is an error
            if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
                raise SourceCacheError("Failed to push source: {}".format(e.details()))
            return None