diff options
Diffstat (limited to 'third_party/waf/waflib/extras/preforkunix.py')
-rw-r--r-- | third_party/waf/waflib/extras/preforkunix.py | 317 |
1 files changed, 0 insertions, 317 deletions
diff --git a/third_party/waf/waflib/extras/preforkunix.py b/third_party/waf/waflib/extras/preforkunix.py deleted file mode 100644 index ec9aeeb10e7..00000000000 --- a/third_party/waf/waflib/extras/preforkunix.py +++ /dev/null @@ -1,317 +0,0 @@ -#! /usr/bin/env python -# encoding: utf-8 -# Thomas Nagy, 2015 (ita) - -""" -A version of prefork.py that uses unix sockets. The advantage is that it does not expose -connections to the outside. Yet performance it only works on unix-like systems -and performance can be slightly worse. - -To use:: - - def options(opt): - # recommended, fork new processes before using more memory - opt.load('preforkunix') - - def build(bld): - bld.load('preforkunix') - ... - more code -""" - -import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time -try: - from queue import Queue -except ImportError: - from Queue import Queue -try: - import cPickle -except ImportError: - import pickle as cPickle - -HEADER_SIZE = 20 - -REQ = 'REQ' -RES = 'RES' -BYE = 'BYE' - -def make_header(params, cookie=''): - header = ','.join(params) - header = header.ljust(HEADER_SIZE - len(cookie)) - assert(len(header) == HEADER_SIZE - len(cookie)) - header = header + cookie - if sys.hexversion > 0x3000000: - header = header.encode('iso8859-1') - return header - -re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$') -if 1: - def send_response(conn, ret, out, err, exc): - if out or err or exc: - data = (out, err, exc) - data = cPickle.dumps(data, -1) - else: - data = '' - - params = [RES, str(ret), str(len(data))] - - # no need for the cookie in the response - conn.send(make_header(params)) - if data: - conn.send(data) - - def process_command(conn): - query = conn.recv(HEADER_SIZE) - if not query: - return None - #print(len(query)) - assert(len(query) == HEADER_SIZE) - if sys.hexversion > 0x3000000: - query = query.decode('iso8859-1') - - #print "%r" % query - if not re_valid_query.match(query): - send_response(conn, -1, '', '', 'Invalid query %r' % query) - raise ValueError('Invalid query %r' % query) - - query = query.strip().split(',') - - if query[0] == REQ: - run_command(conn, query[1:]) - elif query[0] == BYE: - raise ValueError('Exit') - else: - raise ValueError('Invalid query %r' % query) - return 'ok' - - def run_command(conn, query): - - size = int(query[0]) - data = conn.recv(size) - assert(len(data) == size) - kw = cPickle.loads(data) - - # run command - ret = out = err = exc = None - cmd = kw['cmd'] - del kw['cmd'] - #print(cmd) - - try: - if kw['stdout'] or kw['stderr']: - p = subprocess.Popen(cmd, **kw) - (out, err) = p.communicate() - ret = p.returncode - else: - ret = subprocess.Popen(cmd, **kw).wait() - except KeyboardInterrupt: - raise - except Exception as e: - ret = -1 - exc = str(e) + traceback.format_exc() - - send_response(conn, ret, out, err, exc) - -if 1: - - from waflib import Logs, Utils, Runner, Errors, Options - - def init_task_pool(self): - # lazy creation, and set a common pool for all task consumers - pool = self.pool = [] - for i in range(self.numjobs): - consumer = Runner.get_pool() - pool.append(consumer) - consumer.idx = i - self.ready = Queue(0) - def setq(consumer): - consumer.ready = self.ready - try: - threading.current_thread().idx = consumer.idx - except Exception as e: - print(e) - for x in pool: - x.ready.put(setq) - return pool - Runner.Parallel.init_task_pool = init_task_pool - - def make_conn(bld): - child_socket, parent_socket = socket.socketpair(socket.AF_UNIX) - ppid = os.getpid() - pid = os.fork() - if pid == 0: - parent_socket.close() - - # if the parent crashes, try to exit cleanly - def reap(): - while 1: - try: - os.kill(ppid, 0) - except OSError: - break - else: - time.sleep(1) - os.kill(os.getpid(), signal.SIGKILL) - t = threading.Thread(target=reap) - t.setDaemon(True) - t.start() - - # write to child_socket only - try: - while process_command(child_socket): - pass - except KeyboardInterrupt: - sys.exit(2) - else: - child_socket.close() - return (pid, parent_socket) - - SERVERS = [] - CONNS = [] - def close_all(): - global SERVERS, CONS - while CONNS: - conn = CONNS.pop() - try: - conn.close() - except: - pass - while SERVERS: - pid = SERVERS.pop() - try: - os.kill(pid, 9) - except: - pass - atexit.register(close_all) - - def put_data(conn, data): - cnt = 0 - while cnt < len(data): - sent = conn.send(data[cnt:]) - if sent == 0: - raise RuntimeError('connection ended') - cnt += sent - - def read_data(conn, siz): - cnt = 0 - buf = [] - while cnt < siz: - data = conn.recv(min(siz - cnt, 1024)) - if not data: - raise RuntimeError('connection ended %r %r' % (cnt, siz)) - buf.append(data) - cnt += len(data) - if sys.hexversion > 0x3000000: - ret = ''.encode('iso8859-1').join(buf) - else: - ret = ''.join(buf) - return ret - - def exec_command(self, cmd, **kw): - if 'stdout' in kw: - if kw['stdout'] not in (None, subprocess.PIPE): - return self.exec_command_old(cmd, **kw) - elif 'stderr' in kw: - if kw['stderr'] not in (None, subprocess.PIPE): - return self.exec_command_old(cmd, **kw) - - kw['shell'] = isinstance(cmd, str) - Logs.debug('runner: %r' % cmd) - Logs.debug('runner_env: kw=%s' % kw) - - if self.logger: - self.logger.info(cmd) - - if 'stdout' not in kw: - kw['stdout'] = subprocess.PIPE - if 'stderr' not in kw: - kw['stderr'] = subprocess.PIPE - - if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]): - raise Errors.WafError("Program %s not found!" % cmd[0]) - - idx = threading.current_thread().idx - kw['cmd'] = cmd - - # serialization.. - #print("sub %r %r" % (idx, cmd)) - #print("write to %r %r" % (idx, cmd)) - - data = cPickle.dumps(kw, -1) - params = [REQ, str(len(data))] - header = make_header(params) - - conn = CONNS[idx] - - put_data(conn, header + data) - - #print("running %r %r" % (idx, cmd)) - #print("read from %r %r" % (idx, cmd)) - - data = read_data(conn, HEADER_SIZE) - if sys.hexversion > 0x3000000: - data = data.decode('iso8859-1') - - #print("received %r" % data) - lst = data.split(',') - ret = int(lst[1]) - dlen = int(lst[2]) - - out = err = None - if dlen: - data = read_data(conn, dlen) - (out, err, exc) = cPickle.loads(data) - if exc: - raise Errors.WafError('Execution failure: %s' % exc) - - if out: - if not isinstance(out, str): - out = out.decode(sys.stdout.encoding or 'iso8859-1') - if self.logger: - self.logger.debug('out: %s' % out) - else: - Logs.info(out, extra={'stream':sys.stdout, 'c1': ''}) - if err: - if not isinstance(err, str): - err = err.decode(sys.stdout.encoding or 'iso8859-1') - if self.logger: - self.logger.error('err: %s' % err) - else: - Logs.info(err, extra={'stream':sys.stderr, 'c1': ''}) - - return ret - - def init_smp(self): - if not getattr(Options.options, 'smp', getattr(self, 'smp', None)): - return - if Utils.unversioned_sys_platform() in ('freebsd',): - pid = os.getpid() - cmd = ['cpuset', '-l', '0', '-p', str(pid)] - elif Utils.unversioned_sys_platform() in ('linux',): - pid = os.getpid() - cmd = ['taskset', '-pc', '0', str(pid)] - if cmd: - self.cmd_and_log(cmd, quiet=0) - - def options(opt): - # memory consumption might be at the lowest point while processing options - opt.add_option('--pin-process', action='store_true', dest='smp', default=False) - if Utils.is_win32 or os.sep != '/': - return - while len(CONNS) < 30: - (pid, conn) = make_conn(opt) - SERVERS.append(pid) - CONNS.append(conn) - - def build(bld): - if Utils.is_win32 or os.sep != '/': - return - if bld.cmd == 'clean': - return - while len(CONNS) < bld.jobs: - (pid, conn) = make_conn(bld) - SERVERS.append(pid) - CONNS.append(conn) - init_smp(bld) - bld.__class__.exec_command_old = bld.__class__.exec_command - bld.__class__.exec_command = exec_command |