path: root/buildstream/sandbox/_sandboxremote.py
#!/usr/bin/env python3
#
#  Copyright (C) 2018 Bloomberg LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Jim MacArthur <jim.macarthur@codethink.co.uk>

import os
import shlex
from collections import namedtuple
from urllib.parse import urlparse
from functools import partial

import grpc

from . import Sandbox, SandboxCommandError
from .sandbox import _SandboxBatch
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._exceptions import SandboxError
from .. import _yaml
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._artifactcache.cascache import CASRemote, CASRemoteSpec


class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service')):
    pass


# SandboxRemote()
#
# This isn't really a sandbox; it's a stub which sends the sources and build
# commands to a remote execution server and retrieves the results from it.
#
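# The remote server is expected to implement the Remote Execution API (REAPI):
# a Command and an Action message describing the build step are pushed to the
# storage (CAS) service, the Action is executed via the execution service, and
# the resulting ActionResult is unpacked by process_job_output() below.
#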
class SandboxRemote(Sandbox):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        config = kwargs['specs']  # This should be a RemoteExecutionSpec
        if config is None:
            return

        self.storage_url = config.storage_service['url']
        self.exec_url = config.exec_service['url']

        self.storage_remote_spec = CASRemoteSpec(self.storage_url, push=True,
                                                 server_cert=config.storage_service['server-cert'],
                                                 client_key=config.storage_service['client-key'],
                                                 client_cert=config.storage_service['client-cert'])
        self.operation_name = None

    @staticmethod
    def specs_from_config_node(config_node, basedir):
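        # Expects a 'remote-execution' node shaped roughly as follows. The URLs
        # and certificate paths are illustrative examples only, not defaults:
        #
        #   remote-execution:
        #     execution-service:
        #       url: http://buildservice:50051
        #     storage-service:
        #       url: https://storage.example.com:11002
        #       server-cert: server.crt
        #       client-key: client.key
        #       client-cert: client.crt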

        def require_node(config, keyname):
            val = config.get(keyname)
            if val is None:
                provenance = _yaml.node_get_provenance(config, key=keyname)
                raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA,
                                      "{}: '{}' was not present in the remote "
                                      "execution configuration (remote-execution). "
                                      .format(str(provenance), keyname))
            return val

        remote_config = config_node.get("remote-execution", None)
        if remote_config is None:
            return None

        # Maintain some backwards compatibility with older configs, in which 'url' was the only valid key for
        # remote-execution. A bare 'url' is folded into an 'execution-service' entry before that key is required.

        tls_keys = ['client-key', 'client-cert', 'server-cert']

        _yaml.node_validate(remote_config, ['execution-service', 'storage-service', 'url'])

        if 'url' in remote_config:
            if 'execution-service' not in remote_config:
                remote_config['execution-service'] = {'url': remote_config['url']}
            else:
                provenance = _yaml.node_get_provenance(remote_config, key='url')
                raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA,
                                      "{}: 'url' and 'execution-service' keys were found in the remote "
                                      "execution configuration (remote-execution). "
                                      "You can only specify one of these."
                                      .format(str(provenance)))

        remote_exec_service_config = require_node(remote_config, 'execution-service')
        remote_exec_storage_config = require_node(remote_config, 'storage-service')

        _yaml.node_validate(remote_exec_service_config, ['url'])
        _yaml.node_validate(remote_exec_storage_config, ['url'] + tls_keys)

        for key in tls_keys:
            if key not in remote_exec_storage_config:
                provenance = _yaml.node_get_provenance(remote_config, key='storage-service')
                raise _yaml.LoadError(_yaml.LoadErrorReason.INVALID_DATA,
                                      "{}: The keys {} are necessary for the storage-service section of "
                                      "the remote-execution configuration. Your config is missing '{}'."
                                      .format(str(provenance), tls_keys, key))

        spec = RemoteExecutionSpec(remote_config['execution-service'], remote_config['storage-service'])
        return spec

    def run_remote_command(self, command, input_root_digest, working_directory, environment):
        # Sends an execution request to the remote execution server.
        #
        # This function blocks until it gets a response from the server.
        #
        environment_variables = [
            remote_execution_pb2.Command.EnvironmentVariable(name=k, value=v)
            for (k, v) in environment.items()
        ]

        # Create and send the Command object.
        remote_command = remote_execution_pb2.Command(arguments=command,
                                                      working_directory=working_directory,
                                                      environment_variables=environment_variables,
                                                      output_files=[],
                                                      output_directories=[self._output_directory],
                                                      platform=None)
        context = self._get_context()
        cascache = context.get_cascache()
        casremote = CASRemote(self.storage_remote_spec)

        # Upload the Command message to the remote CAS server
        command_digest = cascache.push_message(casremote, remote_command)

        # Create and send the action.
        action = remote_execution_pb2.Action(command_digest=command_digest,
                                             input_root_digest=input_root_digest,
                                             timeout=None,
                                             do_not_cache=False)

        # Upload the Action message to the remote CAS server
        action_digest = cascache.push_message(casremote, action)

        # Next, try to create a communication channel to the BuildGrid server.
        url = urlparse(self.exec_url)
        if not url.port:
            raise SandboxError("You must supply a protocol and port number in the execution-service url, "
                               "for example: http://buildservice:50051.")
        if url.scheme == 'http':
            channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port))
        else:
            raise SandboxError("Remote execution currently only supports the 'http' protocol "
                               "and '{}' was supplied.".format(url.scheme))

        stub = remote_execution_pb2_grpc.ExecutionStub(channel)
        request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
                                                      skip_cache_lookup=False)

        def __run_remote_command(stub, execute_request=None, running_operation=None):
            try:
                last_operation = None
                if execute_request is not None:
                    operation_iterator = stub.Execute(execute_request)
                else:
                    request = remote_execution_pb2.WaitExecutionRequest(name=running_operation.name)
                    operation_iterator = stub.WaitExecution(request)

                for operation in operation_iterator:
                    if not self.operation_name:
                        self.operation_name = operation.name
                    if operation.done:
                        return operation
                    else:
                        last_operation = operation

            except grpc.RpcError as e:
                status_code = e.code()
                if status_code == grpc.StatusCode.UNAVAILABLE:
                    raise SandboxError("Failed contacting remote execution server at {}."
                                       .format(self.exec_url))

                elif status_code in (grpc.StatusCode.INVALID_ARGUMENT,
                                     grpc.StatusCode.FAILED_PRECONDITION,
                                     grpc.StatusCode.RESOURCE_EXHAUSTED,
                                     grpc.StatusCode.INTERNAL,
                                     grpc.StatusCode.DEADLINE_EXCEEDED):
                    raise SandboxError("{} ({}).".format(e.details(), status_code.name))

                elif running_operation and status_code == grpc.StatusCode.UNIMPLEMENTED:
                    raise SandboxError("Failed trying to recover from connection loss: "
                                       "server does not support operation status polling recovery.")

            return last_operation

        # Set up signal handler to trigger cancel_operation on SIGTERM
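        # Issue the initial Execute request; if the operation stream ends before the
        # operation is done, poll the saved operation name with WaitExecution until
        # it completes.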
        operation = None
        with self._get_context().timed_activity("Waiting for the remote build to complete"), \
            _signals.terminator(partial(self.cancel_operation, channel)):
            operation = __run_remote_command(stub, execute_request=request)
            if operation is None:
                return None
            elif operation.done:
                return operation
            while operation is not None and not operation.done:
                operation = __run_remote_command(stub, running_operation=operation)

        return operation

    def cancel_operation(self, channel):
        # If we don't have the operation name, we can't send a cancel request.
        if self.operation_name is None:
            return

        stub = operations_pb2_grpc.OperationsStub(channel)
        request = operations_pb2.CancelOperationRequest(
            name=str(self.operation_name))

        try:
            stub.CancelOperation(request)
        except grpc.RpcError as e:
            if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
                    e.code() == grpc.StatusCode.INVALID_ARGUMENT):
                pass
            else:
                raise SandboxError("Failed trying to send CancelOperation request: "
                                   "{} ({})".format(e.details(), e.code().name))

    def process_job_output(self, output_directories, output_files):
        # Reads the remote execution server response to an execution request.
        #
        # output_directories is an array of OutputDirectory objects.
        # output_files is an array of OutputFile objects.
        #
        # We request a single output directory, so at the moment it is an error
        # for the server to return any output files or more than one directory.
        #
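        # The returned tree digest refers to a Tree message in the remote CAS; we
        # pull it, convert it to a directory digest, and swap it in as the
        # sandbox's virtual directory.
        #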
        if output_files:
            raise SandboxError("Output files were returned when we didn't request any.")
        elif not output_directories:
            error_text = "No output directory was returned from the build server."
            raise SandboxError(error_text)
        elif len(output_directories) > 1:
            error_text = "More than one output directory was returned from the build server: {}."
            raise SandboxError(error_text.format(output_directories))

        tree_digest = output_directories[0].tree_digest
        if tree_digest is None or not tree_digest.hash:
            raise SandboxError("Output directory structure had no digest attached.")

        context = self._get_context()
        cascache = context.get_cascache()
        casremote = CASRemote(self.storage_remote_spec)

        # Now do a pull to ensure we have the necessary parts.
        dir_digest = cascache.pull_tree(casremote, tree_digest)
        if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes:
            raise SandboxError("Failed to pull the output directory structure from the remote.")

        # Now what we have is a digest for the output. Once we return, the calling process will
        # attempt to descend into our output directory and find it there, so we need to overwrite
        # that.

        # An empty output directory (or one equal to the sandbox root) means the artifact
        # wants the whole directory; we could just return the returned hash in its place,
        # but we don't have a means to do that yet.
        if not self._output_directory.strip(os.path.sep):
            raise SandboxError("Unimplemented: Output directory is empty or equal to the sandbox root.")

        # At the moment, we will get the whole directory back in the first directory argument and we need
        # to replace the sandbox's virtual directory with that. Creating a new virtual directory object
        # from another hash will be interesting, though...

        new_dir = CasBasedDirectory(self._get_context().artifactcache.cas, ref=dir_digest)
        self._set_virtual_directory(new_dir)

    def _run(self, command, flags, *, cwd, env):
        stdout, stderr = self._get_output()

        # Upload sources
        upload_vdir = self.get_virtual_directory()

        cascache = self._get_context().get_cascache()
        if isinstance(upload_vdir, FileBasedDirectory):
            # Make a new temporary directory to put the sources in
            upload_vdir = CasBasedDirectory(cascache, ref=None)
            upload_vdir.import_files(self.get_virtual_directory()._get_underlying_directory())

        upload_vdir.recalculate_hash()

        casremote = CASRemote(self.storage_remote_spec)
        # Now, push that key (without necessarily needing a ref) to the remote.
        try:
            cascache.push_directory(casremote, upload_vdir)
        except grpc.RpcError as e:
            raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e

        # Now transmit the command to execute
        operation = self.run_remote_command(command, upload_vdir.ref, cwd, env)

        if operation is None:
            # Failure of remote execution, usually due to an error in BuildStream
            raise SandboxError("No response returned from server")

        assert not operation.HasField('error') and operation.HasField('response')

        execution_response = remote_execution_pb2.ExecuteResponse()
        # The response is expected to be an ExecutionResponse message
        assert operation.response.Is(execution_response.DESCRIPTOR)

        operation.response.Unpack(execution_response)

        if execution_response.status.code != code_pb2.OK:
            # An unexpected error during execution: the remote execution
            # system failed at processing the execution request.
            if execution_response.status.message:
                raise SandboxError(execution_response.status.message)
            else:
                raise SandboxError("Remote server failed at executing the build request.")

        action_result = execution_response.result

        if stdout:
            if action_result.stdout_raw:
                stdout.write(str(action_result.stdout_raw, 'utf-8', errors='ignore'))
        if stderr:
            if action_result.stderr_raw:
                stderr.write(str(action_result.stderr_raw, 'utf-8', errors='ignore'))

        if action_result.exit_code != 0:
            # A normal error during the build: the remote execution system
            # has worked correctly but the command failed.
            return action_result.exit_code

        self.process_job_output(action_result.output_directories, action_result.output_files)

        return 0

    def _create_batch(self, main_group, flags, *, collect=None):
        return _SandboxRemoteBatch(self, main_group, flags, collect=collect)


# _SandboxRemoteBatch()
#
# Command batching by shell script generation.
#
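# Each command in the batch is appended to a single shell script, which is then
# run with one call to Sandbox.run(). For every command after the first, the
# necessary 'mkdir -p'/'cd' and 'export'/'unset' lines are emitted to match the
# command's working directory and environment, and each command is wrapped as:
#
#   (set -ex; <command>) || (echo Command <label> failed with exitcode $? >&2 ; exit 1)
#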
class _SandboxRemoteBatch(_SandboxBatch):

    def __init__(self, sandbox, main_group, flags, *, collect=None):
        super().__init__(sandbox, main_group, flags, collect=collect)

        self.script = None
        self.first_command = None
        self.cwd = None
        self.env = None

    def execute(self):
        self.script = ""

        self.main_group.execute(self)

        first = self.first_command
        if first and self.sandbox.run(['sh', '-c', '-e', self.script], self.flags, cwd=first.cwd, env=first.env) != 0:
            raise SandboxCommandError("Command execution failed", collect=self.collect)

    def execute_group(self, group):
        group.execute_children(self)

    def execute_command(self, command):
        if self.first_command is None:
            # First command in batch.
            # The script's initial working directory and environment already match
            # this command's configuration.
            self.first_command = command
        else:
            # Change working directory for this command
            if command.cwd != self.cwd:
                self.script += "mkdir -p {}\n".format(command.cwd)
                self.script += "cd {}\n".format(command.cwd)

            # Update environment for this command
            for key in self.env.keys():
                if key not in command.env:
                    self.script += "unset {}\n".format(key)
            for key, value in command.env.items():
                if key not in self.env or self.env[key] != value:
                    self.script += "export {}={}\n".format(key, shlex.quote(value))

        # Keep track of current working directory and environment
        self.cwd = command.cwd
        self.env = command.env

        # Actual command execution
        cmdline = ' '.join(shlex.quote(cmd) for cmd in command.command)
        self.script += "(set -ex; {})".format(cmdline)

        # Error handling
        label = command.label or cmdline
        quoted_label = shlex.quote("'{}'".format(label))
        self.script += " || (echo Command {} failed with exitcode $? >&2 ; exit 1)\n".format(quoted_label)

    def execute_call(self, call):
        raise SandboxError("SandboxRemote does not support callbacks in command batches")