From a426729a23d686d19b59d5e5a67342d0a2eb785c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 1 Jul 2019 15:25:52 +0100 Subject: casremote.py: Use buildbox-casd in init() --- src/buildstream/_basecache.py | 4 ++-- src/buildstream/_cas/casremote.py | 33 ++++++++++++++++++++++--------- src/buildstream/sandbox/_sandboxremote.py | 6 +++--- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 552d17b96..fc6087bf8 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -161,7 +161,7 @@ class BaseCache(): q = multiprocessing.Queue() for remote_spec in remote_specs: - error = self.remote_class.check_remote(remote_spec, q) + error = self.remote_class.check_remote(remote_spec, self.cas, q) if error and on_failure: on_failure(remote_spec.url, error) @@ -173,7 +173,7 @@ class BaseCache(): if remote_spec.push: self._has_push_remotes = True - remotes[remote_spec.url] = self.remote_class(remote_spec) + remotes[remote_spec.url] = self.remote_class(remote_spec, self.cas) for project in self.context.get_projects(): remote_specs = self.global_remote_specs diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 1c7e3152d..e1848cd20 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -11,6 +11,7 @@ import grpc from .._protos.google.rpc import code_pb2 from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc +from .._protos.build.buildgrid import local_cas_pb2 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc from .._exceptions import CASRemoteError, LoadError, LoadErrorReason @@ -77,9 +78,10 @@ class BlobNotFound(CASRemoteError): # Represents a single remote CAS cache. # class CASRemote(): - def __init__(self, spec): + def __init__(self, spec, cascache): self.spec = spec self._initialized = False + self.cascache = cascache self.channel = None self.instance_name = None self.bytestream = None @@ -89,12 +91,17 @@ class CASRemote(): self.batch_read_supported = None self.capabilities = None self.max_batch_total_size_bytes = None + self.local_cas_instance_name = None def init(self): if not self._initialized: # gRPC doesn't support fork without exec, which is used in the main process. assert not utils._is_main_process() + server_cert_bytes = None + client_key_bytes = None + client_cert_bytes = None + url = urlparse(self.spec.url) if url.scheme == 'http': port = url.port or 80 @@ -105,20 +112,14 @@ class CASRemote(): if self.spec.server_cert: with open(self.spec.server_cert, 'rb') as f: server_cert_bytes = f.read() - else: - server_cert_bytes = None if self.spec.client_key: with open(self.spec.client_key, 'rb') as f: client_key_bytes = f.read() - else: - client_key_bytes = None if self.spec.client_cert: with open(self.spec.client_cert, 'rb') as f: client_cert_bytes = f.read() - else: - client_cert_bytes = None credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes, private_key=client_key_bytes, @@ -173,6 +174,20 @@ class CASRemote(): e.code() != grpc.StatusCode.PERMISSION_DENIED): raise + local_cas = self.cascache._get_local_cas() + request = local_cas_pb2.GetInstanceNameForRemoteRequest() + request.url = self.spec.url + if self.spec.instance_name: + request.instance_name = self.spec.instance_name + if server_cert_bytes: + request.server_cert = server_cert_bytes + if client_key_bytes: + request.client_key = client_key_bytes + if client_cert_bytes: + request.client_cert = client_cert_bytes + response = local_cas.GetInstanceNameForRemote(request) + self.local_cas_instance_name = response.instance_name + self._initialized = True # check_remote @@ -182,11 +197,11 @@ class CASRemote(): # in the main BuildStream process # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details @classmethod - def check_remote(cls, remote_spec, q): + def check_remote(cls, remote_spec, cascache, q): def __check_remote(): try: - remote = cls(remote_spec) + remote = cls(remote_spec, cascache) remote.init() request = buildstream_pb2.StatusRequest() diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index affe597dd..4308d662b 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -281,7 +281,7 @@ class SandboxRemote(Sandbox): context = self._get_context() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec) + casremote = CASRemote(self.storage_remote_spec, cascache) # Now do a pull to ensure we have the full directory structure. dir_digest = cascache.pull_tree(casremote, tree_digest) @@ -300,7 +300,7 @@ class SandboxRemote(Sandbox): project = self._get_project() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec) + casremote = CASRemote(self.storage_remote_spec, cascache) # Fetch the file blobs if needed if self._output_files_required or artifactcache.has_push_remotes(): @@ -368,7 +368,7 @@ class SandboxRemote(Sandbox): action_result = self._check_action_cache(action_digest) if not action_result: - casremote = CASRemote(self.storage_remote_spec) + casremote = CASRemote(self.storage_remote_spec, cascache) try: casremote.init() except grpc.RpcError as e: -- cgit v1.2.1