diff options
Diffstat (limited to 'src/buildstream/_basecache.py')
-rw-r--r-- | src/buildstream/_basecache.py | 150 |
1 files changed, 99 insertions, 51 deletions
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 9197c91b0..df50bfb62 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -16,21 +16,23 @@ # Authors: # Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> # -import multiprocessing import os from fnmatch import fnmatch +from itertools import chain from typing import TYPE_CHECKING from . import utils from . import _yaml from ._cas import CASRemote from ._message import Message, MessageType -from ._exceptions import LoadError -from ._remote import RemoteSpec +from ._exceptions import LoadError, RemoteError +from ._remote import RemoteSpec, RemoteType + if TYPE_CHECKING: from typing import Optional, Type from ._exceptions import BstError + from ._remote import BaseRemote # Base Cache for Caches to derive from @@ -39,18 +41,20 @@ class BaseCache(): # None of these should ever be called in the base class, but this appeases # pylint to some degree - spec_name = None # type: Type[RemoteSpec] - spec_error = None # type: Type[BstError] - config_node_name = None # type: str - remote_class = CASRemote # type: Type[CASRemote] + spec_name = None # type: str + spec_error = None # type: Type[BstError] + config_node_name = None # type: str + index_remote_class = None # type: Type[BaseRemote] + storage_remote_class = CASRemote # type: Type[BaseRemote] def __init__(self, context): self.context = context self.cas = context.get_cascache() self._remotes_setup = False # Check to prevent double-setup of remotes - # Per-project list of _CASRemote instances. - self._remotes = {} + # Per-project list of Remote instances. + self._storage_remotes = {} + self._index_remotes = {} self.global_remote_specs = [] self.project_remote_specs = {} @@ -64,7 +68,7 @@ class BaseCache(): # against fork() with open gRPC channels. # def has_open_grpc_channels(self): - for project_remotes in self._remotes.values(): + for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()): for remote in project_remotes: if remote.channel: return True @@ -76,7 +80,7 @@ class BaseCache(): # def release_resources(self): # Close all remotes and their gRPC channels - for project_remotes in self._remotes.values(): + for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()): for remote in project_remotes: remote.close() @@ -157,7 +161,6 @@ class BaseCache(): # the user config in some cases (for example `bst artifact push --remote=...`). has_remote_caches = False if remote_url: - # pylint: disable=not-callable self._set_remotes([RemoteSpec(remote_url, push=True)]) has_remote_caches = True if use_config: @@ -169,6 +172,15 @@ class BaseCache(): if has_remote_caches: self._initialize_remotes() + # Notify remotes that forking is disabled + def notify_fork_disabled(self): + for project in self._index_remotes: + for remote in self._index_remotes[project]: + remote.notify_fork_disabled() + for project in self._storage_remotes: + for remote in self._storage_remotes[project]: + remote.notify_fork_disabled() + # initialize_remotes(): # # This will contact each remote cache. @@ -177,7 +189,7 @@ class BaseCache(): # on_failure (callable): Called if we fail to contact one of the caches. # def initialize_remotes(self, *, on_failure=None): - remotes = self._create_remote_instances(on_failure=on_failure) + index_remotes, storage_remotes = self._create_remote_instances(on_failure=on_failure) # Assign remote instances to their respective projects for project in self.context.get_projects(): @@ -188,21 +200,20 @@ class BaseCache(): remote_specs.extend(self.project_remote_specs[project]) # De-duplicate the list - remote_specs = utils._deduplicate(remote_specs) + remote_specs = list(utils._deduplicate(remote_specs)) - project_remotes = [] + def get_remotes(remote_list, remote_specs): + for remote_spec in remote_specs: + # If a remote_spec didn't make it into the remotes + # dict, that means we can't access it, and it has been + # disabled for this session. + if remote_spec not in remote_list: + continue - for remote_spec in remote_specs: - # If a remote_spec didn't make it into the remotes - # dict, that means we can't access it, and it has been - # disabled for this session. - if remote_spec not in remotes: - continue + yield remote_list[remote_spec] - remote = remotes[remote_spec] - project_remotes.append(remote) - - self._remotes[project] = project_remotes + self._index_remotes[project] = list(get_remotes(index_remotes, remote_specs)) + self._storage_remotes[project] = list(get_remotes(storage_remotes, remote_specs)) # has_fetch_remotes(): # @@ -222,8 +233,9 @@ class BaseCache(): return True else: # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[plugin._get_project()] - return bool(remotes_for_project) + index_remotes = self._index_remotes[plugin._get_project()] + storage_remotes = self._storage_remotes[plugin._get_project()] + return index_remotes and storage_remotes # has_push_remotes(): # @@ -243,8 +255,10 @@ class BaseCache(): return True else: # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[plugin._get_project()] - return any(remote.spec.push for remote in remotes_for_project) + index_remotes = self._index_remotes[plugin._get_project()] + storage_remotes = self._storage_remotes[plugin._get_project()] + return (any(remote.spec.push for remote in index_remotes) and + any(remote.spec.push for remote in storage_remotes)) ################################################ # Local Private Methods # @@ -261,8 +275,9 @@ class BaseCache(): # What do do when a remote doesn't respond. # # Returns: - # (Dict[RemoteSpec, self.remote_class]) - - # The created remote instances. + # (Dict[RemoteSpec, self.remote_class], Dict[RemoteSpec, + # self.remote_class]) - + # The created remote instances, index first, storage last. # def _create_remote_instances(self, *, on_failure=None): # Create a flat list of all remote specs, global or @@ -278,30 +293,63 @@ class BaseCache(): # Now let's create a dict of this, indexed by their specs, so # that we can later assign them to the right projects. - remotes = {} - q = multiprocessing.Queue() + index_remotes = {} + storage_remotes = {} for remote_spec in remote_specs: - # First, let's check if the remote works - error = self.remote_class.check_remote(remote_spec, self.cas, q) - - # If it doesn't, report the error in some way - if error and on_failure: - on_failure(remote_spec.url, error) - continue - elif error: - raise self.spec_error(error) # pylint: disable=not-callable - - # If it does, we have fetch remotes, and potentially push remotes - self._has_fetch_remotes = True - if remote_spec.push: - self._has_push_remotes = True + try: + index, storage = self._instantiate_remote(remote_spec) + except RemoteError as err: + if on_failure: + on_failure(remote_spec, str(err)) + continue + else: + raise # Finally, we can instantiate the remote. Note that # NamedTuples are hashable, so we can use them as pretty # low-overhead keys. - remotes[remote_spec] = self.remote_class(remote_spec, self.cas) + if index: + index_remotes[remote_spec] = index + if storage: + storage_remotes[remote_spec] = storage + + self._has_fetch_remotes = storage_remotes and index_remotes + self._has_push_remotes = (any(spec.push for spec in storage_remotes) and + any(spec.push for spec in index_remotes)) - return remotes + return index_remotes, storage_remotes + + # _instantiate_remote() + # + # Instantiate a remote given its spec, asserting that it is + # reachable - this may produce two remote instances (a storage and + # an index remote as specified by the class variables). + # + # Args: + # + # remote_spec (RemoteSpec): The spec of the remote to + # instantiate. + # + # Returns: + # + # (Tuple[Remote|None, Remote|None]) - The remotes, index remote + # first, storage remote second. One must always be specified, + # the other may be None. + # + def _instantiate_remote(self, remote_spec): + # Our remotes can be index, storage or both. In either case, + # we need to use a different type of Remote for our calls, so + # we create two objects here + index = None + storage = None + if remote_spec.type in [RemoteType.INDEX, RemoteType.ALL]: + index = self.index_remote_class(remote_spec) # pylint: disable=not-callable + index.check() + if remote_spec.type in [RemoteType.STORAGE, RemoteType.ALL]: + storage = self.storage_remote_class(remote_spec, self.cas) + storage.check() + + return (index, storage) # _message() # @@ -334,8 +382,8 @@ class BaseCache(): # reports takes care of messaging # def _initialize_remotes(self): - def remote_failed(url, error): - self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error)) + def remote_failed(remote, error): + self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(remote.url, error)) with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True): self.initialize_remotes(on_failure=remote_failed) |