From 8a8b0d7110841f602559d9511abf75da1bf74eed Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Mon, 28 Mar 2016 13:49:16 +0000 Subject: Worker, start handling errors, and ^C with threads Change-Id: I77271a4a480d3b41ec5148c8f5256a93c5cda363 --- gear/worker.py | 196 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 133 insertions(+), 63 deletions(-) diff --git a/gear/worker.py b/gear/worker.py index 824a7bba..3b0f4591 100644 --- a/gear/worker.py +++ b/gear/worker.py @@ -1,15 +1,13 @@ import gear -worker = gear.Worker('reverser') -worker.addServer('localhost') -worker.registerFunction("reverse") -worker.registerFunction("build-graph") -worker.registerFunction("build-artifact") -worker.registerFunction("cache-request") import time import json import os import requests import urlparse +import signal +import threading +import time +import cliapp from subprocess import Popen, PIPE, STDOUT @@ -23,6 +21,20 @@ logging.basicConfig() #TODO: values from settings cache_server = 'http://git.baserock.org:8080' +class BuildFailedError(cliapp.AppException): + def __init__(self): + cliapp.AppException.__init__( + self, 'Build Failed') + +class GraphFailedError(cliapp.AppException): + def __init__(self): + cliapp.AppException.__init__( + self, 'Graph Failed') + +class CacheFailedError(cliapp.AppException): + def __init__(self): + cliapp.AppException.__init__( + self, 'Cache Failed') @contextmanager def ssh_manager(host, port, username, key): @@ -41,6 +53,7 @@ def ssh_manager(host, port, username, key): finally: client.close() + def upload_files(cache_key, suffixes): print "DEBUG: start upload_files" cache_dir = '/src/cache/artifacts' @@ -68,60 +81,117 @@ def upload_files(cache_key, suffixes): sftp.put(single_file, remote_dest_tmp) sftp.rename(remote_dest_tmp, remote_dest) -while True: - print "DEBUG: Waiting for job" - job = worker.getJob() - print "DEBUG: Received job '%s'" % job.name - if job.name == "reverse": - print "DEBUG: Starting job reverse with '%s'" % job.arguments - for x in range(0, 100): - job.sendWorkData("This is: %s" % x) - job.sendWorkComplete("answer") - elif job.name == "build-graph": - bg_request=json.loads(job.arguments) - print ("DEBUG: Starting build-graph calculation for Repo: '%s' " - "Ref: '%s' System: '%s'") % (bg_request['repo'], - bg_request['ref'], - bg_request['system']) - # TODO: There should be another way of doing this. - cmd = ['morph', 'calculate-build-graph', '--quiet', - bg_request['repo'], bg_request['ref'], bg_request['system']] - p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) - output = p.stdout.read() - # TODO: catch errors calculating build-graph here instead of sending - # the error as build-graph :) - print "DEBUG: finished computing build graph" - job.sendWorkComplete(output) - elif job.name == "build-artifact": - artifact = distbuild.decode_artifact_reference(job.arguments) - print "building %s" % artifact.name - cmd = ['morph', 'worker-build', '--build-log-on-stdout', artifact.name] - p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) - p.stdin.write(job.arguments) - p.stdin.close() - while True: - line = p.stdout.readline() - if not line: break - job.sendWorkData(line) - - kind = artifact.kind - - if kind == 'chunk': - artifact_names = artifact.source_artifact_names - - suffixes = ['%s.%s' % (kind, name) for name in artifact_names] - suffixes.append('build-log') - else: - filename = '%s.%s' % (kind, artifact.name) - suffixes = [filename] - - if kind == 'stratum': - suffixes.append(filename + '.meta') - upload_files(artifact.cache_key, suffixes) - job.sendWorkComplete(artifact.cache_key) - elif job.name == 'cache-request': - artifact_names = json.loads(job.arguments) - - url = urlparse.urljoin(cache_server, '/1.0/artifacts') - r = requests.post(url, json=artifact_names) - job.sendWorkComplete(json.dumps(r.json())) +class GearWorkerManager(threading.Thread): + + def __init__(self): + super(GearWorkerManager, self).__init__() + self.stopped = False + self.worker = gear.Worker('reverser') + self.worker.addServer('localhost') + self.worker.registerFunction("reverse") + self.worker.registerFunction("build-graph") + self.worker.registerFunction("build-artifact") + self.worker.registerFunction("cache-request") + + def shutdown(self): + self.stopped = True + self.worker.stopWaitingForJobs() + + def run(self): + while not self.stopped: + try: + print "DEBUG: Waiting for job" + job = self.worker.getJob() + self._handle_job(job) + except gear.InterruptedError: + print 'We were asked to stop waiting for jobs' + except BuildFailedError: + print "DEBUG: Build failed" + pass + except GraphFailedError: + print "DEBUG: Graph failed" + pass + except CacheFailedError: + print "DEBUG: Cache failed" + pass + + def _handle_job(self, job): + print "DEBUG: Received job '%s'" % job.name + if job.name == "reverse": + print "DEBUG: Starting job reverse with '%s'" % job.arguments + for x in range(0, 100): + job.sendWorkData("This is: %s" % x) + job.sendWorkComplete("answer") + elif job.name == "build-graph": + bg_request=json.loads(job.arguments) + print ("DEBUG: Starting build-graph calculation for Repo: '%s' " + "Ref: '%s' System: '%s'") % (bg_request['repo'], + bg_request['ref'], + bg_request['system']) + # TODO: There should be another way of doing this. + cmd = ['morph', 'calculate-build-graph', '--quiet', + bg_request['repo'], bg_request['ref'], bg_request['system']] + p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) + output = p.stdout.read() + p.wait() + if p.returncode != 0: + raise GraphFailedError() + # TODO: catch errors calculating build-graph here instead of sending + # the error as build-graph :) + print "DEBUG: finished computing build graph" + job.sendWorkComplete(output) + elif job.name == "build-artifact": + artifact = distbuild.decode_artifact_reference(job.arguments) + print "building %s" % artifact.name + cmd = ['morph', 'worker-build', '--build-log-on-stdout', artifact.name] + p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) + p.stdin.write(job.arguments) + p.stdin.close() + while True: + line = p.stdout.readline() + if not line: break + job.sendWorkData(line) + p.wait() + if p.returncode != 0: + raise BuildFailedError() + + kind = artifact.kind + + if kind == 'chunk': + artifact_names = artifact.source_artifact_names + + suffixes = ['%s.%s' % (kind, name) for name in artifact_names] + suffixes.append('build-log') + else: + filename = '%s.%s' % (kind, artifact.name) + suffixes = [filename] + + if kind == 'stratum': + suffixes.append(filename + '.meta') + upload_files(artifact.cache_key, suffixes) + job.sendWorkComplete(artifact.cache_key) + elif job.name == 'cache-request': + artifact_names = json.loads(job.arguments) + + url = urlparse.urljoin(cache_server, '/1.0/artifacts') + try: + r = requests.post(url, json=artifact_names) + except requests.exceptions.RequestException: + raise CacheFailedError() + job.sendWorkComplete(json.dumps(r.json())) + +## From command line gear-worker + +def term_handler(signum, frame): + worker_manager.shutdown() +signal.signal(signal.SIGTERM, term_handler) + +worker_manager = GearWorkerManager() +worker_manager.start() + +while not worker_manager.stopped: + try: + time.sleep(3) + except KeyboardInterrupt: + print "Ctrl + C: asking tasks to exit nicely...\n" + worker_manager.shutdown() -- cgit v1.2.1