summaryrefslogtreecommitdiff
path: root/third_party/waf/waflib/extras/preforkunix.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/waf/waflib/extras/preforkunix.py')
-rw-r--r--third_party/waf/waflib/extras/preforkunix.py317
1 files changed, 0 insertions, 317 deletions
diff --git a/third_party/waf/waflib/extras/preforkunix.py b/third_party/waf/waflib/extras/preforkunix.py
deleted file mode 100644
index ec9aeeb10e7..00000000000
--- a/third_party/waf/waflib/extras/preforkunix.py
+++ /dev/null
@@ -1,317 +0,0 @@
-#! /usr/bin/env python
-# encoding: utf-8
-# Thomas Nagy, 2015 (ita)
-
-"""
-A version of prefork.py that uses unix sockets. The advantage is that it does not expose
-connections to the outside. Yet performance it only works on unix-like systems
-and performance can be slightly worse.
-
-To use::
-
- def options(opt):
- # recommended, fork new processes before using more memory
- opt.load('preforkunix')
-
- def build(bld):
- bld.load('preforkunix')
- ...
- more code
-"""
-
-import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time
-try:
- from queue import Queue
-except ImportError:
- from Queue import Queue
-try:
- import cPickle
-except ImportError:
- import pickle as cPickle
-
-HEADER_SIZE = 20
-
-REQ = 'REQ'
-RES = 'RES'
-BYE = 'BYE'
-
-def make_header(params, cookie=''):
- header = ','.join(params)
- header = header.ljust(HEADER_SIZE - len(cookie))
- assert(len(header) == HEADER_SIZE - len(cookie))
- header = header + cookie
- if sys.hexversion > 0x3000000:
- header = header.encode('iso8859-1')
- return header
-
-re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
-if 1:
- def send_response(conn, ret, out, err, exc):
- if out or err or exc:
- data = (out, err, exc)
- data = cPickle.dumps(data, -1)
- else:
- data = ''
-
- params = [RES, str(ret), str(len(data))]
-
- # no need for the cookie in the response
- conn.send(make_header(params))
- if data:
- conn.send(data)
-
- def process_command(conn):
- query = conn.recv(HEADER_SIZE)
- if not query:
- return None
- #print(len(query))
- assert(len(query) == HEADER_SIZE)
- if sys.hexversion > 0x3000000:
- query = query.decode('iso8859-1')
-
- #print "%r" % query
- if not re_valid_query.match(query):
- send_response(conn, -1, '', '', 'Invalid query %r' % query)
- raise ValueError('Invalid query %r' % query)
-
- query = query.strip().split(',')
-
- if query[0] == REQ:
- run_command(conn, query[1:])
- elif query[0] == BYE:
- raise ValueError('Exit')
- else:
- raise ValueError('Invalid query %r' % query)
- return 'ok'
-
- def run_command(conn, query):
-
- size = int(query[0])
- data = conn.recv(size)
- assert(len(data) == size)
- kw = cPickle.loads(data)
-
- # run command
- ret = out = err = exc = None
- cmd = kw['cmd']
- del kw['cmd']
- #print(cmd)
-
- try:
- if kw['stdout'] or kw['stderr']:
- p = subprocess.Popen(cmd, **kw)
- (out, err) = p.communicate()
- ret = p.returncode
- else:
- ret = subprocess.Popen(cmd, **kw).wait()
- except KeyboardInterrupt:
- raise
- except Exception as e:
- ret = -1
- exc = str(e) + traceback.format_exc()
-
- send_response(conn, ret, out, err, exc)
-
-if 1:
-
- from waflib import Logs, Utils, Runner, Errors, Options
-
- def init_task_pool(self):
- # lazy creation, and set a common pool for all task consumers
- pool = self.pool = []
- for i in range(self.numjobs):
- consumer = Runner.get_pool()
- pool.append(consumer)
- consumer.idx = i
- self.ready = Queue(0)
- def setq(consumer):
- consumer.ready = self.ready
- try:
- threading.current_thread().idx = consumer.idx
- except Exception as e:
- print(e)
- for x in pool:
- x.ready.put(setq)
- return pool
- Runner.Parallel.init_task_pool = init_task_pool
-
- def make_conn(bld):
- child_socket, parent_socket = socket.socketpair(socket.AF_UNIX)
- ppid = os.getpid()
- pid = os.fork()
- if pid == 0:
- parent_socket.close()
-
- # if the parent crashes, try to exit cleanly
- def reap():
- while 1:
- try:
- os.kill(ppid, 0)
- except OSError:
- break
- else:
- time.sleep(1)
- os.kill(os.getpid(), signal.SIGKILL)
- t = threading.Thread(target=reap)
- t.setDaemon(True)
- t.start()
-
- # write to child_socket only
- try:
- while process_command(child_socket):
- pass
- except KeyboardInterrupt:
- sys.exit(2)
- else:
- child_socket.close()
- return (pid, parent_socket)
-
- SERVERS = []
- CONNS = []
- def close_all():
- global SERVERS, CONS
- while CONNS:
- conn = CONNS.pop()
- try:
- conn.close()
- except:
- pass
- while SERVERS:
- pid = SERVERS.pop()
- try:
- os.kill(pid, 9)
- except:
- pass
- atexit.register(close_all)
-
- def put_data(conn, data):
- cnt = 0
- while cnt < len(data):
- sent = conn.send(data[cnt:])
- if sent == 0:
- raise RuntimeError('connection ended')
- cnt += sent
-
- def read_data(conn, siz):
- cnt = 0
- buf = []
- while cnt < siz:
- data = conn.recv(min(siz - cnt, 1024))
- if not data:
- raise RuntimeError('connection ended %r %r' % (cnt, siz))
- buf.append(data)
- cnt += len(data)
- if sys.hexversion > 0x3000000:
- ret = ''.encode('iso8859-1').join(buf)
- else:
- ret = ''.join(buf)
- return ret
-
- def exec_command(self, cmd, **kw):
- if 'stdout' in kw:
- if kw['stdout'] not in (None, subprocess.PIPE):
- return self.exec_command_old(cmd, **kw)
- elif 'stderr' in kw:
- if kw['stderr'] not in (None, subprocess.PIPE):
- return self.exec_command_old(cmd, **kw)
-
- kw['shell'] = isinstance(cmd, str)
- Logs.debug('runner: %r' % cmd)
- Logs.debug('runner_env: kw=%s' % kw)
-
- if self.logger:
- self.logger.info(cmd)
-
- if 'stdout' not in kw:
- kw['stdout'] = subprocess.PIPE
- if 'stderr' not in kw:
- kw['stderr'] = subprocess.PIPE
-
- if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
- raise Errors.WafError("Program %s not found!" % cmd[0])
-
- idx = threading.current_thread().idx
- kw['cmd'] = cmd
-
- # serialization..
- #print("sub %r %r" % (idx, cmd))
- #print("write to %r %r" % (idx, cmd))
-
- data = cPickle.dumps(kw, -1)
- params = [REQ, str(len(data))]
- header = make_header(params)
-
- conn = CONNS[idx]
-
- put_data(conn, header + data)
-
- #print("running %r %r" % (idx, cmd))
- #print("read from %r %r" % (idx, cmd))
-
- data = read_data(conn, HEADER_SIZE)
- if sys.hexversion > 0x3000000:
- data = data.decode('iso8859-1')
-
- #print("received %r" % data)
- lst = data.split(',')
- ret = int(lst[1])
- dlen = int(lst[2])
-
- out = err = None
- if dlen:
- data = read_data(conn, dlen)
- (out, err, exc) = cPickle.loads(data)
- if exc:
- raise Errors.WafError('Execution failure: %s' % exc)
-
- if out:
- if not isinstance(out, str):
- out = out.decode(sys.stdout.encoding or 'iso8859-1')
- if self.logger:
- self.logger.debug('out: %s' % out)
- else:
- Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
- if err:
- if not isinstance(err, str):
- err = err.decode(sys.stdout.encoding or 'iso8859-1')
- if self.logger:
- self.logger.error('err: %s' % err)
- else:
- Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})
-
- return ret
-
- def init_smp(self):
- if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
- return
- if Utils.unversioned_sys_platform() in ('freebsd',):
- pid = os.getpid()
- cmd = ['cpuset', '-l', '0', '-p', str(pid)]
- elif Utils.unversioned_sys_platform() in ('linux',):
- pid = os.getpid()
- cmd = ['taskset', '-pc', '0', str(pid)]
- if cmd:
- self.cmd_and_log(cmd, quiet=0)
-
- def options(opt):
- # memory consumption might be at the lowest point while processing options
- opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
- if Utils.is_win32 or os.sep != '/':
- return
- while len(CONNS) < 30:
- (pid, conn) = make_conn(opt)
- SERVERS.append(pid)
- CONNS.append(conn)
-
- def build(bld):
- if Utils.is_win32 or os.sep != '/':
- return
- if bld.cmd == 'clean':
- return
- while len(CONNS) < bld.jobs:
- (pid, conn) = make_conn(bld)
- SERVERS.append(pid)
- CONNS.append(conn)
- init_smp(bld)
- bld.__class__.exec_command_old = bld.__class__.exec_command
- bld.__class__.exec_command = exec_command