summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPedro Alvarez <pedro.alvarez@codethink.co.uk>2016-03-30 21:55:19 +0000
committerPedro Alvarez <pedro.alvarez@codethink.co.uk>2016-03-30 21:55:19 +0000
commit05899546f79d9604e1d6cd0ab718fdc670770579 (patch)
treeaefb0d750c9dba67a81cdfcd3d82987cdd6908cb
parentea91a14e56efe070c977e9ff2877a23f3c498e92 (diff)
downloadmorph-05899546f79d9604e1d6cd0ab718fdc670770579.tar.gz
Cancel now running jobs
Change-Id: I28bdb73cb59196fe35b0e891490ffc95acfeb1b5
-rw-r--r--gear/controller.py15
-rw-r--r--gear/worker.py67
2 files changed, 79 insertions, 3 deletions
diff --git a/gear/controller.py b/gear/controller.py
index 83616d15..56022f08 100644
--- a/gear/controller.py
+++ b/gear/controller.py
@@ -198,6 +198,16 @@ class SingleBuildController():
self.requests_controller.mark_as_built(cache_key)
+class CancelationClient(gear.Client):
+ def __init__(self):
+ super(CancelationClient, self).__init__()
+
+ def handleWorkComplete(self, packet):
+ job = super(CancelationClient, self).handleWorkComplete(packet)
+ print "Cancelation workcomplete"
+ return job
+
+
class CacheRequestClient(gear.Client):
def __init__(self, controller):
super(CacheRequestClient, self).__init__()
@@ -280,6 +290,9 @@ class RequestsController(threading.Thread):
self.build_requests = []
self.build_status_lock = threading.Lock()
self.stopped = False
+ self.cancel_client = CancelationClient()
+ self.cancel_client.addServer('localhost')
+ self.cancel_client.waitForServer()
def add_request(self, request, job=None):
json_request = json.dumps(request)
@@ -376,6 +389,8 @@ class RequestsController(threading.Thread):
for cache_key in to_remove:
del self.building_dict[cache_key]
del self.jobs_dict[cache_key]
+ job = gear.Job("cancel-%s" % cache_key, cache_key)
+ self.cancel_client.submitJob(job)
print "Finished cancelling"
diff --git a/gear/worker.py b/gear/worker.py
index 3b0f4591..9df3341f 100644
--- a/gear/worker.py
+++ b/gear/worker.py
@@ -81,9 +81,53 @@ def upload_files(cache_key, suffixes):
sftp.put(single_file, remote_dest_tmp)
sftp.rename(remote_dest_tmp, remote_dest)
-class GearWorkerManager(threading.Thread):
+class CancelationManager(threading.Thread):
def __init__(self):
+ super(CancelationManager, self).__init__()
+ self.stopped = False
+ self.worker = gear.Worker('canceller')
+ self.worker.addServer('localhost')
+ self.active = False
+
+ def set_cancel_worker(self, cache_key, callback):
+ self.cache_key = cache_key
+ self.callback = callback
+ print "DEBUG: Setting cancel-%s" % self.cache_key
+ self.worker.registerFunction("cancel-%s" % self.cache_key)
+ self.active = True
+
+ def remove_cancel_worker(self):
+ print "DEBUG: Removing cancel-%s" % self.cache_key
+ if self.active:
+ self.worker.unRegisterFunction("cancel-%s" % self.cache_key)
+ self.active = False
+
+ def cancel_job(self, cache_key):
+ if not cache_key == self.cache_key:
+ print "EERRRM.. this is not what I wanted"
+
+ self.callback()
+ self.remove_cancel_worker()
+
+ def shutdown(self):
+ self.stopped = True
+ self.worker.stopWaitingForJobs()
+
+ def run(self):
+ while not self.stopped:
+ try:
+ print "DEBUG: Waiting for cancel job"
+ job = self.worker.getJob()
+ self.cancel_job(job.arguments)
+ job.sendWorkComplete()
+ except gear.InterruptedError:
+ pass
+
+
+class GearWorkerManager(threading.Thread):
+
+ def __init__(self, cancelation_manager):
super(GearWorkerManager, self).__init__()
self.stopped = False
self.worker = gear.Worker('reverser')
@@ -92,10 +136,12 @@ class GearWorkerManager(threading.Thread):
self.worker.registerFunction("build-graph")
self.worker.registerFunction("build-artifact")
self.worker.registerFunction("cache-request")
+ self.cancelation_manager = cancelation_manager
def shutdown(self):
self.stopped = True
self.worker.stopWaitingForJobs()
+ self.cancelation_manager.shutdown()
def run(self):
while not self.stopped:
@@ -143,16 +189,28 @@ class GearWorkerManager(threading.Thread):
elif job.name == "build-artifact":
artifact = distbuild.decode_artifact_reference(job.arguments)
print "building %s" % artifact.name
+ self.finished = False
+
+
cmd = ['morph', 'worker-build', '--build-log-on-stdout', artifact.name]
p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
+
+ def set_finished():
+ print "DEBUG: Cancelling build"
+ self.finished = True
+ p.kill()
+
+ cancelation_manager.set_cancel_worker(artifact.cache_key, set_finished)
p.stdin.write(job.arguments)
p.stdin.close()
- while True:
+ while not self.finished:
line = p.stdout.readline()
if not line: break
job.sendWorkData(line)
p.wait()
+ cancelation_manager.remove_cancel_worker()
if p.returncode != 0:
+ print "DEBUG: FAILED with returncode %s" % p.returncode
raise BuildFailedError()
kind = artifact.kind
@@ -168,6 +226,7 @@ class GearWorkerManager(threading.Thread):
if kind == 'stratum':
suffixes.append(filename + '.meta')
+
upload_files(artifact.cache_key, suffixes)
job.sendWorkComplete(artifact.cache_key)
elif job.name == 'cache-request':
@@ -186,7 +245,9 @@ def term_handler(signum, frame):
worker_manager.shutdown()
signal.signal(signal.SIGTERM, term_handler)
-worker_manager = GearWorkerManager()
+cancelation_manager = CancelationManager()
+cancelation_manager.start()
+worker_manager = GearWorkerManager(cancelation_manager)
worker_manager.start()
while not worker_manager.stopped: