summaryrefslogtreecommitdiff
path: root/third_party/waf/wafadmin/Runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/waf/wafadmin/Runner.py')
-rw-r--r--third_party/waf/wafadmin/Runner.py235
1 files changed, 235 insertions, 0 deletions
diff --git a/third_party/waf/wafadmin/Runner.py b/third_party/waf/wafadmin/Runner.py
new file mode 100644
index 00000000000..0d2dea5ef47
--- /dev/null
+++ b/third_party/waf/wafadmin/Runner.py
@@ -0,0 +1,235 @@
+#!/usr/bin/env python
+# encoding: utf-8
+# Thomas Nagy, 2005-2008 (ita)
+
+"Execute the tasks"
+
+import os, sys, random, time, threading, traceback
+try: from Queue import Queue
+except ImportError: from queue import Queue
+import Build, Utils, Logs, Options
+from Logs import debug, error
+from Constants import *
+
+GAP = 15
+
+run_old = threading.Thread.run
+def run(*args, **kwargs):
+ try:
+ run_old(*args, **kwargs)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ sys.excepthook(*sys.exc_info())
+threading.Thread.run = run
+
+def process_task(tsk):
+
+ m = tsk.master
+ if m.stop:
+ m.out.put(tsk)
+ return
+
+ try:
+ tsk.generator.bld.printout(tsk.display())
+ if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
+ # actual call to task's run() function
+ else: ret = tsk.call_run()
+ except Exception, e:
+ tsk.err_msg = Utils.ex_stack()
+ tsk.hasrun = EXCEPTION
+
+ # TODO cleanup
+ m.error_handler(tsk)
+ m.out.put(tsk)
+ return
+
+ if ret:
+ tsk.err_code = ret
+ tsk.hasrun = CRASHED
+ else:
+ try:
+ tsk.post_run()
+ except Utils.WafError:
+ pass
+ except Exception:
+ tsk.err_msg = Utils.ex_stack()
+ tsk.hasrun = EXCEPTION
+ else:
+ tsk.hasrun = SUCCESS
+ if tsk.hasrun != SUCCESS:
+ m.error_handler(tsk)
+
+ m.out.put(tsk)
+
+class TaskConsumer(threading.Thread):
+ ready = Queue(0)
+ consumers = []
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.setDaemon(1)
+ self.start()
+
+ def run(self):
+ try:
+ self.loop()
+ except:
+ pass
+
+ def loop(self):
+ while 1:
+ tsk = TaskConsumer.ready.get()
+ process_task(tsk)
+
+class Parallel(object):
+ """
+ keep the consumer threads busy, and avoid consuming cpu cycles
+ when no more tasks can be added (end of the build, etc)
+ """
+ def __init__(self, bld, j=2):
+
+ # number of consumers
+ self.numjobs = j
+
+ self.manager = bld.task_manager
+ self.manager.current_group = 0
+
+ self.total = self.manager.total()
+
+ # tasks waiting to be processed - IMPORTANT
+ self.outstanding = []
+ self.maxjobs = MAXJOBS
+
+ # tasks that are awaiting for another task to complete
+ self.frozen = []
+
+ # tasks returned by the consumers
+ self.out = Queue(0)
+
+ self.count = 0 # tasks not in the producer area
+
+ self.processed = 1 # progress indicator
+
+ self.stop = False # error condition to stop the build
+ self.error = False # error flag
+
+ def get_next(self):
+ "override this method to schedule the tasks in a particular order"
+ if not self.outstanding:
+ return None
+ return self.outstanding.pop(0)
+
+ def postpone(self, tsk):
+ "override this method to schedule the tasks in a particular order"
+ # TODO consider using a deque instead
+ if random.randint(0, 1):
+ self.frozen.insert(0, tsk)
+ else:
+ self.frozen.append(tsk)
+
+ def refill_task_list(self):
+ "called to set the next group of tasks"
+
+ while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
+ self.get_out()
+
+ while not self.outstanding:
+ if self.count:
+ self.get_out()
+
+ if self.frozen:
+ self.outstanding += self.frozen
+ self.frozen = []
+ elif not self.count:
+ (jobs, tmp) = self.manager.get_next_set()
+ if jobs != None: self.maxjobs = jobs
+ if tmp: self.outstanding += tmp
+ break
+
+ def get_out(self):
+ "the tasks that are put to execute are all collected using get_out"
+ ret = self.out.get()
+ self.manager.add_finished(ret)
+ if not self.stop and getattr(ret, 'more_tasks', None):
+ self.outstanding += ret.more_tasks
+ self.total += len(ret.more_tasks)
+ self.count -= 1
+
+ def error_handler(self, tsk):
+ "by default, errors make the build stop (not thread safe so be careful)"
+ if not Options.options.keep:
+ self.stop = True
+ self.error = True
+
+ def start(self):
+ "execute the tasks"
+
+ if TaskConsumer.consumers:
+ # the worker pool is usually loaded lazily (see below)
+ # in case it is re-used with a different value of numjobs:
+ while len(TaskConsumer.consumers) < self.numjobs:
+ TaskConsumer.consumers.append(TaskConsumer())
+
+ while not self.stop:
+
+ self.refill_task_list()
+
+ # consider the next task
+ tsk = self.get_next()
+ if not tsk:
+ if self.count:
+ # tasks may add new ones after they are run
+ continue
+ else:
+ # no tasks to run, no tasks running, time to exit
+ break
+
+ if tsk.hasrun:
+ # if the task is marked as "run", just skip it
+ self.processed += 1
+ self.manager.add_finished(tsk)
+ continue
+
+ try:
+ st = tsk.runnable_status()
+ except Exception, e:
+ self.processed += 1
+ if self.stop and not Options.options.keep:
+ tsk.hasrun = SKIPPED
+ self.manager.add_finished(tsk)
+ continue
+ self.error_handler(tsk)
+ self.manager.add_finished(tsk)
+ tsk.hasrun = EXCEPTION
+ tsk.err_msg = Utils.ex_stack()
+ continue
+
+ if st == ASK_LATER:
+ self.postpone(tsk)
+ elif st == SKIP_ME:
+ self.processed += 1
+ tsk.hasrun = SKIPPED
+ self.manager.add_finished(tsk)
+ else:
+ # run me: put the task in ready queue
+ tsk.position = (self.processed, self.total)
+ self.count += 1
+ tsk.master = self
+ self.processed += 1
+
+ if self.numjobs == 1:
+ process_task(tsk)
+ else:
+ TaskConsumer.ready.put(tsk)
+ # create the consumer threads only if there is something to consume
+ if not TaskConsumer.consumers:
+ TaskConsumer.consumers = [TaskConsumer() for i in xrange(self.numjobs)]
+
+ # self.count represents the tasks that have been made available to the consumer threads
+ # collect all the tasks after an error else the message may be incomplete
+ while self.error and self.count:
+ self.get_out()
+
+ #print loop
+ assert (self.count == 0 or self.stop)