'''Gearman worker for distributed Baserock builds.

Registers the gear functions "build-graph", "build-artifact" and
"cache-request" (plus a demo "reverse" function), and runs a companion
"canceller" worker so that in-progress builds can be aborted.
'''

import json
import logging
import os
import signal
import threading
import time
import urlparse
from contextlib import contextmanager
from subprocess import Popen, PIPE, STDOUT

import cliapp
import distbuild
import gear
import paramiko
import requests

logging.basicConfig()

# TODO: read these values from settings.
cache_server = 'http://git.baserock.org:8080'


class BuildFailedError(cliapp.AppException):

    def __init__(self):
        cliapp.AppException.__init__(self, 'Build Failed')


class GraphFailedError(cliapp.AppException):

    def __init__(self):
        cliapp.AppException.__init__(self, 'Graph Failed')


class CacheFailedError(cliapp.AppException):

    def __init__(self):
        cliapp.AppException.__init__(self, 'Cache Failed')


@contextmanager
def ssh_manager(host, port, username, key):
    '''Yield an SSH connection ready to be used, closing it afterwards.'''
    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # Authenticate with the given username and private key file.
    client.connect(host, port=port, username=username, key_filename=key)
    try:
        yield client
    finally:
        client.close()


def upload_files(cache_key, suffixes):
    '''Upload the artifact files for cache_key to the cache server.'''
    print "DEBUG: start upload_files"
    cache_dir = '/src/cache/artifacts'
    remote_cache_dir = '/src/cache_server/'
    files_to_upload = [os.path.join(cache_dir, cache_key + '.' + suffix)
                       for suffix in suffixes]
    with ssh_manager('127.0.0.1', 22, 'root',
                     '/root/gerritbot/gerritbot_rsa') as client:
        sftp = client.open_sftp()
        for single_file in files_to_upload:
            remote_dest = os.path.join(remote_cache_dir,
                                       os.path.basename(single_file))
            remote_dest_tmp = remote_dest + '.tmp'
            print "DEBUG: going to upload %s" % single_file
            print "DEBUG: upload destination %s" % remote_dest
            try:
                sftp.stat(remote_dest)
                # Skip this file but keep uploading the rest.
                print "DEBUG: file already exists, skipping"
                continue
            except IOError:
                print "DEBUG: file not found in cache, uploading"
                # Upload under a temporary name, then rename, so the
                # cache server never sees a partially written file.
                sftp.put(single_file, remote_dest_tmp)
                sftp.rename(remote_dest_tmp, remote_dest)
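

# Illustration only (not part of this worker): a controller that knows a
# build's cache key could abort that build by submitting a job to the
# per-build "cancel-<cache_key>" function that CancelationManager
# registers below. The client name here is hypothetical:
#
#     client = gear.Client('controller')
#     client.addServer('localhost')
#     client.waitForServer()
#     client.submitJob(gear.Job('cancel-%s' % cache_key, cache_key))
#
# The job arguments repeat the cache key so that cancel_job() can check
# it matches the build currently in progress.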
class CancelationManager(threading.Thread):
    '''Worker thread that listens for per-build cancellation requests.'''

    def __init__(self):
        super(CancelationManager, self).__init__()
        self.stopped = False
        self.worker = gear.Worker('canceller')
        self.worker.addServer('localhost')
        self.active = False
        self.cache_key = None
        self.callback = None

    def set_cancel_worker(self, cache_key, callback):
        self.cache_key = cache_key
        self.callback = callback
        print "DEBUG: Setting cancel-%s" % self.cache_key
        self.worker.registerFunction("cancel-%s" % self.cache_key)
        self.active = True

    def remove_cancel_worker(self):
        if self.active:
            print "DEBUG: Removing cancel-%s" % self.cache_key
            self.worker.unRegisterFunction("cancel-%s" % self.cache_key)
            self.active = False

    def cancel_job(self, cache_key):
        if cache_key != self.cache_key:
            print ("DEBUG: asked to cancel '%s', but the current build "
                   "is '%s'; ignoring" % (cache_key, self.cache_key))
            return
        self.callback()
        self.remove_cancel_worker()

    def shutdown(self):
        self.stopped = True
        self.worker.stopWaitingForJobs()

    def run(self):
        while not self.stopped:
            try:
                print "DEBUG: Waiting for cancel job"
                job = self.worker.getJob()
                self.cancel_job(job.arguments)
                job.sendWorkComplete()
            except gear.InterruptedError:
                pass


class GearWorkerManager(threading.Thread):
    '''Worker thread that handles build, graph and cache jobs.'''

    def __init__(self, cancelation_manager):
        super(GearWorkerManager, self).__init__()
        self.stopped = False
        self.worker = gear.Worker('reverser')
        self.worker.addServer('localhost')
        self.worker.registerFunction("reverse")
        self.worker.registerFunction("build-graph")
        self.worker.registerFunction("build-artifact")
        self.worker.registerFunction("cache-request")
        self.cancelation_manager = cancelation_manager

    def shutdown(self):
        self.stopped = True
        self.worker.stopWaitingForJobs()
        self.cancelation_manager.shutdown()

    def run(self):
        while not self.stopped:
            try:
                print "DEBUG: Waiting for job"
                job = self.worker.getJob()
                self._handle_job(job)
            except gear.InterruptedError:
                print 'We were asked to stop waiting for jobs'
            except BuildFailedError:
                print "DEBUG: Build failed"
            except GraphFailedError:
                print "DEBUG: Graph failed"
            except CacheFailedError:
                print "DEBUG: Cache failed"

    def _handle_job(self, job):
        print "DEBUG: Received job '%s'" % job.name
        if job.name == "reverse":
            print "DEBUG: Starting job reverse with '%s'" % job.arguments
            for x in range(0, 100):
                job.sendWorkData("This is: %s" % x)
            job.sendWorkComplete("answer")
        elif job.name == "build-graph":
            bg_request = json.loads(job.arguments)
            print ("DEBUG: Starting build-graph calculation for Repo: '%s' "
                   "Ref: '%s' System: '%s'" % (bg_request['repo'],
                                               bg_request['ref'],
                                               bg_request['system']))
            # TODO: There should be another way of doing this.
            cmd = ['morph', 'calculate-build-graph', '--quiet',
                   bg_request['repo'], bg_request['ref'],
                   bg_request['system']]
            p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                      close_fds=True)
            output = p.stdout.read()
            p.wait()
            if p.returncode != 0:
                raise GraphFailedError()
            # TODO: catch errors calculating build-graph here instead of
            # sending the error as build-graph :)
            print "DEBUG: finished computing build graph"
            job.sendWorkComplete(output)
        elif job.name == "build-artifact":
            artifact = distbuild.decode_artifact_reference(job.arguments)
            print "building %s" % artifact.name
            self.finished = False
            cmd = ['morph', 'worker-build', '--build-log-on-stdout',
                   artifact.name]
            p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                      close_fds=True)

            def set_finished():
                print "DEBUG: Cancelling build"
                self.finished = True
                p.kill()

            self.cancelation_manager.set_cancel_worker(
                artifact.cache_key, set_finished)
            # morph worker-build reads the artifact reference on stdin.
            p.stdin.write(job.arguments)
            p.stdin.close()
            # Stream the build log back to the client line by line.
            while not self.finished:
                line = p.stdout.readline()
                if not line:
                    break
                job.sendWorkData(line)
            p.wait()
            self.cancelation_manager.remove_cancel_worker()
            if p.returncode != 0:
                print "DEBUG: FAILED with returncode %s" % p.returncode
                raise BuildFailedError()
            kind = artifact.kind
            if kind == 'chunk':
                artifact_names = artifact.source_artifact_names
                suffixes = ['%s.%s' % (kind, name)
                            for name in artifact_names]
                suffixes.append('build-log')
            else:
                filename = '%s.%s' % (kind, artifact.name)
                suffixes = [filename]
                if kind == 'stratum':
                    suffixes.append(filename + '.meta')
            upload_files(artifact.cache_key, suffixes)
            job.sendWorkComplete(artifact.cache_key)
        elif job.name == 'cache-request':
            artifact_names = json.loads(job.arguments)
            url = urlparse.urljoin(cache_server, '/1.0/artifacts')
            try:
                r = requests.post(url, json=artifact_names)
                # Treat HTTP error responses as cache failures too.
                r.raise_for_status()
            except requests.exceptions.RequestException:
                raise CacheFailedError()
            job.sendWorkComplete(json.dumps(r.json()))
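

# Illustration only: request shapes for the functions handled above.
# Field names match what _handle_job() expects; the values and the
# `client` object are made up for the example:
#
#     build_graph_args = json.dumps({
#         'repo': 'git://git.baserock.org/baserock/baserock/definitions',
#         'ref': 'master',
#         'system': 'systems/devel-system-x86_64.morph',
#     })
#     client.submitJob(gear.Job('build-graph', build_graph_args))
#
#     client.submitJob(gear.Job('cache-request',
#                               json.dumps(['some-artifact-basename'])))
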
## From command line gear-worker

def term_handler(signum, frame):
    worker_manager.shutdown()

cancelation_manager = CancelationManager()
cancelation_manager.start()

worker_manager = GearWorkerManager(cancelation_manager)
worker_manager.start()

# Register the handler only once worker_manager exists, so an early
# SIGTERM cannot hit a name that is not yet defined.
signal.signal(signal.SIGTERM, term_handler)

while not worker_manager.stopped:
    try:
        time.sleep(3)
    except KeyboardInterrupt:
        print "Ctrl + C: asking tasks to exit nicely...\n"
        worker_manager.shutdown()
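
# How to run this worker (a sketch; the filename and daemon invocation
# are assumptions): it needs a Gearman server on localhost:4730, for
# example the geard daemon that ships with the gear library:
#
#     $ geard &
#     $ python gear-worker.py
#
# SIGTERM or Ctrl+C asks both threads to finish their current job and
# exit cleanly.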