summaryrefslogtreecommitdiff
path: root/third_party/waf/waflib/extras/preforkjava.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/waf/waflib/extras/preforkjava.py')
-rw-r--r--third_party/waf/waflib/extras/preforkjava.py236
1 files changed, 236 insertions, 0 deletions
diff --git a/third_party/waf/waflib/extras/preforkjava.py b/third_party/waf/waflib/extras/preforkjava.py
new file mode 100644
index 00000000000..e93461b4da9
--- /dev/null
+++ b/third_party/waf/waflib/extras/preforkjava.py
@@ -0,0 +1,236 @@
+#! /usr/bin/env python
+# encoding: utf-8
+# Thomas Nagy, 2015 (ita)
+
+# TODO: have the child process terminate if the parent is killed abruptly
+
+import os, socket, threading, sys, subprocess, time, atexit, random
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+
+import json as pickle
+
+SHARED_KEY = None
+HEADER_SIZE = 64
+
+REQ = 'REQ'
+RES = 'RES'
+BYE = 'BYE'
+
+def make_header(params, cookie=''):
+ header = ','.join(params)
+ header = header.ljust(HEADER_SIZE - len(cookie))
+ assert(len(header) == HEADER_SIZE - len(cookie))
+ header = header + cookie
+ if sys.hexversion > 0x3000000:
+ header = header.encode('iso8859-1')
+ return header
+
+if 1:
+ from waflib import Logs, Utils, Runner, Errors, Options
+
+ def init_task_pool(self):
+ # lazy creation, and set a common pool for all task consumers
+ pool = self.pool = []
+ for i in range(self.numjobs):
+ consumer = Runner.get_pool()
+ pool.append(consumer)
+ consumer.idx = i
+ self.ready = Queue(0)
+ def setq(consumer):
+ consumer.ready = self.ready
+ try:
+ threading.current_thread().idx = consumer.idx
+ except Exception as e:
+ print(e)
+ for x in pool:
+ x.ready.put(setq)
+ return pool
+ Runner.Parallel.init_task_pool = init_task_pool
+
+ def make_server(bld, idx):
+ top = getattr(bld, 'preforkjava_top', os.path.dirname(os.path.abspath('__file__')))
+ cp = getattr(bld, 'preforkjava_cp', os.path.join(top, 'minimal-json-0.9.3-SNAPSHOT.jar') + os.pathsep + top)
+
+ for x in cp.split(os.pathsep):
+ if x and not os.path.exists(x):
+ Logs.warn('Invalid classpath: %r' % cp)
+ Logs.warn('Set for example bld.preforkjava_cp to /path/to/minimal-json:/path/to/Prefork.class/')
+
+ cwd = getattr(bld, 'preforkjava_cwd', top)
+ port = getattr(bld, 'preforkjava_port', 51200)
+ cmd = getattr(bld, 'preforkjava_cmd', 'java -cp %s%s Prefork %d' % (cp, os.pathsep, port))
+ proc = subprocess.Popen(cmd.split(), shell=False, cwd=cwd)
+ proc.port = port
+ return proc
+
+ def make_conn(bld, srv):
+ #port = PORT + idx
+ port = srv.port
+ conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ conn.connect(('127.0.0.1', port))
+ return conn
+
+ SERVERS = []
+ CONNS = []
+ def close_all():
+ global SERVERS
+ while SERVERS:
+ srv = SERVERS.pop()
+ #pid = srv.pid
+ try:
+ srv.kill()
+ except Exception:
+ pass
+ atexit.register(close_all)
+
+ def put_data(conn, data):
+ cnt = 0
+ while cnt < len(data):
+ sent = conn.send(data[cnt:])
+ if sent == 0:
+ raise RuntimeError('connection ended')
+ cnt += sent
+
+ def read_data(conn, siz):
+ cnt = 0
+ buf = []
+ while cnt < siz:
+ data = conn.recv(min(siz - cnt, 1024))
+ if not data:
+ raise RuntimeError('connection ended %r %r' % (cnt, siz))
+ buf.append(data)
+ cnt += len(data)
+ if sys.hexversion > 0x3000000:
+ ret = ''.encode('iso8859-1').join(buf)
+ else:
+ ret = ''.join(buf)
+ return ret
+
+ def exec_command(self, cmd, **kw):
+ if 'stdout' in kw:
+ if kw['stdout'] not in (None, subprocess.PIPE):
+ return self.exec_command_old(cmd, **kw)
+ elif 'stderr' in kw:
+ if kw['stderr'] not in (None, subprocess.PIPE):
+ return self.exec_command_old(cmd, **kw)
+
+ kw['shell'] = isinstance(cmd, str)
+ Logs.debug('runner: %r' % cmd)
+ Logs.debug('runner_env: kw=%s' % kw)
+
+ if self.logger:
+ self.logger.info(cmd)
+
+ if 'stdout' not in kw:
+ kw['stdout'] = subprocess.PIPE
+ if 'stderr' not in kw:
+ kw['stderr'] = subprocess.PIPE
+
+ if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
+ raise Errors.WafError("Program %s not found!" % cmd[0])
+
+ idx = threading.current_thread().idx
+ kw['cmd'] = cmd
+
+ data = pickle.dumps(kw)
+ params = [REQ, str(len(data))]
+ header = make_header(params, self.SHARED_KEY)
+
+ conn = CONNS[idx]
+
+ if sys.hexversion > 0x3000000:
+ data = data.encode('iso8859-1')
+ put_data(conn, header + data)
+
+ data = read_data(conn, HEADER_SIZE)
+ if sys.hexversion > 0x3000000:
+ data = data.decode('iso8859-1')
+
+ #print("received %r" % data)
+ lst = data.split(',')
+ ret = int(lst[1])
+ dlen = int(lst[2])
+
+ out = err = None
+ if dlen:
+ data = read_data(conn, dlen)
+ (out, err, exc) = pickle.loads(data)
+ if exc:
+ raise Errors.WafError('Execution failure: %s' % exc)
+
+ if out:
+ if not isinstance(out, str):
+ out = out.decode(sys.stdout.encoding or 'iso8859-1')
+ if self.logger:
+ self.logger.debug('out: %s' % out)
+ else:
+ Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
+ if err:
+ if not isinstance(err, str):
+ err = err.decode(sys.stdout.encoding or 'iso8859-1')
+ if self.logger:
+ self.logger.error('err: %s' % err)
+ else:
+ Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})
+
+ return ret
+
+ def init_key(ctx):
+ try:
+ key = ctx.SHARED_KEY = os.environ['SHARED_KEY']
+ except KeyError:
+ key = "".join([chr(random.SystemRandom().randint(40, 126)) for x in range(20)])
+ os.environ['SHARED_KEY'] = ctx.SHARED_KEY = key
+ os.environ['PREFORKPID'] = str(os.getpid())
+ return key
+
+ def init_servers(ctx, maxval):
+ while len(SERVERS) < 1:
+ i = len(SERVERS)
+ srv = make_server(ctx, i)
+ SERVERS.append(srv)
+ while len(CONNS) < maxval:
+ i = len(CONNS)
+ srv = SERVERS[0]
+ conn = None
+ for x in range(30):
+ try:
+ conn = make_conn(ctx, srv)
+ break
+ except socket.error:
+ time.sleep(0.01)
+ if not conn:
+ raise ValueError('Could not start the server!')
+ CONNS.append(conn)
+
+ def init_smp(self):
+ if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
+ return
+ if Utils.unversioned_sys_platform() in ('freebsd',):
+ pid = os.getpid()
+ cmd = ['cpuset', '-l', '0', '-p', str(pid)]
+ elif Utils.unversioned_sys_platform() in ('linux',):
+ pid = os.getpid()
+ cmd = ['taskset', '-pc', '0', str(pid)]
+ if cmd:
+ self.cmd_and_log(cmd, quiet=0)
+
+ def options(opt):
+ opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
+ init_key(opt)
+ init_servers(opt, 40)
+
+ def build(bld):
+ if bld.cmd == 'clean':
+ return
+
+ init_key(bld)
+ init_servers(bld, bld.jobs)
+ init_smp(bld)
+
+ bld.__class__.exec_command_old = bld.__class__.exec_command
+ bld.__class__.exec_command = exec_command