Diffstat (limited to 'src/buildstream/_basecache.py')
-rw-r--r--  src/buildstream/_basecache.py  150
1 file changed, 99 insertions, 51 deletions
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index 9197c91b0..df50bfb62 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -16,21 +16,23 @@
# Authors:
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
-import multiprocessing
import os
from fnmatch import fnmatch
+from itertools import chain
from typing import TYPE_CHECKING
from . import utils
from . import _yaml
from ._cas import CASRemote
from ._message import Message, MessageType
-from ._exceptions import LoadError
-from ._remote import RemoteSpec
+from ._exceptions import LoadError, RemoteError
+from ._remote import RemoteSpec, RemoteType
+
if TYPE_CHECKING:
from typing import Optional, Type
from ._exceptions import BstError
+ from ._remote import BaseRemote
# Base Cache for Caches to derive from
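The typing-only import of BaseRemote sits under the TYPE_CHECKING guard, so type checkers resolve it but the interpreter never executes it, which avoids import cycles and runtime cost. A minimal sketch of the pattern; the first_remote helper is illustrative, not part of this module:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Only type checkers evaluate this block; at runtime neither
        # import runs, so no circular import can occur.
        from typing import Optional
        from ._remote import BaseRemote

    def first_remote(remotes):
        # type: (list) -> Optional[BaseRemote]
        # Type comments are never evaluated at runtime, so naming
        # BaseRemote here is safe even though it is unbound.
        return remotes[0] if remotes else None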
@@ -39,18 +41,20 @@ class BaseCache():
# None of these should ever be called in the base class, but this appeases
# pylint to some degree
- spec_name = None # type: Type[RemoteSpec]
- spec_error = None # type: Type[BstError]
- config_node_name = None # type: str
- remote_class = CASRemote # type: Type[CASRemote]
+ spec_name = None # type: str
+ spec_error = None # type: Type[BstError]
+ config_node_name = None # type: str
+ index_remote_class = None # type: Type[BaseRemote]
+ storage_remote_class = CASRemote # type: Type[BaseRemote]
def __init__(self, context):
self.context = context
self.cas = context.get_cascache()
self._remotes_setup = False # Check to prevent double-setup of remotes
- # Per-project list of _CASRemote instances.
- self._remotes = {}
+ # Per-project list of Remote instances.
+ self._storage_remotes = {}
+ self._index_remotes = {}
self.global_remote_specs = []
self.project_remote_specs = {}
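With the single remote_class split into an index class and a storage class, a concrete cache now wires up both roles. A hedged sketch of what a subclass might look like, assuming it sits alongside _basecache.py so BaseCache and CASRemote are importable; ExampleIndexRemote, ExampleCacheError and the spec values are hypothetical, not BuildStream's real artifact or source cache classes:

    class ExampleCacheError(Exception):
        # Stand-in for a BstError subclass; the real attribute expects
        # Type[BstError] per the type comment above.
        pass

    class ExampleIndexRemote:
        # Hypothetical BaseRemote-style index remote; note the committed
        # code constructs it with just the spec, no CAS handle.
        def __init__(self, spec):
            self.spec = spec

    class ExampleCache(BaseCache):
        spec_name = "example_cache_specs"        # hypothetical config key
        spec_error = ExampleCacheError
        config_node_name = "example-caches"      # hypothetical YAML node
        index_remote_class = ExampleIndexRemote
        storage_remote_class = CASRemote         # storage side still speaks CAS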
@@ -64,7 +68,7 @@ class BaseCache():
# against fork() with open gRPC channels.
#
def has_open_grpc_channels(self):
- for project_remotes in self._remotes.values():
+ for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
for remote in project_remotes:
if remote.channel:
return True
@@ -76,7 +80,7 @@ class BaseCache():
#
def release_resources(self):
# Close all remotes and their gRPC channels
- for project_remotes in self._remotes.values():
+ for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
for remote in project_remotes:
remote.close()
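Both traversals above (has_open_grpc_channels and release_resources) now walk the index and storage dicts in a single pass via itertools.chain, which lazily concatenates the two .values() views instead of building a merged list. A self-contained illustration of the idiom with toy values:

    from itertools import chain

    index_remotes = {"proj-a": ["idx-1"], "proj-b": ["idx-2"]}
    storage_remotes = {"proj-a": ["cas-1"]}

    # chain() yields each per-project list from the first dict, then
    # from the second, without copying anything.
    for project_remotes in chain(index_remotes.values(), storage_remotes.values()):
        for remote in project_remotes:
            print(remote)   # idx-1, idx-2, cas-1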
@@ -157,7 +161,6 @@ class BaseCache():
# the user config in some cases (for example `bst artifact push --remote=...`).
has_remote_caches = False
if remote_url:
- # pylint: disable=not-callable
self._set_remotes([RemoteSpec(remote_url, push=True)])
has_remote_caches = True
if use_config:
@@ -169,6 +172,15 @@ class BaseCache():
if has_remote_caches:
self._initialize_remotes()
+ # Notify remotes that forking is disabled
+ def notify_fork_disabled(self):
+ for project in self._index_remotes:
+ for remote in self._index_remotes[project]:
+ remote.notify_fork_disabled()
+ for project in self._storage_remotes:
+ for remote in self._storage_remotes[project]:
+ remote.notify_fork_disabled()
+
# initialize_remotes():
#
# This will contact each remote cache.
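notify_fork_disabled must reach every remote of either kind across all projects; the committed version iterates each dict's keys and indexes back in. An equivalent sketch using the same chain() idiom as the traversals above, written here purely for illustration rather than as the committed code:

    from itertools import chain

    def notify_fork_disabled(self):
        # Same effect as the committed nested loops: visit every index
        # and storage remote in every project, in one pass.
        for remotes in chain(self._index_remotes.values(),
                             self._storage_remotes.values()):
            for remote in remotes:
                remote.notify_fork_disabled()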
@@ -177,7 +189,7 @@ class BaseCache():
# on_failure (callable): Called if we fail to contact one of the caches.
#
def initialize_remotes(self, *, on_failure=None):
- remotes = self._create_remote_instances(on_failure=on_failure)
+ index_remotes, storage_remotes = self._create_remote_instances(on_failure=on_failure)
# Assign remote instances to their respective projects
for project in self.context.get_projects():
@@ -188,21 +200,20 @@ class BaseCache():
remote_specs.extend(self.project_remote_specs[project])
# De-duplicate the list
- remote_specs = utils._deduplicate(remote_specs)
+ remote_specs = list(utils._deduplicate(remote_specs))
- project_remotes = []
+ def get_remotes(remote_list, remote_specs):
+ for remote_spec in remote_specs:
+ # If a remote_spec didn't make it into the remotes
+ # dict, that means we can't access it, and it has been
+ # disabled for this session.
+ if remote_spec not in remote_list:
+ continue
- for remote_spec in remote_specs:
- # If a remote_spec didn't make it into the remotes
- # dict, that means we can't access it, and it has been
- # disabled for this session.
- if remote_spec not in remotes:
- continue
+ yield remote_list[remote_spec]
- remote = remotes[remote_spec]
- project_remotes.append(remote)
-
- self._remotes[project] = project_remotes
+ self._index_remotes[project] = list(get_remotes(index_remotes, remote_specs))
+ self._storage_remotes[project] = list(get_remotes(storage_remotes, remote_specs))
# has_fetch_remotes():
#
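The get_remotes helper added above is a small generator that filters each project's spec list against the dict of successfully created remotes: a spec missing from the dict was unreachable and is silently dropped for this session. The same filter in isolation, with strings standing in for specs and remote instances:

    def get_remotes(remote_dict, remote_specs):
        # Yield the live instance for every spec that survived
        # initialization; unreachable specs are simply absent.
        for spec in remote_specs:
            if spec not in remote_dict:
                continue
            yield remote_dict[spec]

    remotes = {"spec-a": "remote-a"}   # "spec-b" failed its check()
    assert list(get_remotes(remotes, ["spec-a", "spec-b"])) == ["remote-a"]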
@@ -222,8 +233,9 @@ class BaseCache():
return True
else:
# Check whether the specified element's project has fetch remotes
- remotes_for_project = self._remotes[plugin._get_project()]
- return bool(remotes_for_project)
+ index_remotes = self._index_remotes[plugin._get_project()]
+ storage_remotes = self._storage_remotes[plugin._get_project()]
+ return index_remotes and storage_remotes
# has_push_remotes():
#
@@ -243,8 +255,10 @@ class BaseCache():
return True
else:
# Check whether the specified element's project has push remotes
- remotes_for_project = self._remotes[plugin._get_project()]
- return any(remote.spec.push for remote in remotes_for_project)
+ index_remotes = self._index_remotes[plugin._get_project()]
+ storage_remotes = self._storage_remotes[plugin._get_project()]
+ return (any(remote.spec.push for remote in index_remotes) and
+ any(remote.spec.push for remote in storage_remotes))
################################################
# Local Private Methods #
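has_fetch_remotes and has_push_remotes above now demand a remote of each kind: a push, for instance, must upload blobs to storage and publish a reference to the index, so a project with only one half configured cannot push. In miniature, with stub classes standing in for real remotes:

    class Spec:
        def __init__(self, push):
            self.push = push

    class Remote:
        def __init__(self, push):
            self.spec = Spec(push)

    index_remotes = [Remote(push=True)]
    storage_remotes = [Remote(push=False)]

    # Pushing needs a pushable remote on BOTH sides; here storage is
    # read-only, so the project cannot push.
    can_push = (any(r.spec.push for r in index_remotes) and
                any(r.spec.push for r in storage_remotes))
    assert can_push is False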
@@ -261,8 +275,9 @@ class BaseCache():
# What to do when a remote doesn't respond.
#
# Returns:
- # (Dict[RemoteSpec, self.remote_class]) -
- # The created remote instances.
+ # (Dict[RemoteSpec, self.index_remote_class], Dict[RemoteSpec,
+ # self.storage_remote_class]) -
+ # The created remote instances, index first, storage second.
#
def _create_remote_instances(self, *, on_failure=None):
# Create a flat list of all remote specs, global or
@@ -278,30 +293,63 @@ class BaseCache():
# Now let's create a dict of this, indexed by their specs, so
# that we can later assign them to the right projects.
- remotes = {}
- q = multiprocessing.Queue()
+ index_remotes = {}
+ storage_remotes = {}
for remote_spec in remote_specs:
- # First, let's check if the remote works
- error = self.remote_class.check_remote(remote_spec, self.cas, q)
-
- # If it doesn't, report the error in some way
- if error and on_failure:
- on_failure(remote_spec.url, error)
- continue
- elif error:
- raise self.spec_error(error) # pylint: disable=not-callable
-
- # If it does, we have fetch remotes, and potentially push remotes
- self._has_fetch_remotes = True
- if remote_spec.push:
- self._has_push_remotes = True
+ try:
+ index, storage = self._instantiate_remote(remote_spec)
+ except RemoteError as err:
+ if on_failure:
+ on_failure(remote_spec, str(err))
+ continue
+ else:
+ raise
# Finally, we can instantiate the remote. Note that
# NamedTuples are hashable, so we can use them as pretty
# low-overhead keys.
- remotes[remote_spec] = self.remote_class(remote_spec, self.cas)
+ if index:
+ index_remotes[remote_spec] = index
+ if storage:
+ storage_remotes[remote_spec] = storage
+
+ self._has_fetch_remotes = storage_remotes and index_remotes
+ self._has_push_remotes = (any(spec.push for spec in storage_remotes) and
+ any(spec.push for spec in index_remotes))
- return remotes
+ return index_remotes, storage_remotes
+
+ # _instantiate_remote()
+ #
+ # Instantiate a remote given its spec, asserting that it is
+ # reachable - this may produce two remote instances (a storage and
+ # an index remote as specified by the class variables).
+ #
+ # Args:
+ #
+ # remote_spec (RemoteSpec): The spec of the remote to
+ # instantiate.
+ #
+ # Returns:
+ #
+ # (Tuple[Remote|None, Remote|None]) - The remotes, index remote
+ # first, storage remote second. At least one is always set;
+ # the other may be None.
+ #
+ def _instantiate_remote(self, remote_spec):
+ # Our remotes can be index, storage or both. In each case we
+ # need a different type of Remote for our calls, so we may
+ # create two objects here.
+ index = None
+ storage = None
+ if remote_spec.type in [RemoteType.INDEX, RemoteType.ALL]:
+ index = self.index_remote_class(remote_spec) # pylint: disable=not-callable
+ index.check()
+ if remote_spec.type in [RemoteType.STORAGE, RemoteType.ALL]:
+ storage = self.storage_remote_class(remote_spec, self.cas)
+ storage.check()
+
+ return (index, storage)
# _message()
#
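Because a spec's type may be INDEX, STORAGE or ALL, _instantiate_remote hands back one populated slot or both, and each check() call raises RemoteError if the endpoint is unreachable. A toy model of the dispatch; the enum and the string return values are stand-ins, not BuildStream's own classes:

    from enum import Enum

    class RemoteType(Enum):
        INDEX = "index"
        STORAGE = "storage"
        ALL = "all"

    def instantiate(spec_type):
        # Mirrors the dispatch above: build an index remote, a storage
        # remote, or both, depending on the spec's declared type.
        index = "index-remote" if spec_type in (RemoteType.INDEX, RemoteType.ALL) else None
        storage = "storage-remote" if spec_type in (RemoteType.STORAGE, RemoteType.ALL) else None
        return index, storage

    assert instantiate(RemoteType.ALL) == ("index-remote", "storage-remote")
    assert instantiate(RemoteType.INDEX) == ("index-remote", None)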
@@ -334,8 +382,8 @@ class BaseCache():
# reporting takes care of messaging
#
def _initialize_remotes(self):
- def remote_failed(url, error):
- self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
+ def remote_failed(remote, error):
+ self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(remote.url, error))
with self.context.messenger.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
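The on_failure callback now receives the failing RemoteSpec rather than a bare URL, which is why remote_failed reads remote.url when formatting the warning; without a callback, the RemoteError raised in _instantiate_remote propagates instead. A minimal sketch of the contract, using a simplified namedtuple as a hypothetical spec:

    from collections import namedtuple

    RemoteSpec = namedtuple("RemoteSpec", ["url", "push"])   # simplified stand-in

    def remote_failed(remote, error):
        # Same signature as the callback above: a spec exposing .url,
        # and the error already rendered as a string.
        print("WARN: Failed to initialize remote {}: {}".format(remote.url, error))

    remote_failed(RemoteSpec("https://cache.example.com", True), "connection refused")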