import json
import logging
import os
import urlparse
from contextlib import contextmanager
from subprocess import Popen, PIPE, STDOUT

import distbuild
import gear
import paramiko
import requests

logging.basicConfig()

worker = gear.Worker('reverser')
worker.addServer('localhost')
worker.registerFunction("reverse")
worker.registerFunction("build-graph")
worker.registerFunction("build-artifact")
worker.registerFunction("cache-request")

# TODO: read these values from settings.
cache_server = 'http://cache.baserock.org:8080'


@contextmanager
def ssh_manager(host, port, username, key):
    '''Yield a connected paramiko SSHClient; close it when done.'''
    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, port=port, username=username, key_filename=key)
    try:
        yield client
    finally:
        client.close()


def upload_files(cache_key, suffixes):
    '''Upload the artifact files for cache_key to the cache server via SFTP.

    Each file is uploaded under a temporary name and then renamed, so the
    cache server never sees a partially-written artifact.
    '''
    print "DEBUG: start upload_files"
    cache_dir = '/src/cache/artifacts'
    remote_cache_dir = '/src/cache_server/'
    files_to_upload = [os.path.join(cache_dir, cache_key + '.' + suffix)
                       for suffix in suffixes]
    with ssh_manager('127.0.0.1', 22, 'root',
                     '/root/gerritbot/gerritbot_rsa') as client:
        sftp = client.open_sftp()
        for single_file in files_to_upload:
            basename = os.path.basename(single_file)
            remote_dest = os.path.join(remote_cache_dir, basename)
            remote_dest_tmp = os.path.join(remote_cache_dir,
                                           basename + '.tmp')
            print "DEBUG: going to upload %s" % single_file
            print "DEBUG: upload destination %s" % remote_dest
            try:
                sftp.stat(remote_dest)
                print "DEBUG: file already exists"
                continue
            except IOError:
                print "DEBUG: file not found in cache, uploading"
                sftp.put(single_file, remote_dest_tmp)
                sftp.rename(remote_dest_tmp, remote_dest)
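
# Job protocol, as implemented by the dispatch loop below:
#   reverse        -- test job; streams numbered WORK_DATA packets, then
#                     completes with a fixed payload.
#   build-graph    -- arguments: JSON {repo, ref, system}; returns the output
#                     of `morph calculate-build-graph`.
#   build-artifact -- arguments: an encoded artifact reference (see
#                     distbuild.decode_artifact_reference); streams the build
#                     log as WORK_DATA and completes with the cache key.
#   cache-request  -- arguments: JSON list of artifact names; completes with
#                     the cache server's JSON response.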

# Main loop: take jobs from gearman and dispatch on job name.
while True:
    print "DEBUG: Waiting for job"
    job = worker.getJob()
    print "DEBUG: Received job '%s'" % job.name

    if job.name == "reverse":
        # Test job: stream some WORK_DATA packets, then complete.
        print "DEBUG: Starting job reverse with '%s'" % job.arguments
        for x in range(0, 100):
            job.sendWorkData("This is: %s" % x)
        job.sendWorkComplete("answer")

    elif job.name == "build-graph":
        bg_request = json.loads(job.arguments)
        print ("DEBUG: Starting build-graph calculation for Repo: '%s' "
               "Ref: '%s' System: '%s'") % (bg_request['repo'],
                                            bg_request['ref'],
                                            bg_request['system'])
        # TODO: There should be another way of doing this.
        cmd = ['morph', 'calculate-build-graph', '--quiet',
               bg_request['repo'], bg_request['ref'], bg_request['system']]
        p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                  close_fds=True)
        output = p.stdout.read()
        p.wait()
        print "DEBUG: finished computing build graph"
        # Fail the job on a non-zero exit instead of sending the error
        # output as if it were a build graph.
        if p.returncode != 0:
            job.sendWorkFail()
        else:
            job.sendWorkComplete(output)

    elif job.name == "build-artifact":
        artifact = distbuild.decode_artifact_reference(job.arguments)
        print "DEBUG: building %s" % artifact.name
        cmd = ['morph', 'worker-build', '--build-log-on-stdout',
               artifact.name]
        p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                  close_fds=True)
        # morph reads the artifact reference from stdin.
        p.stdin.write(job.arguments)
        p.stdin.close()
        # Stream the build log back to the requester line by line.
        while True:
            line = p.stdout.readline()
            if not line:
                break
            job.sendWorkData(line)
        # Work out which files the build produced, then upload them.
        kind = artifact.kind
        if kind == 'chunk':
            artifact_names = artifact.source_artifact_names
            suffixes = ['%s.%s' % (kind, name) for name in artifact_names]
            suffixes.append('build-log')
        else:
            filename = '%s.%s' % (kind, artifact.name)
            suffixes = [filename]
            if kind == 'stratum':
                suffixes.append(filename + '.meta')
        upload_files(artifact.cache_key, suffixes)
        job.sendWorkComplete(artifact.cache_key)

    elif job.name == 'cache-request':
        artifact_names = json.loads(job.arguments)
        url = urlparse.urljoin(cache_server, '/1.0/artifacts')
        r = requests.post(url, json=artifact_names)
        job.sendWorkComplete(json.dumps(r.json()))
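
# Example client (a sketch, kept as a comment since the loop above never
# returns): submitting a 'cache-request' job with the same gear library.
# Assumes a gearman server on localhost on the default port; the polling
# loop and the use of job.data to read the completion payload are
# illustrative only.
#
#   import gear, json, time
#   client = gear.Client()
#   client.addServer('localhost')
#   client.waitForServer()
#   job = gear.Job('cache-request', json.dumps(['some-artifact-name']))
#   client.submitJob(job)
#   while not job.complete:
#       time.sleep(0.1)
#   print json.loads(job.data[-1])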