diff options
Diffstat (limited to 'third_party/waf/waflib/extras/prefork.py')
-rwxr-xr-x | third_party/waf/waflib/extras/prefork.py | 401 |
1 files changed, 401 insertions, 0 deletions
diff --git a/third_party/waf/waflib/extras/prefork.py b/third_party/waf/waflib/extras/prefork.py new file mode 100755 index 00000000000..b912c5b1b7c --- /dev/null +++ b/third_party/waf/waflib/extras/prefork.py @@ -0,0 +1,401 @@ +#! /usr/bin/env python +# encoding: utf-8 +# Thomas Nagy, 2015 (ita) + +""" +Execute commands through pre-forked servers. This tool creates as many servers as build threads. +On a benchmark executed on Linux Kubuntu 14, 8 virtual cores and SSD drive:: + + ./genbench.py /tmp/build 200 100 15 5 + waf clean build -j24 + # no prefork: 2m7.179s + # prefork: 0m55.400s + +To use:: + + def options(opt): + # optional, will spawn 40 servers early + opt.load('prefork') + + def build(bld): + bld.load('prefork') + ... + more code + +The servers and the build process are using a shared nonce to prevent undesirable external connections. +""" + +import os, re, socket, threading, sys, subprocess, time, atexit, traceback, random, signal +try: + import SocketServer +except ImportError: + import socketserver as SocketServer +try: + from queue import Queue +except ImportError: + from Queue import Queue +try: + import cPickle +except ImportError: + import pickle as cPickle + +SHARED_KEY = None +HEADER_SIZE = 64 + +REQ = 'REQ' +RES = 'RES' +BYE = 'BYE' + +def make_header(params, cookie=''): + header = ','.join(params) + header = header.ljust(HEADER_SIZE - len(cookie)) + assert(len(header) == HEADER_SIZE - len(cookie)) + header = header + cookie + if sys.hexversion > 0x3000000: + header = header.encode('iso8859-1') + return header + +def safe_compare(x, y): + sum = 0 + for (a, b) in zip(x, y): + sum |= ord(a) ^ ord(b) + return sum == 0 + +re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$') +class req(SocketServer.StreamRequestHandler): + def handle(self): + try: + while self.process_command(): + pass + except KeyboardInterrupt: + return + except Exception as e: + print(e) + + def send_response(self, ret, out, err, exc): + if out or err or exc: + data = (out, err, exc) + data = cPickle.dumps(data, -1) + else: + data = '' + + params = [RES, str(ret), str(len(data))] + + # no need for the cookie in the response + self.wfile.write(make_header(params)) + if data: + self.wfile.write(data) + self.wfile.flush() + + def process_command(self): + query = self.rfile.read(HEADER_SIZE) + if not query: + return None + #print(len(query)) + assert(len(query) == HEADER_SIZE) + if sys.hexversion > 0x3000000: + query = query.decode('iso8859-1') + + # magic cookie + key = query[-20:] + if not safe_compare(key, SHARED_KEY): + print('%r %r' % (key, SHARED_KEY)) + self.send_response(-1, '', '', 'Invalid key given!') + return 'meh' + + query = query[:-20] + #print "%r" % query + if not re_valid_query.match(query): + self.send_response(-1, '', '', 'Invalid query %r' % query) + raise ValueError('Invalid query %r' % query) + + query = query.strip().split(',') + + if query[0] == REQ: + self.run_command(query[1:]) + elif query[0] == BYE: + raise ValueError('Exit') + else: + raise ValueError('Invalid query %r' % query) + return 'ok' + + def run_command(self, query): + + size = int(query[0]) + data = self.rfile.read(size) + assert(len(data) == size) + kw = cPickle.loads(data) + + # run command + ret = out = err = exc = None + cmd = kw['cmd'] + del kw['cmd'] + #print(cmd) + + try: + if kw['stdout'] or kw['stderr']: + p = subprocess.Popen(cmd, **kw) + (out, err) = p.communicate() + ret = p.returncode + else: + ret = subprocess.Popen(cmd, **kw).wait() + except KeyboardInterrupt: + raise + except Exception as e: + ret = -1 + exc = str(e) + traceback.format_exc() + + self.send_response(ret, out, err, exc) + +def create_server(conn, cls): + # child processes do not need the key, so we remove it from the OS environment + global SHARED_KEY + SHARED_KEY = os.environ['SHARED_KEY'] + os.environ['SHARED_KEY'] = '' + + ppid = int(os.environ['PREFORKPID']) + def reap(): + if os.sep != '/': + os.waitpid(ppid, 0) + else: + while 1: + try: + os.kill(ppid, 0) + except OSError: + break + else: + time.sleep(1) + os.kill(os.getpid(), signal.SIGKILL) + t = threading.Thread(target=reap) + t.setDaemon(True) + t.start() + + server = SocketServer.TCPServer(conn, req) + print(server.server_address[1]) + sys.stdout.flush() + #server.timeout = 6000 # seconds + server.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + try: + server.serve_forever(poll_interval=0.001) + except KeyboardInterrupt: + pass + +if __name__ == '__main__': + conn = ("127.0.0.1", 0) + #print("listening - %r %r\n" % conn) + create_server(conn, req) +else: + + from waflib import Logs, Utils, Runner, Errors, Options + + def init_task_pool(self): + # lazy creation, and set a common pool for all task consumers + pool = self.pool = [] + for i in range(self.numjobs): + consumer = Runner.get_pool() + pool.append(consumer) + consumer.idx = i + self.ready = Queue(0) + def setq(consumer): + consumer.ready = self.ready + try: + threading.current_thread().idx = consumer.idx + except Exception as e: + print(e) + for x in pool: + x.ready.put(setq) + return pool + Runner.Parallel.init_task_pool = init_task_pool + + def make_server(bld, idx): + cmd = [sys.executable, os.path.abspath(__file__)] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) + return proc + + def make_conn(bld, srv): + port = srv.port + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + conn.connect(('127.0.0.1', port)) + return conn + + + SERVERS = [] + CONNS = [] + def close_all(): + global SERVERS, CONNS + while CONNS: + conn = CONNS.pop() + try: + conn.close() + except: + pass + while SERVERS: + srv = SERVERS.pop() + try: + srv.kill() + except: + pass + atexit.register(close_all) + + def put_data(conn, data): + cnt = 0 + while cnt < len(data): + sent = conn.send(data[cnt:]) + if sent == 0: + raise RuntimeError('connection ended') + cnt += sent + + def read_data(conn, siz): + cnt = 0 + buf = [] + while cnt < siz: + data = conn.recv(min(siz - cnt, 1024)) + if not data: + raise RuntimeError('connection ended %r %r' % (cnt, siz)) + buf.append(data) + cnt += len(data) + if sys.hexversion > 0x3000000: + ret = ''.encode('iso8859-1').join(buf) + else: + ret = ''.join(buf) + return ret + + def exec_command(self, cmd, **kw): + if 'stdout' in kw: + if kw['stdout'] not in (None, subprocess.PIPE): + return self.exec_command_old(cmd, **kw) + elif 'stderr' in kw: + if kw['stderr'] not in (None, subprocess.PIPE): + return self.exec_command_old(cmd, **kw) + + kw['shell'] = isinstance(cmd, str) + Logs.debug('runner: %r' % cmd) + Logs.debug('runner_env: kw=%s' % kw) + + if self.logger: + self.logger.info(cmd) + + if 'stdout' not in kw: + kw['stdout'] = subprocess.PIPE + if 'stderr' not in kw: + kw['stderr'] = subprocess.PIPE + + if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]): + raise Errors.WafError("Program %s not found!" % cmd[0]) + + idx = threading.current_thread().idx + kw['cmd'] = cmd + + # serialization.. + #print("sub %r %r" % (idx, cmd)) + #print("write to %r %r" % (idx, cmd)) + + data = cPickle.dumps(kw, -1) + params = [REQ, str(len(data))] + header = make_header(params, self.SHARED_KEY) + + conn = CONNS[idx] + + put_data(conn, header + data) + #put_data(conn, data) + + #print("running %r %r" % (idx, cmd)) + #print("read from %r %r" % (idx, cmd)) + + data = read_data(conn, HEADER_SIZE) + if sys.hexversion > 0x3000000: + data = data.decode('iso8859-1') + + #print("received %r" % data) + lst = data.split(',') + ret = int(lst[1]) + dlen = int(lst[2]) + + out = err = None + if dlen: + data = read_data(conn, dlen) + (out, err, exc) = cPickle.loads(data) + if exc: + raise Errors.WafError('Execution failure: %s' % exc) + + if out: + if not isinstance(out, str): + out = out.decode(sys.stdout.encoding or 'iso8859-1') + if self.logger: + self.logger.debug('out: %s' % out) + else: + Logs.info(out, extra={'stream':sys.stdout, 'c1': ''}) + if err: + if not isinstance(err, str): + err = err.decode(sys.stdout.encoding or 'iso8859-1') + if self.logger: + self.logger.error('err: %s' % err) + else: + Logs.info(err, extra={'stream':sys.stderr, 'c1': ''}) + + return ret + + def init_key(ctx): + try: + key = ctx.SHARED_KEY = os.environ['SHARED_KEY'] + except KeyError: + key = "".join([chr(random.SystemRandom().randint(40, 126)) for x in range(20)]) + os.environ['SHARED_KEY'] = ctx.SHARED_KEY = key + + os.environ['PREFORKPID'] = str(os.getpid()) + return key + + def init_servers(ctx, maxval): + while len(SERVERS) < maxval: + i = len(SERVERS) + srv = make_server(ctx, i) + SERVERS.append(srv) + while len(CONNS) < maxval: + i = len(CONNS) + srv = SERVERS[i] + + # postpone the connection + srv.port = int(srv.stdout.readline()) + + conn = None + for x in range(30): + try: + conn = make_conn(ctx, srv) + break + except socket.error: + time.sleep(0.01) + if not conn: + raise ValueError('Could not start the server!') + if srv.poll() is not None: + Logs.warn('Looks like it it not our server process - concurrent builds are unsupported at this stage') + raise ValueError('Could not start the server') + CONNS.append(conn) + + def init_smp(self): + if not getattr(Options.options, 'smp', getattr(self, 'smp', None)): + return + if Utils.unversioned_sys_platform() in ('freebsd',): + pid = os.getpid() + cmd = ['cpuset', '-l', '0', '-p', str(pid)] + elif Utils.unversioned_sys_platform() in ('linux',): + pid = os.getpid() + cmd = ['taskset', '-pc', '0', str(pid)] + if cmd: + self.cmd_and_log(cmd, quiet=0) + + def options(opt): + init_key(opt) + init_servers(opt, 40) + opt.add_option('--pin-process', action='store_true', dest='smp', default=False) + + def build(bld): + if bld.cmd == 'clean': + return + + init_key(bld) + init_servers(bld, bld.jobs) + init_smp(bld) + + bld.__class__.exec_command_old = bld.__class__.exec_command + bld.__class__.exec_command = exec_command |