diff options
Diffstat (limited to 'third_party/waf/waflib/extras/preforkunix.py')
-rw-r--r-- | third_party/waf/waflib/extras/preforkunix.py | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/third_party/waf/waflib/extras/preforkunix.py b/third_party/waf/waflib/extras/preforkunix.py new file mode 100644 index 00000000000..ec9aeeb10e7 --- /dev/null +++ b/third_party/waf/waflib/extras/preforkunix.py @@ -0,0 +1,317 @@ +#! /usr/bin/env python +# encoding: utf-8 +# Thomas Nagy, 2015 (ita) + +""" +A version of prefork.py that uses unix sockets. The advantage is that it does not expose +connections to the outside. Yet performance it only works on unix-like systems +and performance can be slightly worse. + +To use:: + + def options(opt): + # recommended, fork new processes before using more memory + opt.load('preforkunix') + + def build(bld): + bld.load('preforkunix') + ... + more code +""" + +import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time +try: + from queue import Queue +except ImportError: + from Queue import Queue +try: + import cPickle +except ImportError: + import pickle as cPickle + +HEADER_SIZE = 20 + +REQ = 'REQ' +RES = 'RES' +BYE = 'BYE' + +def make_header(params, cookie=''): + header = ','.join(params) + header = header.ljust(HEADER_SIZE - len(cookie)) + assert(len(header) == HEADER_SIZE - len(cookie)) + header = header + cookie + if sys.hexversion > 0x3000000: + header = header.encode('iso8859-1') + return header + +re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$') +if 1: + def send_response(conn, ret, out, err, exc): + if out or err or exc: + data = (out, err, exc) + data = cPickle.dumps(data, -1) + else: + data = '' + + params = [RES, str(ret), str(len(data))] + + # no need for the cookie in the response + conn.send(make_header(params)) + if data: + conn.send(data) + + def process_command(conn): + query = conn.recv(HEADER_SIZE) + if not query: + return None + #print(len(query)) + assert(len(query) == HEADER_SIZE) + if sys.hexversion > 0x3000000: + query = query.decode('iso8859-1') + + #print "%r" % query + if not re_valid_query.match(query): + send_response(conn, -1, '', '', 'Invalid query %r' % query) + raise ValueError('Invalid query %r' % query) + + query = query.strip().split(',') + + if query[0] == REQ: + run_command(conn, query[1:]) + elif query[0] == BYE: + raise ValueError('Exit') + else: + raise ValueError('Invalid query %r' % query) + return 'ok' + + def run_command(conn, query): + + size = int(query[0]) + data = conn.recv(size) + assert(len(data) == size) + kw = cPickle.loads(data) + + # run command + ret = out = err = exc = None + cmd = kw['cmd'] + del kw['cmd'] + #print(cmd) + + try: + if kw['stdout'] or kw['stderr']: + p = subprocess.Popen(cmd, **kw) + (out, err) = p.communicate() + ret = p.returncode + else: + ret = subprocess.Popen(cmd, **kw).wait() + except KeyboardInterrupt: + raise + except Exception as e: + ret = -1 + exc = str(e) + traceback.format_exc() + + send_response(conn, ret, out, err, exc) + +if 1: + + from waflib import Logs, Utils, Runner, Errors, Options + + def init_task_pool(self): + # lazy creation, and set a common pool for all task consumers + pool = self.pool = [] + for i in range(self.numjobs): + consumer = Runner.get_pool() + pool.append(consumer) + consumer.idx = i + self.ready = Queue(0) + def setq(consumer): + consumer.ready = self.ready + try: + threading.current_thread().idx = consumer.idx + except Exception as e: + print(e) + for x in pool: + x.ready.put(setq) + return pool + Runner.Parallel.init_task_pool = init_task_pool + + def make_conn(bld): + child_socket, parent_socket = socket.socketpair(socket.AF_UNIX) + ppid = os.getpid() + pid = os.fork() + if pid == 0: + parent_socket.close() + + # if the parent crashes, try to exit cleanly + def reap(): + while 1: + try: + os.kill(ppid, 0) + except OSError: + break + else: + time.sleep(1) + os.kill(os.getpid(), signal.SIGKILL) + t = threading.Thread(target=reap) + t.setDaemon(True) + t.start() + + # write to child_socket only + try: + while process_command(child_socket): + pass + except KeyboardInterrupt: + sys.exit(2) + else: + child_socket.close() + return (pid, parent_socket) + + SERVERS = [] + CONNS = [] + def close_all(): + global SERVERS, CONS + while CONNS: + conn = CONNS.pop() + try: + conn.close() + except: + pass + while SERVERS: + pid = SERVERS.pop() + try: + os.kill(pid, 9) + except: + pass + atexit.register(close_all) + + def put_data(conn, data): + cnt = 0 + while cnt < len(data): + sent = conn.send(data[cnt:]) + if sent == 0: + raise RuntimeError('connection ended') + cnt += sent + + def read_data(conn, siz): + cnt = 0 + buf = [] + while cnt < siz: + data = conn.recv(min(siz - cnt, 1024)) + if not data: + raise RuntimeError('connection ended %r %r' % (cnt, siz)) + buf.append(data) + cnt += len(data) + if sys.hexversion > 0x3000000: + ret = ''.encode('iso8859-1').join(buf) + else: + ret = ''.join(buf) + return ret + + def exec_command(self, cmd, **kw): + if 'stdout' in kw: + if kw['stdout'] not in (None, subprocess.PIPE): + return self.exec_command_old(cmd, **kw) + elif 'stderr' in kw: + if kw['stderr'] not in (None, subprocess.PIPE): + return self.exec_command_old(cmd, **kw) + + kw['shell'] = isinstance(cmd, str) + Logs.debug('runner: %r' % cmd) + Logs.debug('runner_env: kw=%s' % kw) + + if self.logger: + self.logger.info(cmd) + + if 'stdout' not in kw: + kw['stdout'] = subprocess.PIPE + if 'stderr' not in kw: + kw['stderr'] = subprocess.PIPE + + if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]): + raise Errors.WafError("Program %s not found!" % cmd[0]) + + idx = threading.current_thread().idx + kw['cmd'] = cmd + + # serialization.. + #print("sub %r %r" % (idx, cmd)) + #print("write to %r %r" % (idx, cmd)) + + data = cPickle.dumps(kw, -1) + params = [REQ, str(len(data))] + header = make_header(params) + + conn = CONNS[idx] + + put_data(conn, header + data) + + #print("running %r %r" % (idx, cmd)) + #print("read from %r %r" % (idx, cmd)) + + data = read_data(conn, HEADER_SIZE) + if sys.hexversion > 0x3000000: + data = data.decode('iso8859-1') + + #print("received %r" % data) + lst = data.split(',') + ret = int(lst[1]) + dlen = int(lst[2]) + + out = err = None + if dlen: + data = read_data(conn, dlen) + (out, err, exc) = cPickle.loads(data) + if exc: + raise Errors.WafError('Execution failure: %s' % exc) + + if out: + if not isinstance(out, str): + out = out.decode(sys.stdout.encoding or 'iso8859-1') + if self.logger: + self.logger.debug('out: %s' % out) + else: + Logs.info(out, extra={'stream':sys.stdout, 'c1': ''}) + if err: + if not isinstance(err, str): + err = err.decode(sys.stdout.encoding or 'iso8859-1') + if self.logger: + self.logger.error('err: %s' % err) + else: + Logs.info(err, extra={'stream':sys.stderr, 'c1': ''}) + + return ret + + def init_smp(self): + if not getattr(Options.options, 'smp', getattr(self, 'smp', None)): + return + if Utils.unversioned_sys_platform() in ('freebsd',): + pid = os.getpid() + cmd = ['cpuset', '-l', '0', '-p', str(pid)] + elif Utils.unversioned_sys_platform() in ('linux',): + pid = os.getpid() + cmd = ['taskset', '-pc', '0', str(pid)] + if cmd: + self.cmd_and_log(cmd, quiet=0) + + def options(opt): + # memory consumption might be at the lowest point while processing options + opt.add_option('--pin-process', action='store_true', dest='smp', default=False) + if Utils.is_win32 or os.sep != '/': + return + while len(CONNS) < 30: + (pid, conn) = make_conn(opt) + SERVERS.append(pid) + CONNS.append(conn) + + def build(bld): + if Utils.is_win32 or os.sep != '/': + return + if bld.cmd == 'clean': + return + while len(CONNS) < bld.jobs: + (pid, conn) = make_conn(bld) + SERVERS.append(pid) + CONNS.append(conn) + init_smp(bld) + bld.__class__.exec_command_old = bld.__class__.exec_command + bld.__class__.exec_command = exec_command |