summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2019-08-09 13:15:14 +0100
committerTristan Maat <tristan.maat@codethink.co.uk>2019-09-06 14:32:21 +0100
commit412da4061d0d95513e3b4b13fc1c64a8c5d8cd21 (patch)
treed4932443f4df21d0c381bfaecca7314b4ff0f5f3
parent0b73ddbbbca69c868252576be5b5364f664f550f (diff)
downloadbuildstream-412da4061d0d95513e3b4b13fc1c64a8c5d8cd21.tar.gz
Create BaseRemote base class
-rw-r--r--src/buildstream/_remote.py241
1 files changed, 241 insertions, 0 deletions
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
new file mode 100644
index 000000000..087b62dd6
--- /dev/null
+++ b/src/buildstream/_remote.py
@@ -0,0 +1,241 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import multiprocessing
+import os
+import signal
+from collections import namedtuple
+from urllib.parse import urlparse
+
+import grpc
+
+from . import _signals
+from . import utils
+from ._exceptions import RemoteError, LoadError, LoadErrorReason, ImplError
+from ._protos.google.bytestream import bytestream_pb2_grpc
+
+
+# RemoteSpec():
+#
+# An immutable record describing one remote: its url, whether it may be
+# pushed to, the optional certificate/key paths and the optional
+# instance name.
+#
+class RemoteSpec(namedtuple('RemoteSpec', 'url push server_cert client_key client_cert instance_name')):
+
+    # new_from_config_node
+    #
+    # Alternate constructor which builds a RemoteSpec() out of a YAML
+    # loaded node.
+    #
+    # Args:
+    #    spec_node (MappingNode): The configuration node describing the spec.
+    #    basedir (str): Directory onto which configured certificate paths
+    #                   are joined; when None the paths are used as given.
+    #
+    # Returns:
+    #    (RemoteSpec) - The described RemoteSpec instance.
+    #
+    # Raises:
+    #    LoadError: If the node is malformed; in particular when the url
+    #               is empty, or when only one of 'client-key' and
+    #               'client-cert' is given.
+    #
+    @classmethod
+    def new_from_config_node(cls, spec_node, basedir=None):
+        spec_node.validate_keys(['url', 'push', 'server-cert', 'client-key', 'client-cert', 'instance-name'])
+
+        url = spec_node.get_str('url')
+        if not url:
+            where = spec_node.get_node('url').get_provenance()
+            raise LoadError("{}: empty artifact cache URL".format(where), LoadErrorReason.INVALID_DATA)
+
+        push = spec_node.get_bool('push', default=False)
+        instance_name = spec_node.get_str('instance-name', default=None)
+
+        # Look the three optional paths up in a fixed order, anchoring
+        # each one that is set onto basedir when a basedir was given.
+        resolved = {}
+        for option in ('server-cert', 'client-key', 'client-cert'):
+            location = spec_node.get_str(option, default=None)
+            if location and basedir:
+                location = os.path.join(basedir, location)
+            resolved[option] = location
+
+        server_cert = resolved['server-cert']
+        client_key = resolved['client-key']
+        client_cert = resolved['client-cert']
+
+        # The client key and the client certificate only make sense as
+        # a pair, so reject a node which supplies just one of them.
+        if client_key and not client_cert:
+            where = spec_node.get_node('client-key').get_provenance()
+            raise LoadError("{}: 'client-key' was specified without 'client-cert'".format(where),
+                            LoadErrorReason.INVALID_DATA)
+
+        if client_cert and not client_key:
+            where = spec_node.get_node('client-cert').get_provenance()
+            raise LoadError("{}: 'client-cert' was specified without 'client-key'".format(where),
+                            LoadErrorReason.INVALID_DATA)
+
+        return cls(url, push, server_cert, client_key, client_cert, instance_name)
+
+
+
+# FIXME: This can be made much nicer in python 3.7 through the use of
+# defaults - or hell, by replacing it all with a typing.NamedTuple
+#
+# Note that defaults are specified from the right, and omitted values
+# are considered mandatory.
+#
+# RemoteSpec has six fields and four defaults are given here, so only
+# the first two fields (url and push) have to be passed when
+# constructing a RemoteSpec directly.
+RemoteSpec.__new__.__defaults__ = (
+ # mandatory # url - The url of the remote
+ # mandatory # push - Whether the remote should be used for pushing
+ None, # server_cert - The server certificate
+ None, # client_key - The (private) client key
+ None, # client_cert - The (public) client certificate
+ None # instance_name - The (grpc) instance name of the remote
+)
+
+
+# BaseRemote():
+#
+# Provides the basic functionality required to set up remote
+# interaction via GRPC. In particular, this will set up a
+# grpc.insecure_channel, or a grpc.secure_channel, based on the given
+# spec.
+#
+# Customization for the particular protocol is expected to be
+# performed in children.
+#
+# Args:
+#    spec (RemoteSpec): The specification of the remote to talk to.
+#
+class BaseRemote():
+    key_name = None
+
+    def __init__(self, spec):
+        self.spec = spec
+        self._initialized = False
+
+        # Both of these stay None until init() has been called
+        self.bytestream = None
+        self.channel = None
+
+        # Convenience copies of the spec fields
+        self.instance_name = spec.instance_name
+        self.push = spec.push
+        self.url = spec.url
+
+    # init():
+    #
+    # Initialize the given remote. This function must be called before
+    # any communication is performed, since such will otherwise fail.
+    #
+    # Calling it again once initialization has succeeded is a no-op.
+    #
+    # Raises:
+    #    RemoteError: If the url cannot be parsed or uses a scheme other
+    #                 than http/https, or if a certificate file cannot
+    #                 be read.
+    #
+    def init(self):
+        if self._initialized:
+            return
+
+        # gRPC doesn't support fork without exec, which is used in the
+        # main process.
+        #
+        # This class does not define `fork_allowed` itself, so fall back
+        # to enforcing the restriction when nothing else has set it
+        # rather than failing with an AttributeError.
+        # NOTE(review): presumably subclasses provide `fork_allowed` - confirm.
+        if getattr(self, 'fork_allowed', True):
+            assert not utils._is_main_process()
+
+        # Set up the communication channel
+        #
+        # Both urlparse() and reading `.port` may raise ValueError on a
+        # malformed url; report that like any other unusable url.
+        try:
+            url = urlparse(self.spec.url)
+            port = url.port
+        except ValueError as e:
+            raise RemoteError("Unsupported URL: {}".format(self.spec.url)) from e
+
+        if url.scheme == 'http':
+            self.channel = grpc.insecure_channel('{}:{}'.format(url.hostname, port or 80))
+        elif url.scheme == 'https':
+            try:
+                # _read_files() is lazy, the files are only opened while
+                # unpacking, so the unpacking has to happen inside the try.
+                server_cert, client_key, client_cert = _read_files(
+                    self.spec.server_cert,
+                    self.spec.client_key,
+                    self.spec.client_cert)
+            except OSError as e:
+                # Covers missing files as well as unreadable ones
+                raise RemoteError("Could not read certificates: {}".format(e)) from e
+            credentials = grpc.ssl_channel_credentials(root_certificates=server_cert,
+                                                       private_key=client_key,
+                                                       certificate_chain=client_cert)
+            self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port or 443), credentials)
+        else:
+            raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+
+        # Set up the bytestream on our channel
+        self.bytestream = bytestream_pb2_grpc.ByteStreamStub(self.channel)
+
+        self._configure_protocols()
+
+        self._initialized = True
+
+    # _configure_protocols():
+    #
+    # An abstract method to configure remote-specific protocols. This
+    # is *not* done as super().init() because we want to be able to
+    # set self._initialized *after* initialization completes in the
+    # parent class.
+    #
+    # This method should *never* be called outside of init().
+    #
+    # Raises:
+    #    ImplError: Always, unless overridden by an implementation.
+    #
+    def _configure_protocols(self):
+        raise ImplError("An implementation of a Remote must configure its protocols.")
+
+    # check_remote
+    #
+    # Used when checking whether remote_specs work in the buildstream main
+    # thread, runs this in a separate process to avoid creation of gRPC threads
+    # in the main BuildStream process
+    # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+    #
+    # Args:
+    #    remote_spec (RemoteSpec): The spec of the remote to check.
+    #    q (multiprocessing.Queue): The queue through which the child
+    #                               process reports its result.
+    #
+    # Returns:
+    #    The first item the child process put on `q`; for failures this
+    #    is a string describing the error.
+    #    NOTE(review): presumably check() implementations put None on
+    #    success - confirm against the subclasses.
+    #
+    @classmethod
+    def check_remote(cls, remote_spec, q):
+        def __check_remote():
+            try:
+                remote = cls(remote_spec)
+                remote.init()
+                remote.check(q)
+
+            except grpc.RpcError as e:
+                # str(e) is too verbose for errors reported to the user
+                q.put(e.details())
+
+            except Exception as e:               # pylint: disable=broad-except
+                # Whatever happens, we need to return it to the calling process
+                #
+                q.put(str(e))
+
+        p = multiprocessing.Process(target=__check_remote)
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                p.start()
+
+            # Blocks until the child has reported something
+            error = q.get()
+            p.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(p.pid)
+            raise
+
+        return error
+
+    # check():
+    #
+    # Report on `q` whether this (initialized) remote is usable. The
+    # base implementation only reports that it has not been overridden.
+    #
+    # Args:
+    #    q (multiprocessing.Queue): The queue on which to put the result.
+    #
+    def check(self, q):
+        q.put("An implementation of BaseRemote should set a check method")
+
+    def __str__(self):
+        return self.url
+
+
+# _read_files():
+#
+# A helper which reads a bunch of files, passing through a None for
+# every argument which does not name a file.
+#
+# Nothing is opened when this is called; each file is opened and read
+# in binary mode only once the returned generator reaches it, so any
+# error from open() surfaces while iterating or unpacking the result.
+#
+# Args:
+#    *files (str|None): The paths to read, None (or empty) to skip.
+#
+# Returns:
+#    Generator[bytes|None, None, None] - The content of each file, in
+#                                        the order the paths were given.
+#
+def _read_files(*files):
+    def slurp(path):
+        if not path:
+            return None
+        with open(path, 'rb') as handle:
+            return handle.read()
+
+    return (slurp(path) for path in files)