diff options
Diffstat (limited to 'src/buildstream/_remote.py')
-rw-r--r-- | src/buildstream/_remote.py | 121 |
1 files changed, 86 insertions, 35 deletions
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py index 9761e8238..75c626c47 100644 --- a/src/buildstream/_remote.py +++ b/src/buildstream/_remote.py @@ -25,15 +25,29 @@ import grpc from . import _signals from . import utils -from ._exceptions import LoadError, LoadErrorReason, ImplError +from ._exceptions import LoadError, LoadErrorReason, ImplError, RemoteError from ._protos.google.bytestream import bytestream_pb2_grpc +from .types import FastEnum + + +# RemoteType(): +# +# Defines the different types of remote. +# +class RemoteType(FastEnum): + INDEX = "index" + STORAGE = "storage" + ALL = "all" + + def __str__(self): + return self.name.lower().replace('_', '-') # RemoteSpec(): # # Defines the basic structure of a remote specification. # -class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name')): +class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name type')): # new_from_config_node # @@ -51,7 +65,8 @@ class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key clien # @classmethod def new_from_config_node(cls, spec_node, basedir=None): - spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name']) + spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name', 'type']) + url = spec_node.get_str('url') if not url: provenance = spec_node.get_node('url').get_provenance() @@ -79,8 +94,9 @@ class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key clien raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance), LoadErrorReason.INVALID_DATA) - return cls(url, push, server_cert, client_key, client_cert, instance_name) + type_ = spec_node.get_enum('type', RemoteType, default=RemoteType.ALL) + return cls(url, push, server_cert, client_key, client_cert, instance_name, type_) # FIXME: This can be made much nicer in python 3.7 through the use of @@ -90,14 +106,15 @@ class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key clien # are considered mandatory. # # Disable type-checking since "Callable[...] has no attributes __defaults__" -RemoteSpec.__new__.__defaults__ = ( +RemoteSpec.__new__.__defaults__ = ( # type: ignore # mandatory # url - The url of the remote # mandatory # push - Whether the remote should be used for pushing None, # server_cert - The server certificate None, # client_key - The (private) client key None, # client_cert - The (public) client certificate - None # instance_name - The (grpc) instance name of the remote -) # type: ignore + None, # instance_name - The (grpc) instance name of the remote + RemoteType.ALL # type - The type of the remote (index, storage, both) +) # BaseRemote(): @@ -120,6 +137,10 @@ class BaseRemote(): self.bytestream = None self.channel = None + self.server_cert = None + self.client_key = None + self.client_cert = None + self.instance_name = spec.instance_name self.push = spec.push self.url = spec.url @@ -133,11 +154,6 @@ class BaseRemote(): if self._initialized: return - # gRPC doesn't support fork without exec, which is used in the - # main process. - if self.fork_allowed: - assert not utils._is_main_process() - # Set up the communcation channel url = urlparse(self.spec.url) if url.scheme == 'http': @@ -152,9 +168,12 @@ class BaseRemote(): self.spec.client_cert) except FileNotFoundError as e: raise RemoteError("Could not read certificates: {}".format(e)) from e - credentials = grpc.ssl_channel_credentials(root_certificates=server_cert, - private_key=client_key, - certificate_chain=client_cert) + self.server_cert = server_cert + self.client_key = client_key + self.client_cert = client_cert + credentials = grpc.ssl_channel_credentials(root_certificates=self.server_cert, + private_key=self.client_key, + certificate_chain=self.client_cert) self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) else: raise RemoteError("Unsupported URL: {}".format(self.spec.url)) @@ -166,6 +185,18 @@ class BaseRemote(): self._initialized = True + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False + + def close(self): + if self.channel: + self.channel.close() + self.channel = None + # _configure_protocols(): # # An abstract method to configure remote-specific protocols. This @@ -178,46 +209,66 @@ class BaseRemote(): def _configure_protocols(self): raise ImplError("An implementation of a Remote must configure its protocols.") - # check_remote + # check(): # - # Used when checking whether remote_specs work in the buildstream main - # thread, runs this in a seperate process to avoid creation of gRPC threads - # in the main BuildStream process - # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details - @classmethod - def check_remote(cls, remote_spec, q): + # Check if the remote is functional and has all the required + # capabilities. This should be used somewhat like an assertion, + # expecting a RemoteError. + # + # Note that this method runs the calls on a separate process, so + # that we can use grpc calls even if we are on the main process. + # + # Raises: + # RemoteError: If the grpc call fails. + # + def check(self): + queue = multiprocessing.Queue() + def __check_remote(): try: - remote = cls(remote_spec) - remote.init() - remote.check(q) + self.init() + queue.put(self._check()) except grpc.RpcError as e: # str(e) is too verbose for errors reported to the user - q.put(e.details()) + queue.put(e.details()) except Exception as e: # pylint: disable=broad-except # Whatever happens, we need to return it to the calling process # - q.put(str(e)) + queue.put(str(e)) - p = multiprocessing.Process(target=__check_remote) + process = multiprocessing.Process(target=__check_remote) try: # Keep SIGINT blocked in the child process with _signals.blocked([signal.SIGINT], ignore=False): - p.start() + process.start() - error = q.get() - p.join() + error = queue.get() + process.join() except KeyboardInterrupt: - utils._kill_process_tree(p.pid) + utils._kill_process_tree(process.pid) raise + finally: + # Should not be necessary, but let's avoid keeping them + # alive too long + queue.close() - return error + if error: + raise RemoteError(error) - def check(self, q): - q.put("An implementation of BaseCache should set a _check method") + # _check(): + # + # Check if this remote provides everything required for the + # particular kind of remote. This is expected to be called as part + # of check(), and must be called in a non-main process. + # + # Returns: + # (str|None): An error message, or None if no error message. + # + def _check(self): + return None def __str__(self): return self.url |