summaryrefslogtreecommitdiff
path: root/third_party/waf/waflib/extras/preforkunix.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/waf/waflib/extras/preforkunix.py')
-rw-r--r--third_party/waf/waflib/extras/preforkunix.py317
1 files changed, 317 insertions, 0 deletions
diff --git a/third_party/waf/waflib/extras/preforkunix.py b/third_party/waf/waflib/extras/preforkunix.py
new file mode 100644
index 00000000000..ec9aeeb10e7
--- /dev/null
+++ b/third_party/waf/waflib/extras/preforkunix.py
@@ -0,0 +1,317 @@
+#! /usr/bin/env python
+# encoding: utf-8
+# Thomas Nagy, 2015 (ita)
+
+"""
+A version of prefork.py that uses unix sockets. The advantage is that it does not expose
+connections to the outside. Yet performance it only works on unix-like systems
+and performance can be slightly worse.
+
+To use::
+
+ def options(opt):
+ # recommended, fork new processes before using more memory
+ opt.load('preforkunix')
+
+ def build(bld):
+ bld.load('preforkunix')
+ ...
+ more code
+"""
+
+import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+try:
+ import cPickle
+except ImportError:
+ import pickle as cPickle
+
+HEADER_SIZE = 20
+
+REQ = 'REQ'
+RES = 'RES'
+BYE = 'BYE'
+
+def make_header(params, cookie=''):
+ header = ','.join(params)
+ header = header.ljust(HEADER_SIZE - len(cookie))
+ assert(len(header) == HEADER_SIZE - len(cookie))
+ header = header + cookie
+ if sys.hexversion > 0x3000000:
+ header = header.encode('iso8859-1')
+ return header
+
+re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
+if 1:
+ def send_response(conn, ret, out, err, exc):
+ if out or err or exc:
+ data = (out, err, exc)
+ data = cPickle.dumps(data, -1)
+ else:
+ data = ''
+
+ params = [RES, str(ret), str(len(data))]
+
+ # no need for the cookie in the response
+ conn.send(make_header(params))
+ if data:
+ conn.send(data)
+
+ def process_command(conn):
+ query = conn.recv(HEADER_SIZE)
+ if not query:
+ return None
+ #print(len(query))
+ assert(len(query) == HEADER_SIZE)
+ if sys.hexversion > 0x3000000:
+ query = query.decode('iso8859-1')
+
+ #print "%r" % query
+ if not re_valid_query.match(query):
+ send_response(conn, -1, '', '', 'Invalid query %r' % query)
+ raise ValueError('Invalid query %r' % query)
+
+ query = query.strip().split(',')
+
+ if query[0] == REQ:
+ run_command(conn, query[1:])
+ elif query[0] == BYE:
+ raise ValueError('Exit')
+ else:
+ raise ValueError('Invalid query %r' % query)
+ return 'ok'
+
+ def run_command(conn, query):
+
+ size = int(query[0])
+ data = conn.recv(size)
+ assert(len(data) == size)
+ kw = cPickle.loads(data)
+
+ # run command
+ ret = out = err = exc = None
+ cmd = kw['cmd']
+ del kw['cmd']
+ #print(cmd)
+
+ try:
+ if kw['stdout'] or kw['stderr']:
+ p = subprocess.Popen(cmd, **kw)
+ (out, err) = p.communicate()
+ ret = p.returncode
+ else:
+ ret = subprocess.Popen(cmd, **kw).wait()
+ except KeyboardInterrupt:
+ raise
+ except Exception as e:
+ ret = -1
+ exc = str(e) + traceback.format_exc()
+
+ send_response(conn, ret, out, err, exc)
+
+if 1:
+
+ from waflib import Logs, Utils, Runner, Errors, Options
+
+ def init_task_pool(self):
+ # lazy creation, and set a common pool for all task consumers
+ pool = self.pool = []
+ for i in range(self.numjobs):
+ consumer = Runner.get_pool()
+ pool.append(consumer)
+ consumer.idx = i
+ self.ready = Queue(0)
+ def setq(consumer):
+ consumer.ready = self.ready
+ try:
+ threading.current_thread().idx = consumer.idx
+ except Exception as e:
+ print(e)
+ for x in pool:
+ x.ready.put(setq)
+ return pool
+ Runner.Parallel.init_task_pool = init_task_pool
+
+ def make_conn(bld):
+ child_socket, parent_socket = socket.socketpair(socket.AF_UNIX)
+ ppid = os.getpid()
+ pid = os.fork()
+ if pid == 0:
+ parent_socket.close()
+
+ # if the parent crashes, try to exit cleanly
+ def reap():
+ while 1:
+ try:
+ os.kill(ppid, 0)
+ except OSError:
+ break
+ else:
+ time.sleep(1)
+ os.kill(os.getpid(), signal.SIGKILL)
+ t = threading.Thread(target=reap)
+ t.setDaemon(True)
+ t.start()
+
+ # write to child_socket only
+ try:
+ while process_command(child_socket):
+ pass
+ except KeyboardInterrupt:
+ sys.exit(2)
+ else:
+ child_socket.close()
+ return (pid, parent_socket)
+
+ SERVERS = []
+ CONNS = []
+ def close_all():
+ global SERVERS, CONS
+ while CONNS:
+ conn = CONNS.pop()
+ try:
+ conn.close()
+ except:
+ pass
+ while SERVERS:
+ pid = SERVERS.pop()
+ try:
+ os.kill(pid, 9)
+ except:
+ pass
+ atexit.register(close_all)
+
+ def put_data(conn, data):
+ cnt = 0
+ while cnt < len(data):
+ sent = conn.send(data[cnt:])
+ if sent == 0:
+ raise RuntimeError('connection ended')
+ cnt += sent
+
+ def read_data(conn, siz):
+ cnt = 0
+ buf = []
+ while cnt < siz:
+ data = conn.recv(min(siz - cnt, 1024))
+ if not data:
+ raise RuntimeError('connection ended %r %r' % (cnt, siz))
+ buf.append(data)
+ cnt += len(data)
+ if sys.hexversion > 0x3000000:
+ ret = ''.encode('iso8859-1').join(buf)
+ else:
+ ret = ''.join(buf)
+ return ret
+
+ def exec_command(self, cmd, **kw):
+ if 'stdout' in kw:
+ if kw['stdout'] not in (None, subprocess.PIPE):
+ return self.exec_command_old(cmd, **kw)
+ elif 'stderr' in kw:
+ if kw['stderr'] not in (None, subprocess.PIPE):
+ return self.exec_command_old(cmd, **kw)
+
+ kw['shell'] = isinstance(cmd, str)
+ Logs.debug('runner: %r' % cmd)
+ Logs.debug('runner_env: kw=%s' % kw)
+
+ if self.logger:
+ self.logger.info(cmd)
+
+ if 'stdout' not in kw:
+ kw['stdout'] = subprocess.PIPE
+ if 'stderr' not in kw:
+ kw['stderr'] = subprocess.PIPE
+
+ if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
+ raise Errors.WafError("Program %s not found!" % cmd[0])
+
+ idx = threading.current_thread().idx
+ kw['cmd'] = cmd
+
+ # serialization..
+ #print("sub %r %r" % (idx, cmd))
+ #print("write to %r %r" % (idx, cmd))
+
+ data = cPickle.dumps(kw, -1)
+ params = [REQ, str(len(data))]
+ header = make_header(params)
+
+ conn = CONNS[idx]
+
+ put_data(conn, header + data)
+
+ #print("running %r %r" % (idx, cmd))
+ #print("read from %r %r" % (idx, cmd))
+
+ data = read_data(conn, HEADER_SIZE)
+ if sys.hexversion > 0x3000000:
+ data = data.decode('iso8859-1')
+
+ #print("received %r" % data)
+ lst = data.split(',')
+ ret = int(lst[1])
+ dlen = int(lst[2])
+
+ out = err = None
+ if dlen:
+ data = read_data(conn, dlen)
+ (out, err, exc) = cPickle.loads(data)
+ if exc:
+ raise Errors.WafError('Execution failure: %s' % exc)
+
+ if out:
+ if not isinstance(out, str):
+ out = out.decode(sys.stdout.encoding or 'iso8859-1')
+ if self.logger:
+ self.logger.debug('out: %s' % out)
+ else:
+ Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
+ if err:
+ if not isinstance(err, str):
+ err = err.decode(sys.stdout.encoding or 'iso8859-1')
+ if self.logger:
+ self.logger.error('err: %s' % err)
+ else:
+ Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})
+
+ return ret
+
+ def init_smp(self):
+ if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
+ return
+ if Utils.unversioned_sys_platform() in ('freebsd',):
+ pid = os.getpid()
+ cmd = ['cpuset', '-l', '0', '-p', str(pid)]
+ elif Utils.unversioned_sys_platform() in ('linux',):
+ pid = os.getpid()
+ cmd = ['taskset', '-pc', '0', str(pid)]
+ if cmd:
+ self.cmd_and_log(cmd, quiet=0)
+
+ def options(opt):
+ # memory consumption might be at the lowest point while processing options
+ opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
+ if Utils.is_win32 or os.sep != '/':
+ return
+ while len(CONNS) < 30:
+ (pid, conn) = make_conn(opt)
+ SERVERS.append(pid)
+ CONNS.append(conn)
+
+ def build(bld):
+ if Utils.is_win32 or os.sep != '/':
+ return
+ if bld.cmd == 'clean':
+ return
+ while len(CONNS) < bld.jobs:
+ (pid, conn) = make_conn(bld)
+ SERVERS.append(pid)
+ CONNS.append(conn)
+ init_smp(bld)
+ bld.__class__.exec_command_old = bld.__class__.exec_command
+ bld.__class__.exec_command = exec_command