From 05899546f79d9604e1d6cd0ab718fdc670770579 Mon Sep 17 00:00:00 2001
From: Pedro Alvarez
Date: Wed, 30 Mar 2016 21:55:19 +0000
Subject: Cancel now running jobs

Change-Id: I28bdb73cb59196fe35b0e891490ffc95acfeb1b5
---
Review notes (placed after the "---" separator, so they are not part of
the commit message):

 * Layout: this copy was recovered from a rendering that had collapsed
   all line breaks and indentation. Hunk sizes were re-checked against
   the "@@" headers and the diffstat, but the indentation depth of
   context lines and the exact position of blank lines are inferred.
   TODO: confirm against the tree before applying.
 * What it does: the controller submits a gear job named
   "cancel-<cache_key>" for each cache key it removes while cancelling.
   On the worker, CancelationManager registers that function name for
   the artifact being built and, when such a job arrives, runs the
   callback that kills the 'morph worker-build' subprocess.
 * Changed from the original posting: GearWorkerManager.run() now calls
   self.cancelation_manager (the instance passed to __init__) instead of
   the module-level `cancelation_manager` global. The blob id on the
   gear/worker.py "index" line and the commit id on the first line
   describe the original posting, not this revision.
 * NOTE(review): CancelationManager.cancel_job() prints a warning when
   the key does not match but still runs the callback. Confirm whether
   killing the current build in that case is intended.
 * NOTE(review): remove_cancel_worker() reads self.cache_key, which is
   only assigned in set_cancel_worker(); calling it first would raise
   AttributeError.

 gear/controller.py | 15 ++++++++++++
 gear/worker.py     | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/gear/controller.py b/gear/controller.py
index 83616d15..56022f08 100644
--- a/gear/controller.py
+++ b/gear/controller.py
@@ -198,6 +198,16 @@ class SingleBuildController():
         self.requests_controller.mark_as_built(cache_key)
 
 
+class CancelationClient(gear.Client):
+    def __init__(self):
+        super(CancelationClient, self).__init__()
+
+    def handleWorkComplete(self, packet):
+        job = super(CancelationClient, self).handleWorkComplete(packet)
+        print "Cancelation workcomplete"
+        return job
+
+
 class CacheRequestClient(gear.Client):
     def __init__(self, controller):
         super(CacheRequestClient, self).__init__()
@@ -280,6 +290,9 @@ class RequestsController(threading.Thread):
         self.build_requests = []
         self.build_status_lock = threading.Lock()
         self.stopped = False
+        self.cancel_client = CancelationClient()
+        self.cancel_client.addServer('localhost')
+        self.cancel_client.waitForServer()
 
     def add_request(self, request, job=None):
         json_request = json.dumps(request)
@@ -376,6 +389,8 @@ class RequestsController(threading.Thread):
             for cache_key in to_remove:
                 del self.building_dict[cache_key]
                 del self.jobs_dict[cache_key]
+                job = gear.Job("cancel-%s" % cache_key, cache_key)
+                self.cancel_client.submitJob(job)
             print "Finished cancelling"
 
 
diff --git a/gear/worker.py b/gear/worker.py
index 3b0f4591..9df3341f 100644
--- a/gear/worker.py
+++ b/gear/worker.py
@@ -81,9 +81,53 @@ def upload_files(cache_key, suffixes):
         sftp.put(single_file, remote_dest_tmp)
         sftp.rename(remote_dest_tmp, remote_dest)
 
-class GearWorkerManager(threading.Thread):
+class CancelationManager(threading.Thread):
 
     def __init__(self):
+        super(CancelationManager, self).__init__()
+        self.stopped = False
+        self.worker = gear.Worker('canceller')
+        self.worker.addServer('localhost')
+        self.active = False
+
+    def set_cancel_worker(self, cache_key, callback):
+        self.cache_key = cache_key
+        self.callback = callback
+        print "DEBUG: Setting cancel-%s" % self.cache_key
+        self.worker.registerFunction("cancel-%s" % self.cache_key)
+        self.active = True
+
+    def remove_cancel_worker(self):
+        print "DEBUG: Removing cancel-%s" % self.cache_key
+        if self.active:
+            self.worker.unRegisterFunction("cancel-%s" % self.cache_key)
+            self.active = False
+
+    def cancel_job(self, cache_key):
+        if not cache_key == self.cache_key:
+            print "EERRRM.. this is not what I wanted"
+
+        self.callback()
+        self.remove_cancel_worker()
+
+    def shutdown(self):
+        self.stopped = True
+        self.worker.stopWaitingForJobs()
+
+    def run(self):
+        while not self.stopped:
+            try:
+                print "DEBUG: Waiting for cancel job"
+                job = self.worker.getJob()
+                self.cancel_job(job.arguments)
+                job.sendWorkComplete()
+            except gear.InterruptedError:
+                pass
+
+
+class GearWorkerManager(threading.Thread):
+
+    def __init__(self, cancelation_manager):
         super(GearWorkerManager, self).__init__()
         self.stopped = False
         self.worker = gear.Worker('reverser')
@@ -92,10 +136,12 @@ class GearWorkerManager(threading.Thread):
         self.worker.registerFunction("build-graph")
         self.worker.registerFunction("build-artifact")
         self.worker.registerFunction("cache-request")
+        self.cancelation_manager = cancelation_manager
 
     def shutdown(self):
         self.stopped = True
         self.worker.stopWaitingForJobs()
+        self.cancelation_manager.shutdown()
 
     def run(self):
         while not self.stopped:
@@ -143,16 +189,28 @@ class GearWorkerManager(threading.Thread):
                 elif job.name == "build-artifact":
                     artifact = distbuild.decode_artifact_reference(job.arguments)
                     print "building %s" % artifact.name
+                    self.finished = False
+
+
                     cmd = ['morph', 'worker-build', '--build-log-on-stdout', artifact.name]
                     p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
+
+                    def set_finished():
+                        print "DEBUG: Cancelling build"
+                        self.finished = True
+                        p.kill()
+
+                    self.cancelation_manager.set_cancel_worker(artifact.cache_key, set_finished)
                     p.stdin.write(job.arguments)
                     p.stdin.close()
-                    while True:
+                    while not self.finished:
                         line = p.stdout.readline()
                         if not line: break
                         job.sendWorkData(line)
                     p.wait()
+                    self.cancelation_manager.remove_cancel_worker()
                     if p.returncode != 0:
+                        print "DEBUG: FAILED with returncode %s" % p.returncode
                         raise BuildFailedError()
 
                     kind = artifact.kind
@@ -168,6 +226,7 @@ class GearWorkerManager(threading.Thread):
                         if kind == 'stratum':
                             suffixes.append(filename + '.meta')
 
+
                     upload_files(artifact.cache_key, suffixes)
                     job.sendWorkComplete(artifact.cache_key)
                 elif job.name == 'cache-request':
@@ -186,7 +245,9 @@ def term_handler(signum, frame):
     worker_manager.shutdown()
 
 signal.signal(signal.SIGTERM, term_handler)
-worker_manager = GearWorkerManager()
+cancelation_manager = CancelationManager()
+cancelation_manager.start()
+worker_manager = GearWorkerManager(cancelation_manager)
 worker_manager.start()
 
 while not worker_manager.stopped:
-- 
cgit v1.2.1