diff options
Diffstat (limited to 'third_party/waf/waflib/Runner.py')
-rw-r--r-- | third_party/waf/waflib/Runner.py | 322 |
1 files changed, 264 insertions, 58 deletions
diff --git a/third_party/waf/waflib/Runner.py b/third_party/waf/waflib/Runner.py index 1e37401d32c..30e42d3b099 100644 --- a/third_party/waf/waflib/Runner.py +++ b/third_party/waf/waflib/Runner.py @@ -4,24 +4,50 @@ #!/usr/bin/env python # encoding: utf-8 -# Thomas Nagy, 2005-2016 (ita) +# Thomas Nagy, 2005-2018 (ita) """ Runner.py: Task scheduling and execution """ -import random +import heapq, traceback try: from queue import Queue except ImportError: from Queue import Queue from waflib import Utils, Task, Errors, Logs -GAP = 20 +GAP = 5 """ Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run """ +class PriorityTasks(object): + def __init__(self): + self.lst = [] + def __len__(self): + return len(self.lst) + def __iter__(self): + return iter(self.lst) + def clear(self): + self.lst = [] + def append(self, task): + heapq.heappush(self.lst, task) + def appendleft(self, task): + heapq.heappush(self.lst, task) + def pop(self): + return heapq.heappop(self.lst) + def extend(self, lst): + if self.lst: + for x in lst: + self.append(x) + else: + if isinstance(lst, list): + self.lst = lst + heapq.heapify(lst) + else: + self.lst = lst.lst + class Consumer(Utils.threading.Thread): """ Daemon thread object that executes a task. It shares a semaphore with @@ -42,7 +68,7 @@ class Consumer(Utils.threading.Thread): """ try: if not self.spawner.master.stop: - self.task.process() + self.spawner.master.process_task(self.task) finally: self.spawner.sem.release() self.spawner.master.out.put(self.task) @@ -53,7 +79,7 @@ class Spawner(Utils.threading.Thread): """ Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each - :py:class:`waflib.Task.TaskBase` instance. + :py:class:`waflib.Task.Task` instance. """ def __init__(self, master): Utils.threading.Thread.__init__(self) @@ -106,22 +132,25 @@ class Parallel(object): Instance of :py:class:`waflib.Build.BuildContext` """ - self.outstanding = Utils.deque() - """List of :py:class:`waflib.Task.TaskBase` that may be ready to be executed""" + self.outstanding = PriorityTasks() + """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed""" - self.frozen = Utils.deque() - """List of :py:class:`waflib.Task.TaskBase` that are not ready yet""" + self.postponed = PriorityTasks() + """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons""" + + self.incomplete = set() + """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)""" self.ready = Queue(0) - """List of :py:class:`waflib.Task.TaskBase` ready to be executed by consumers""" + """List of :py:class:`waflib.Task.Task` ready to be executed by consumers""" self.out = Queue(0) - """List of :py:class:`waflib.Task.TaskBase` returned by the task consumers""" + """List of :py:class:`waflib.Task.Task` returned by the task consumers""" self.count = 0 """Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`""" - self.processed = 1 + self.processed = 0 """Amount of tasks processed""" self.stop = False @@ -138,6 +167,11 @@ class Parallel(object): Flag that indicates that the build cache must be saved when a task was executed (calls :py:meth:`waflib.Build.BuildContext.store`)""" + self.revdeps = Utils.defaultdict(set) + """ + The reverse dependency graph of dependencies obtained from Task.run_after + """ + self.spawner = Spawner(self) """ Coordinating daemon thread that spawns thread consumers @@ -147,28 +181,26 @@ class Parallel(object): """ Obtains the next Task instance to run - :rtype: :py:class:`waflib.Task.TaskBase` + :rtype: :py:class:`waflib.Task.Task` """ if not self.outstanding: return None - return self.outstanding.popleft() + return self.outstanding.pop() def postpone(self, tsk): """ - Adds the task to the list :py:attr:`waflib.Runner.Parallel.frozen`. + Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`. The order is scrambled so as to consume as many tasks in parallel as possible. :param tsk: task instance - :type tsk: :py:class:`waflib.Task.TaskBase` + :type tsk: :py:class:`waflib.Task.Task` """ - if random.randint(0, 1): - self.frozen.appendleft(tsk) - else: - self.frozen.append(tsk) + self.postponed.append(tsk) def refill_task_list(self): """ - Adds the next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`. + Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`. + Ensures that all tasks in the current build group are complete before processing the next one. """ while self.count > self.numjobs * GAP: self.get_out() @@ -176,54 +208,105 @@ class Parallel(object): while not self.outstanding: if self.count: self.get_out() - elif self.frozen: + if self.outstanding: + break + elif self.postponed: try: cond = self.deadlock == self.processed except AttributeError: pass else: if cond: - msg = 'check the build order for the tasks' - for tsk in self.frozen: - if not tsk.run_after: - msg = 'check the methods runnable_status' - break lst = [] - for tsk in self.frozen: - lst.append('%s\t-> %r' % (repr(tsk), [id(x) for x in tsk.run_after])) - raise Errors.WafError('Deadlock detected: %s%s' % (msg, ''.join(lst))) + for tsk in self.postponed: + deps = [id(x) for x in tsk.run_after if not x.hasrun] + lst.append('%s\t-> %r' % (repr(tsk), deps)) + if not deps: + lst.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk)) + raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst)) self.deadlock = self.processed - if self.frozen: - self.outstanding.extend(self.frozen) - self.frozen.clear() + if self.postponed: + self.outstanding.extend(self.postponed) + self.postponed.clear() elif not self.count: - self.outstanding.extend(self.biter.next()) - self.total = self.bld.total() - break + if self.incomplete: + for x in self.incomplete: + for k in x.run_after: + if not k.hasrun: + break + else: + # dependency added after the build started without updating revdeps + self.incomplete.remove(x) + self.outstanding.append(x) + break + else: + raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete) + else: + tasks = next(self.biter) + ready, waiting = self.prio_and_split(tasks) + self.outstanding.extend(ready) + self.incomplete.update(waiting) + self.total = self.bld.total() + break def add_more_tasks(self, tsk): """ - If a task provides :py:attr:`waflib.Task.TaskBase.more_tasks`, then the tasks contained + If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained in that list are added to the current build and will be processed before the next build group. + The priorities for dependent tasks are not re-calculated globally + :param tsk: task instance - :type tsk: :py:attr:`waflib.Task.TaskBase` + :type tsk: :py:attr:`waflib.Task.Task` """ if getattr(tsk, 'more_tasks', None): - self.outstanding.extend(tsk.more_tasks) + # TODO recompute priorities globally? + ready, waiting = self.prio_and_split(tsk.more_tasks) + self.outstanding.extend(ready) + self.incomplete.update(waiting) self.total += len(tsk.more_tasks) + def mark_finished(self, tsk): + def try_unfreeze(x): + # DAG ancestors are likely to be in the incomplete set + if x in self.incomplete: + # TODO remove dependencies to free some memory? + # x.run_after.remove(tsk) + for k in x.run_after: + if not k.hasrun: + break + else: + self.incomplete.remove(x) + self.outstanding.append(x) + + if tsk in self.revdeps: + for x in self.revdeps[tsk]: + if isinstance(x, Task.TaskGroup): + x.prev.remove(tsk) + if not x.prev: + for k in x.next: + # TODO necessary optimization? + k.run_after.remove(x) + try_unfreeze(k) + # TODO necessary optimization? + x.next = [] + else: + try_unfreeze(x) + del self.revdeps[tsk] + def get_out(self): """ Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution. Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`. - :rtype: :py:attr:`waflib.Task.TaskBase` + :rtype: :py:attr:`waflib.Task.Task` """ tsk = self.out.get() if not self.stop: self.add_more_tasks(tsk) + self.mark_finished(tsk) + self.count -= 1 self.dirty = True return tsk @@ -233,32 +316,42 @@ class Parallel(object): Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them. :param tsk: task instance - :type tsk: :py:attr:`waflib.Task.TaskBase` + :type tsk: :py:attr:`waflib.Task.Task` """ self.ready.put(tsk) + def process_task(self, tsk): + """ + Processes a task and attempts to stop the build in case of errors + """ + tsk.process() + if tsk.hasrun != Task.SUCCESS: + self.error_handler(tsk) + def skip(self, tsk): """ Mark a task as skipped/up-to-date """ tsk.hasrun = Task.SKIPPED + self.mark_finished(tsk) + + def cancel(self, tsk): + """ + Mark a task as failed because of unsatisfiable dependencies + """ + tsk.hasrun = Task.CANCELED + self.mark_finished(tsk) def error_handler(self, tsk): """ - Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set, unless - the build is executed with:: + Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set, + unless the build is executed with:: $ waf build -k :param tsk: task instance - :type tsk: :py:attr:`waflib.Task.TaskBase` + :type tsk: :py:attr:`waflib.Task.Task` """ - if hasattr(tsk, 'scan') and hasattr(tsk, 'uid'): - # TODO waf 2.0 - this breaks encapsulation - try: - del self.bld.imp_sigs[tsk.uid()] - except KeyError: - pass if not self.bld.keep: self.stop = True self.error.append(tsk) @@ -274,11 +367,11 @@ class Parallel(object): return tsk.runnable_status() except Exception: self.processed += 1 - tsk.err_msg = Utils.ex_stack() + tsk.err_msg = traceback.format_exc() if not self.stop and self.bld.keep: self.skip(tsk) if self.bld.keep == 1: - # if -k stop at the first exception, if -kk try to go as far as possible + # if -k stop on the first exception, if -kk try to go as far as possible if Logs.verbose > 1 or not self.error: self.error.append(tsk) self.stop = True @@ -286,9 +379,10 @@ class Parallel(object): if Logs.verbose > 1: self.error.append(tsk) return Task.EXCEPTION - tsk.hasrun = Task.EXCEPTION + tsk.hasrun = Task.EXCEPTION self.error_handler(tsk) + return Task.EXCEPTION def start(self): @@ -320,10 +414,9 @@ class Parallel(object): self.processed += 1 continue - if self.stop: # stop immediately after a failure was detected + if self.stop: # stop immediately after a failure is detected break - st = self.task_status(tsk) if st == Task.RUN_ME: self.count += 1 @@ -332,17 +425,24 @@ class Parallel(object): if self.numjobs == 1: tsk.log_display(tsk.generator.bld) try: - tsk.process() + self.process_task(tsk) finally: self.out.put(tsk) else: self.add_task(tsk) - if st == Task.ASK_LATER: + elif st == Task.ASK_LATER: self.postpone(tsk) elif st == Task.SKIP_ME: self.processed += 1 self.skip(tsk) self.add_more_tasks(tsk) + elif st == Task.CANCEL_ME: + # A dependency problem has occurred, and the + # build is most likely run with `waf -k` + if Logs.verbose > 1: + self.error.append(tsk) + self.processed += 1 + self.cancel(tsk) # self.count represents the tasks that have been made available to the consumer threads # collect all the tasks after an error else the message may be incomplete @@ -350,4 +450,110 @@ class Parallel(object): self.get_out() self.ready.put(None) - assert (self.count == 0 or self.stop) + if not self.stop: + assert not self.count + assert not self.postponed + assert not self.incomplete + + def prio_and_split(self, tasks): + """ + Label input tasks with priority values, and return a pair containing + the tasks that are ready to run and the tasks that are necessarily + waiting for other tasks to complete. + + The priority system is really meant as an optional layer for optimization: + dependency cycles are found quickly, and builds should be more efficient. + A high priority number means that a task is processed first. + + This method can be overridden to disable the priority system:: + + def prio_and_split(self, tasks): + return tasks, [] + + :return: A pair of task lists + :rtype: tuple + """ + # to disable: + #return tasks, [] + for x in tasks: + x.visited = 0 + + reverse = self.revdeps + + for x in tasks: + for k in x.run_after: + if isinstance(k, Task.TaskGroup): + if k.done: + pass + else: + k.done = True + for j in k.prev: + reverse[j].add(k) + else: + reverse[k].add(x) + + # the priority number is not the tree depth + def visit(n): + if isinstance(n, Task.TaskGroup): + return sum(visit(k) for k in n.next) + + if n.visited == 0: + n.visited = 1 + + if n in reverse: + rev = reverse[n] + n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev) + else: + n.prio_order = n.tree_weight + + n.visited = 2 + elif n.visited == 1: + raise Errors.WafError('Dependency cycle found!') + return n.prio_order + + for x in tasks: + if x.visited != 0: + # must visit all to detect cycles + continue + try: + visit(x) + except Errors.WafError: + self.debug_cycles(tasks, reverse) + + ready = [] + waiting = [] + for x in tasks: + for k in x.run_after: + if not k.hasrun: + waiting.append(x) + break + else: + ready.append(x) + return (ready, waiting) + + def debug_cycles(self, tasks, reverse): + tmp = {} + for x in tasks: + tmp[x] = 0 + + def visit(n, acc): + if isinstance(n, Task.TaskGroup): + for k in n.next: + visit(k, acc) + return + if tmp[n] == 0: + tmp[n] = 1 + for k in reverse.get(n, []): + visit(k, [n] + acc) + tmp[n] = 2 + elif tmp[n] == 1: + lst = [] + for tsk in acc: + lst.append(repr(tsk)) + if tsk is n: + # exclude prior nodes, we want the minimum cycle + break + raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst)) + for x in tasks: + visit(x, []) + |