diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-08-09 13:15:14 +0100 |
---|---|---|
committer | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-09-06 14:32:21 +0100 |
commit | 412da4061d0d95513e3b4b13fc1c64a8c5d8cd21 (patch) | |
tree | d4932443f4df21d0c381bfaecca7314b4ff0f5f3 | |
parent | 0b73ddbbbca69c868252576be5b5364f664f550f (diff) | |
download | buildstream-412da4061d0d95513e3b4b13fc1c64a8c5d8cd21.tar.gz |
Create BaseRemote base class
-rw-r--r-- | src/buildstream/_remote.py | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py new file mode 100644 index 000000000..087b62dd6 --- /dev/null +++ b/src/buildstream/_remote.py @@ -0,0 +1,241 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# + +import multiprocessing +import os +import signal +from collections import namedtuple +from urllib.parse import urlparse + +import grpc + +from . import _signals +from . import utils +from ._exceptions import RemoteError, LoadError, LoadErrorReason, ImplError +from ._protos.google.bytestream import bytestream_pb2_grpc + + +# RemoteSpec(): +# +# Defines the basic structure of a remote specification. +# +class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name')): + + # new_from_config_node + # + # Creates a RemoteSpec() from a YAML loaded node. + # + # Args: + # spec_node (MappingNode): The configuration node describing the spec. + # basedir (str): The base directory from which to find certificates. + # + # Returns: + # (RemoteSpec) - The described RemoteSpec instance. + # + # Raises: + # LoadError: If the node is malformed. + # + @classmethod + def new_from_config_node(cls, spec_node, basedir=None): + spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name']) + url = spec_node.get_str('url') + if not url: + provenance = spec_node.get_node('url').get_provenance() + raise LoadError("{}: empty artifact cache URL".format(provenance), LoadErrorReason.INVALID_DATA) + + push = spec_node.get_bool('push', default=False) + instance_name = spec_node.get_str('instance-name', default=None) + + def parse_cert(key): + cert = spec_node.get_str(key, default=None) + if cert and basedir: + cert = os.path.join(basedir, cert) + return cert + + cert_keys = ('server-cert', 'client-key', 'client-cert') + server_cert, client_key, client_cert = tuple(parse_cert(key) for key in cert_keys) + + if client_key and not client_cert: + provenance = spec_node.get_node('client-key').get_provenance() + raise LoadError("{}: 'client-key' was specified without 'client-cert'".format(provenance), + LoadErrorReason.INVALID_DATA) + + if client_cert and not client_key: + provenance = spec_node.get_node('client-cert').get_provenance() + raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(provenance), + LoadErrorReason.INVALID_DATA) + + return cls(url, push, server_cert, client_key, client_cert, instance_name) + + + +# FIXME: This can be made much nicer in python 3.7 through the use of +# defaults - or hell, by replacing it all with a typing.NamedTuple +# +# Note that defaults are specified from the right, and ommitted values +# are considered mandatory. +RemoteSpec.__new__.__defaults__ = ( + # mandatory # url - The url of the remote + # mandatory # push - Whether the remote should be used for pushing + None, # server_cert - The server certificate + None, # client_key - The (private) client key + None, # client_cert - The (public) client certificate + None # instance_name - The (grpc) instance name of the remote +) + + +# BaseRemote(): +# +# Provides the basic functionality required to set up remote +# interaction via GRPC. In particular, this will set up a +# grpc.insecure_channel, or a grpc.secure_channel, based on the given +# spec. +# +# Customization for the particular protocol is expected to be +# performed in children. +# +class BaseRemote(): + key_name = None + + def __init__(self, spec): + self.spec = spec + self._initialized = False + + self.bytestream = None + self.channel = None + + self.instance_name = spec.instance_name + self.push = spec.push + self.url = spec.url + + # init(): + # + # Initialize the given remote. This function must be called before + # any communication is performed, since such will otherwise fail. + # + def init(self): + if self._initialized: + return + + # gRPC doesn't support fork without exec, which is used in the + # main process. + if self.fork_allowed: + assert not utils._is_main_process() + + # Set up the communcation channel + url = urlparse(self.spec.url) + if url.scheme == 'http': + port = url.port or 80 + self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port)) + elif url.scheme == 'https': + port = url.port or 443 + try: + server_cert, client_key, client_cert = _read_files( + self.spec.server_cert, + self.spec.client_key, + self.spec.client_cert) + except FileNotFoundError as e: + raise RemoteError("Could not read certificates: {}".format(e)) from e + credentials = grpc.ssl_channel_credentials(root_certificates=server_cert, + private_key=client_key, + certificate_chain=client_cert) + self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials) + else: + raise RemoteError("Unsupported URL: {}".format(self.spec.url)) + + # Set up the bytestream on our channel + self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel) + + self._configure_protocols() + + self._initialized = True + + # _configure_protocols(): + # + # An abstract method to configure remote-specific protocols. This + # is *not* done as super().init() because we want to be able to + # set self._initialized *after* initialization completes in the + # parent class. + # + # This method should *never* be called outside of init(). + # + def _configure_protocols(self): + raise ImplError("An implementation of a Remote must configure its protocols.") + + # check_remote + # + # Used when checking whether remote_specs work in the buildstream main + # thread, runs this in a seperate process to avoid creation of gRPC threads + # in the main BuildStream process + # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details + @classmethod + def check_remote(cls, remote_spec, q): + def __check_remote(): + try: + remote = cls(remote_spec) + remote.init() + remote.check(q) + + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + q.put(e.details()) + + except Exception as e: # pylint: disable=broad-except + # Whatever happens, we need to return it to the calling process + # + q.put(str(e)) + + p = multiprocessing.Process(target=__check_remote) + + try: + # Keep SIGINT blocked in the child process + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + error = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + return error + + def check(self, q): + q.put("An implementation of BaseCache should set a _check method") + + def __str__(self): + return self.url + + +# _read_files(): +# +# A helper method to read a bunch of files, ignoring any input +# arguments that are None. +# +# Args: +# files (Iterable[str|None]): A list of files to read. Nones are passed back. +# +# Returns: +# Generator[str|None, None, None] - Strings read from those files. +# +def _read_files(*files): + def read_file(f): + if f: + with open(f, 'rb') as data: + return data.read() + return None + return (read_file(f) for f in files) |