summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRaoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>2018-10-24 18:09:47 +0100
committerJürg Billeter <j@bitron.ch>2018-11-20 16:32:10 +0000
commitfd41b0b5f1bef3b202d56790a0242b5249b552ee (patch)
treea8aed53df413fc3fd79913b7f540b28d33625775
parent8071c00c6fe886ac42b729f5979b2e466a84ce66 (diff)
downloadbuildstream-725-job-cancellation-on-remote-builds.tar.gz
_sandboxremote.py: Add sigterm handler that sends CancelOperation725-job-cancellation-on-remote-builds
-rw-r--r--buildstream/sandbox/_sandboxremote.py31
1 files changed, 29 insertions, 2 deletions
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index 389a8fec0..60a053e0c 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -20,15 +20,18 @@
import os
from urllib.parse import urlparse
+from functools import partial
import grpc
from . import Sandbox
from ..storage._filebaseddirectory import FileBasedDirectory
from ..storage._casbaseddirectory import CasBasedDirectory
+from .. import _signals
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._exceptions import SandboxError
+from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
# SandboxRemote()
@@ -51,6 +54,7 @@ class SandboxRemote(Sandbox):
"Only plain HTTP is currenlty supported (no HTTPS).")
self.server_url = '{}:{}'.format(url.hostname, url.port)
+ self.operation_name = None
def run_remote_command(self, command, input_root_digest, working_directory, environment):
# Sends an execution request to the remote execution server.
@@ -102,10 +106,13 @@ class SandboxRemote(Sandbox):
operation_iterator = stub.WaitExecution(request)
for operation in operation_iterator:
+ if not self.operation_name:
+ self.operation_name = operation.name
if operation.done:
return operation
else:
last_operation = operation
+
except grpc.RpcError as e:
status_code = e.code()
if status_code == grpc.StatusCode.UNAVAILABLE:
@@ -125,19 +132,39 @@ class SandboxRemote(Sandbox):
return last_operation
+ # Set up signal handler to trigger cancel_operation on SIGTERM
operation = None
- with self._get_context().timed_activity("Waiting for the remote build to complete"):
+ with self._get_context().timed_activity("Waiting for the remote build to complete"), \
+ _signals.terminator(partial(self.cancel_operation, channel)):
operation = __run_remote_command(stub, execute_request=request)
if operation is None:
return None
elif operation.done:
return operation
-
while operation is not None and not operation.done:
operation = __run_remote_command(stub, running_operation=operation)
return operation
+ def cancel_operation(self, channel):
+ # If we don't have the name can't send request.
+ if self.operation_name is None:
+ return
+
+ stub = operations_pb2_grpc.OperationsStub(channel)
+ request = operations_pb2.CancelOperationRequest(
+ name=str(self.operation_name))
+
+ try:
+ stub.CancelOperation(request)
+ except grpc.RpcError as e:
+ if (e.code() == grpc.StatusCode.UNIMPLEMENTED or
+ e.code() == grpc.StatusCode.INVALID_ARGUMENT):
+ pass
+ else:
+ raise SandboxError("Failed trying to send CancelOperation request: "
+ "{} ({})".format(e.details(), e.code().name))
+
def process_job_output(self, output_directories, output_files):
# Reads the remote execution server response to an execution request.
#