summary | refs | log | tree | commit | diff
path: root/src/buildstream/_remote.py
diff options
context:
space:
mode:
author: Tristan Maat <tristan.maat@codethink.co.uk> 2019-08-22 17:48:34 +0100
committer: Tristan Maat <tristan.maat@codethink.co.uk> 2019-09-06 15:55:10 +0100
commit: 47a3f93d9795be6af849c112d4180f0ad50ca23b (patch)
tree: 2d65dd2c24d9d6bd6795f0680811cf95ae3803e4 /src/buildstream/_remote.py
parent: e71621510de7c55aae4855f8bbb64eb2755346a8 (diff)
download: buildstream-47a3f93d9795be6af849c112d4180f0ad50ca23b.tar.gz
Allow splitting artifact caches
This is now split into storage/index remotes, where the former is expected to be a CASRemote and the latter a BuildStream-specific remote with the extensions required to store BuildStream artifact protos.
Diffstat (limited to 'src/buildstream/_remote.py')
-rw-r--r-- src/buildstream/_remote.py 121
1 files changed, 86 insertions, 35 deletions
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index 9761e8238..75c626c47 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -25,15 +25,29 @@ import grpc
from . import _signals
from . import utils
-from ._exceptions import LoadError, LoadErrorReason, ImplError
+from ._exceptions import LoadError, LoadErrorReason, ImplError, RemoteError
from ._protos.google.bytestream import bytestream_pb2_grpc
+from .types import FastEnum
+
+
+# RemoteType():
+#
+# Defines the different types of remote.
+#
+class RemoteType(FastEnum):
+ INDEX = "index"
+ STORAGE = "storage"
+ ALL = "all"
+
+ def __str__(self):
+ return self.name.lower().replace('_', '-')
# RemoteSpec():
#
# Defines the basic structure of a remote specification.
#
-class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name')):
+class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name type')):
# new_from_config_node
#
@@ -51,7 +65,8 @@ class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key clien
#
@classmethod
def new_from_config_node(cls, spec_node, basedir=None):
- spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name'])
+ spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name', 'type'])
+
url = spec_node.get_str('url')
if not url:
provenance = spec_node.get_node('url').get_provenance()
@@ -79,8 +94,9 @@ class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key clien
raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance),
LoadErrorReason.INVALID_DATA)
- return cls(url, push, server_cert, client_key, client_cert, instance_name)
+ type_ = spec_node.get_enum('type', RemoteType, default=RemoteType.ALL)
+ return cls(url, push, server_cert, client_key, client_cert, instance_name, type_)
# FIXME: This can be made much nicer in python 3.7 through the use of
@@ -90,14 +106,15 @@ class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key clien
# are considered mandatory.
#
# Disable type-checking since "Callable[...] has no attributes __defaults__"
-RemoteSpec.__new__.__defaults__ = (
+RemoteSpec.__new__.__defaults__ = ( # type: ignore
# mandatory # url - The url of the remote
# mandatory # push - Whether the remote should be used for pushing
None, # server_cert - The server certificate
None, # client_key - The (private) client key
None, # client_cert - The (public) client certificate
- None # instance_name - The (grpc) instance name of the remote
-) # type: ignore
+ None, # instance_name - The (grpc) instance name of the remote
+ RemoteType.ALL # type - The type of the remote (index, storage, both)
+)
# BaseRemote():
@@ -120,6 +137,10 @@ class BaseRemote():
self.bytestream = None
self.channel = None
+ self.server_cert = None
+ self.client_key = None
+ self.client_cert = None
+
self.instance_name = spec.instance_name
self.push = spec.push
self.url = spec.url
@@ -133,11 +154,6 @@ class BaseRemote():
if self._initialized:
return
- # gRPC doesn't support fork without exec, which is used in the
- # main process.
- if self.fork_allowed:
- assert not utils._is_main_process()
-
# Set up the communcation channel
url = urlparse(self.spec.url)
if url.scheme == 'http':
@@ -152,9 +168,12 @@ class BaseRemote():
self.spec.client_cert)
except FileNotFoundError as e:
raise RemoteError("Could not read certificates: {}".format(e)) from e
- credentials = grpc.ssl_channel_credentials(root_certificates=server_cert,
- private_key=client_key,
- certificate_chain=client_cert)
+ self.server_cert = server_cert
+ self.client_key = client_key
+ self.client_cert = client_cert
+ credentials = grpc.ssl_channel_credentials(root_certificates=self.server_cert,
+ private_key=self.client_key,
+ certificate_chain=self.client_cert)
self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
else:
raise RemoteError("Unsupported URL: {}".format(self.spec.url))
@@ -166,6 +185,18 @@ class BaseRemote():
self._initialized = True
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+ return False
+
+ def close(self):
+ if self.channel:
+ self.channel.close()
+ self.channel = None
+
# _configure_protocols():
#
# An abstract method to configure remote-specific protocols. This
@@ -178,46 +209,66 @@ class BaseRemote():
def _configure_protocols(self):
raise ImplError("An implementation of a Remote must configure its protocols.")
- # check_remote
+ # check():
#
- # Used when checking whether remote_specs work in the buildstream main
- # thread, runs this in a seperate process to avoid creation of gRPC threads
- # in the main BuildStream process
- # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
- @classmethod
- def check_remote(cls, remote_spec, q):
+ # Check if the remote is functional and has all the required
+ # capabilities. This should be used somewhat like an assertion,
+ # expecting a RemoteError.
+ #
+ # Note that this method runs the calls on a separate process, so
+ # that we can use grpc calls even if we are on the main process.
+ #
+ # Raises:
+ # RemoteError: If the grpc call fails.
+ #
+ def check(self):
+ queue = multiprocessing.Queue()
+
def __check_remote():
try:
- remote = cls(remote_spec)
- remote.init()
- remote.check(q)
+ self.init()
+ queue.put(self._check())
except grpc.RpcError as e:
# str(e) is too verbose for errors reported to the user
- q.put(e.details())
+ queue.put(e.details())
except Exception as e: # pylint: disable=broad-except
# Whatever happens, we need to return it to the calling process
#
- q.put(str(e))
+ queue.put(str(e))
- p = multiprocessing.Process(target=__check_remote)
+ process = multiprocessing.Process(target=__check_remote)
try:
# Keep SIGINT blocked in the child process
with _signals.blocked([signal.SIGINT], ignore=False):
- p.start()
+ process.start()
- error = q.get()
- p.join()
+ error = queue.get()
+ process.join()
except KeyboardInterrupt:
- utils._kill_process_tree(p.pid)
+ utils._kill_process_tree(process.pid)
raise
+ finally:
+ # Should not be necessary, but let's avoid keeping them
+ # alive too long
+ queue.close()
- return error
+ if error:
+ raise RemoteError(error)
- def check(self, q):
- q.put("An implementation of BaseCache should set a _check method")
+ # _check():
+ #
+ # Check if this remote provides everything required for the
+ # particular kind of remote. This is expected to be called as part
+ # of check(), and must be called in a non-main process.
+ #
+ # Returns:
+ # (str|None): An error message, or None if no error message.
+ #
+ def _check(self):
+ return None
def __str__(self):
return self.url