diff options
Diffstat (limited to 'waflib/Runner.py')
-rw-r--r-- | waflib/Runner.py | 576 |
1 files changed, 169 insertions, 407 deletions
diff --git a/waflib/Runner.py b/waflib/Runner.py index 261084d2..b3087292 100644 --- a/waflib/Runner.py +++ b/waflib/Runner.py @@ -1,125 +1,98 @@ #!/usr/bin/env python # encoding: utf-8 -# Thomas Nagy, 2005-2018 (ita) +# Thomas Nagy, 2005-2010 (ita) """ Runner.py: Task scheduling and execution + """ -import heapq, traceback +import random, atexit try: - from queue import Queue, PriorityQueue + from queue import Queue except ImportError: from Queue import Queue - try: - from Queue import PriorityQueue - except ImportError: - class PriorityQueue(Queue): - def _init(self, maxsize): - self.maxsize = maxsize - self.queue = [] - def _put(self, item): - heapq.heappush(self.queue, item) - def _get(self): - return heapq.heappop(self.queue) - from waflib import Utils, Task, Errors, Logs -GAP = 5 +GAP = 10 """ -Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run +Wait for free tasks if there are at least ``GAP * njobs`` in queue """ -class PriorityTasks(object): - def __init__(self): - self.lst = [] - def __len__(self): - return len(self.lst) - def __iter__(self): - return iter(self.lst) - def clear(self): - self.lst = [] - def append(self, task): - heapq.heappush(self.lst, task) - def appendleft(self, task): - "Deprecated, do not use" - heapq.heappush(self.lst, task) - def pop(self): - return heapq.heappop(self.lst) - def extend(self, lst): - if self.lst: - for x in lst: - self.append(x) - else: - if isinstance(lst, list): - self.lst = lst - heapq.heapify(lst) - else: - self.lst = lst.lst - -class Consumer(Utils.threading.Thread): +class TaskConsumer(Utils.threading.Thread): """ - Daemon thread object that executes a task. It shares a semaphore with - the coordinator :py:class:`waflib.Runner.Spawner`. There is one - instance per task to consume. + Task consumers belong to a pool of workers + + They wait for tasks in the queue and then use ``task.process(...)`` """ - def __init__(self, spawner, task): + def __init__(self): Utils.threading.Thread.__init__(self) - self.task = task - """Task to execute""" - self.spawner = spawner - """Coordinator object""" - self.setDaemon(1) - self.start() - def run(self): + self.ready = Queue() """ - Processes a single task + Obtain :py:class:`waflib.Task.TaskBase` instances from this queue. """ - try: - if not self.spawner.master.stop: - self.spawner.master.process_task(self.task) - finally: - self.spawner.sem.release() - self.spawner.master.out.put(self.task) - self.task = None - self.spawner = None - -class Spawner(Utils.threading.Thread): - """ - Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and - spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each - :py:class:`waflib.Task.Task` instance. - """ - def __init__(self, master): - Utils.threading.Thread.__init__(self) - self.master = master - """:py:class:`waflib.Runner.Parallel` producer instance""" - self.sem = Utils.threading.Semaphore(master.numjobs) - """Bounded semaphore that prevents spawning more than *n* concurrent consumers""" self.setDaemon(1) self.start() + def run(self): """ - Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop` + Loop over the tasks to execute """ try: self.loop() except Exception: - # Python 2 prints unnecessary messages when shutting down - # we also want to stop the thread properly pass + def loop(self): """ - Consumes task objects from the producer; ends when the producer has no more - task to provide. + Obtain tasks from :py:attr:`waflib.Runner.TaskConsumer.ready` and call + :py:meth:`waflib.Task.TaskBase.process`. If the object is a function, execute it. """ - master = self.master while 1: - task = master.ready.get() - self.sem.acquire() - if not master.stop: - task.log_display(task.generator.bld) - Consumer(self, task) + tsk = self.ready.get() + if not isinstance(tsk, Task.TaskBase): + tsk(self) + else: + tsk.process() + +pool = Queue() +""" +Pool of task consumer objects +""" + +def get_pool(): + """ + Obtain a task consumer from :py:attr:`waflib.Runner.pool`. + Do not forget to put it back by using :py:func:`waflib.Runner.put_pool` + and reset properly (original waiting queue). + + :rtype: :py:class:`waflib.Runner.TaskConsumer` + """ + try: + return pool.get(False) + except Exception: + return TaskConsumer() + +def put_pool(x): + """ + Return a task consumer to the thread pool :py:attr:`waflib.Runner.pool` + + :param x: task consumer object + :type x: :py:class:`waflib.Runner.TaskConsumer` + """ + pool.put(x) + +def _free_resources(): + global pool + lst = [] + while pool.qsize(): + lst.append(pool.get()) + for x in lst: + x.ready.put(None) + for x in lst: + x.join() + pool = None +atexit.register(_free_resources) class Parallel(object): """ @@ -133,7 +106,7 @@ class Parallel(object): self.numjobs = j """ - Amount of parallel consumers to use + Number of consumers in the pool """ self.bld = bld @@ -141,25 +114,19 @@ class Parallel(object): Instance of :py:class:`waflib.Build.BuildContext` """ - self.outstanding = PriorityTasks() - """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed""" - - self.postponed = PriorityTasks() - """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons""" + self.outstanding = [] + """List of :py:class:`waflib.Task.TaskBase` that may be ready to be executed""" - self.incomplete = set() - """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)""" - - self.ready = PriorityQueue(0) - """List of :py:class:`waflib.Task.Task` ready to be executed by consumers""" + self.frozen = [] + """List of :py:class:`waflib.Task.TaskBase` that cannot be executed immediately""" self.out = Queue(0) - """List of :py:class:`waflib.Task.Task` returned by the task consumers""" + """List of :py:class:`waflib.Task.TaskBase` returned by the task consumers""" self.count = 0 """Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`""" - self.processed = 0 + self.processed = 1 """Amount of tasks processed""" self.stop = False @@ -172,44 +139,33 @@ class Parallel(object): """Task iterator which must give groups of parallelizable tasks when calling ``next()``""" self.dirty = False - """ - Flag that indicates that the build cache must be saved when a task was executed - (calls :py:meth:`waflib.Build.BuildContext.store`)""" - - self.revdeps = Utils.defaultdict(set) - """ - The reverse dependency graph of dependencies obtained from Task.run_after - """ - - self.spawner = Spawner(self) - """ - Coordinating daemon thread that spawns thread consumers - """ + """Flag to indicate that tasks have been executed, and that the build cache must be saved (call :py:meth:`waflib.Build.BuildContext.store`)""" def get_next_task(self): """ - Obtains the next Task instance to run + Obtain the next task to execute. - :rtype: :py:class:`waflib.Task.Task` + :rtype: :py:class:`waflib.Task.TaskBase` """ if not self.outstanding: return None - return self.outstanding.pop() + return self.outstanding.pop(0) def postpone(self, tsk): """ - Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`. - The order is scrambled so as to consume as many tasks in parallel as possible. + A task cannot be executed at this point, put it in the list :py:attr:`waflib.Runner.Parallel.frozen`. - :param tsk: task instance - :type tsk: :py:class:`waflib.Task.Task` + :param tsk: task + :type tsk: :py:class:`waflib.Task.TaskBase` """ - self.postponed.append(tsk) + if random.randint(0, 1): + self.frozen.insert(0, tsk) + else: + self.frozen.append(tsk) def refill_task_list(self): """ - Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`. - Ensures that all tasks in the current build group are complete before processing the next one. + Put the next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`. """ while self.count > self.numjobs * GAP: self.get_out() @@ -217,224 +173,132 @@ class Parallel(object): while not self.outstanding: if self.count: self.get_out() - if self.outstanding: - break - elif self.postponed: + elif self.frozen: try: cond = self.deadlock == self.processed except AttributeError: pass else: if cond: - # The most common reason is conflicting build order declaration - # for example: "X run_after Y" and "Y run_after X" - # Another can be changing "run_after" dependencies while the build is running - # for example: updating "tsk.run_after" in the "runnable_status" method + msg = 'check the build order for the tasks' + for tsk in self.frozen: + if not tsk.run_after: + msg = 'check the methods runnable_status' + break lst = [] - for tsk in self.postponed: - deps = [id(x) for x in tsk.run_after if not x.hasrun] - lst.append('%s\t-> %r' % (repr(tsk), deps)) - if not deps: - lst.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk)) - raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst)) + for tsk in self.frozen: + lst.append('%s\t-> %r' % (repr(tsk), [id(x) for x in tsk.run_after])) + raise Errors.WafError('Deadlock detected: %s%s' % (msg, ''.join(lst))) self.deadlock = self.processed - if self.postponed: - self.outstanding.extend(self.postponed) - self.postponed.clear() + if self.frozen: + self.outstanding += self.frozen + self.frozen = [] elif not self.count: - if self.incomplete: - for x in self.incomplete: - for k in x.run_after: - if not k.hasrun: - break - else: - # dependency added after the build started without updating revdeps - self.incomplete.remove(x) - self.outstanding.append(x) - break - else: - raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete) - else: - tasks = next(self.biter) - ready, waiting = self.prio_and_split(tasks) - self.outstanding.extend(ready) - self.incomplete.update(waiting) - self.total = self.bld.total() - break + self.outstanding.extend(next(self.biter)) + self.total = self.bld.total() + break def add_more_tasks(self, tsk): """ - If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained - in that list are added to the current build and will be processed before the next build group. + Tasks may be added dynamically during the build by binding them to the task :py:attr:`waflib.Task.TaskBase.more_tasks` - The priorities for dependent tasks are not re-calculated globally - - :param tsk: task instance - :type tsk: :py:attr:`waflib.Task.Task` + :param tsk: task + :type tsk: :py:attr:`waflib.Task.TaskBase` """ if getattr(tsk, 'more_tasks', None): - more = set(tsk.more_tasks) - groups_done = set() - def iteri(a, b): - for x in a: - yield x - for x in b: - yield x - - # Update the dependency tree - # this assumes that task.run_after values were updated - for x in iteri(self.outstanding, self.incomplete): - for k in x.run_after: - if isinstance(k, Task.TaskGroup): - if k not in groups_done: - groups_done.add(k) - for j in k.prev & more: - self.revdeps[j].add(k) - elif k in more: - self.revdeps[k].add(x) - - ready, waiting = self.prio_and_split(tsk.more_tasks) - self.outstanding.extend(ready) - self.incomplete.update(waiting) + self.outstanding += tsk.more_tasks self.total += len(tsk.more_tasks) - def mark_finished(self, tsk): - def try_unfreeze(x): - # DAG ancestors are likely to be in the incomplete set - # This assumes that the run_after contents have not changed - # after the build starts, else a deadlock may occur - if x in self.incomplete: - # TODO remove dependencies to free some memory? - # x.run_after.remove(tsk) - for k in x.run_after: - if not k.hasrun: - break - else: - self.incomplete.remove(x) - self.outstanding.append(x) - - if tsk in self.revdeps: - for x in self.revdeps[tsk]: - if isinstance(x, Task.TaskGroup): - x.prev.remove(tsk) - if not x.prev: - for k in x.next: - # TODO necessary optimization? - k.run_after.remove(x) - try_unfreeze(k) - # TODO necessary optimization? - x.next = [] - else: - try_unfreeze(x) - del self.revdeps[tsk] - - if hasattr(tsk, 'semaphore'): - sem = tsk.semaphore - sem.release(tsk) - while sem.waiting and not sem.is_locked(): - # take a frozen task, make it ready to run - x = sem.waiting.pop() - self._add_task(x) - def get_out(self): """ - Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution. - Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`. + Obtain one task returned from the task consumers, and update the task count. Add more tasks if necessary through + :py:attr:`waflib.Runner.Parallel.add_more_tasks`. - :rtype: :py:attr:`waflib.Task.Task` + :rtype: :py:attr:`waflib.Task.TaskBase` """ tsk = self.out.get() if not self.stop: self.add_more_tasks(tsk) - self.mark_finished(tsk) - self.count -= 1 self.dirty = True return tsk def add_task(self, tsk): """ - Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them. + Pass a task to a consumer. - :param tsk: task instance - :type tsk: :py:attr:`waflib.Task.Task` + :param tsk: task + :type tsk: :py:attr:`waflib.Task.TaskBase` """ - # TODO change in waf 2.1 + try: + self.pool + except AttributeError: + self.init_task_pool() self.ready.put(tsk) - def _add_task(self, tsk): - if hasattr(tsk, 'semaphore'): - sem = tsk.semaphore - try: - sem.acquire(tsk) - except IndexError: - sem.waiting.add(tsk) - return - - self.count += 1 - self.processed += 1 - if self.numjobs == 1: - tsk.log_display(tsk.generator.bld) - try: - self.process_task(tsk) - finally: - self.out.put(tsk) + def init_task_pool(self): + # lazy creation, and set a common pool for all task consumers + pool = self.pool = [get_pool() for i in range(self.numjobs)] + self.ready = Queue(0) + def setq(consumer): + consumer.ready = self.ready + for x in pool: + x.ready.put(setq) + return pool + + def free_task_pool(self): + # return the consumers, setting a different queue for each of them + def setq(consumer): + consumer.ready = Queue(0) + self.out.put(self) + try: + pool = self.pool + except AttributeError: + pass else: - self.add_task(tsk) - - def process_task(self, tsk): - """ - Processes a task and attempts to stop the build in case of errors - """ - tsk.process() - if tsk.hasrun != Task.SUCCESS: - self.error_handler(tsk) + for x in pool: + self.ready.put(setq) + for x in pool: + self.get_out() + for x in pool: + put_pool(x) + self.pool = [] def skip(self, tsk): - """ - Mark a task as skipped/up-to-date - """ tsk.hasrun = Task.SKIPPED - self.mark_finished(tsk) - - def cancel(self, tsk): - """ - Mark a task as failed because of unsatisfiable dependencies - """ - tsk.hasrun = Task.CANCELED - self.mark_finished(tsk) def error_handler(self, tsk): """ - Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set, - unless the build is executed with:: + Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set, unless + the build is executed with:: $ waf build -k - :param tsk: task instance - :type tsk: :py:attr:`waflib.Task.Task` + :param tsk: task + :type tsk: :py:attr:`waflib.Task.TaskBase` """ + if hasattr(tsk, 'scan') and hasattr(tsk, 'uid'): + # TODO waf 1.9 - this breaks encapsulation + key = (tsk.uid(), 'imp') + try: + del self.bld.task_sigs[key] + except KeyError: + pass if not self.bld.keep: self.stop = True self.error.append(tsk) def task_status(self, tsk): - """ - Obtains the task status to decide whether to run it immediately or not. - - :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER` - :rtype: integer - """ try: return tsk.runnable_status() except Exception: self.processed += 1 - tsk.err_msg = traceback.format_exc() + tsk.err_msg = Utils.ex_stack() if not self.stop and self.bld.keep: self.skip(tsk) if self.bld.keep == 1: - # if -k stop on the first exception, if -kk try to go as far as possible + # if -k stop at the first exception, if -kk try to go as far as possible if Logs.verbose > 1 or not self.error: self.error.append(tsk) self.stop = True @@ -442,20 +306,17 @@ class Parallel(object): if Logs.verbose > 1: self.error.append(tsk) return Task.EXCEPTION - tsk.hasrun = Task.EXCEPTION - self.error_handler(tsk) + self.error_handler(tsk) return Task.EXCEPTION def start(self): """ - Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to - :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread - has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out` - and marks the build as failed by setting the ``stop`` flag. - If only one job is used, then executes the tasks one by one, without consumers. + Give tasks to :py:class:`waflib.Runner.TaskConsumer` instances until the build finishes or the ``stop`` flag is set. + If only one job is used, then execute the tasks one by one, without consumers. """ + self.total = self.bld.total() while not self.stop: @@ -477,135 +338,36 @@ class Parallel(object): self.processed += 1 continue - if self.stop: # stop immediately after a failure is detected + if self.stop: # stop immediately after a failure was detected break + st = self.task_status(tsk) if st == Task.RUN_ME: - self._add_task(tsk) - elif st == Task.ASK_LATER: + tsk.position = (self.processed, self.total) + self.count += 1 + tsk.master = self + self.processed += 1 + + if self.numjobs == 1: + tsk.process() + else: + self.add_task(tsk) + if st == Task.ASK_LATER: self.postpone(tsk) elif st == Task.SKIP_ME: self.processed += 1 self.skip(tsk) self.add_more_tasks(tsk) - elif st == Task.CANCEL_ME: - # A dependency problem has occurred, and the - # build is most likely run with `waf -k` - if Logs.verbose > 1: - self.error.append(tsk) - self.processed += 1 - self.cancel(tsk) # self.count represents the tasks that have been made available to the consumer threads # collect all the tasks after an error else the message may be incomplete while self.error and self.count: self.get_out() - self.ready.put(None) - if not self.stop: - assert not self.count - assert not self.postponed - assert not self.incomplete - - def prio_and_split(self, tasks): - """ - Label input tasks with priority values, and return a pair containing - the tasks that are ready to run and the tasks that are necessarily - waiting for other tasks to complete. - - The priority system is really meant as an optional layer for optimization: - dependency cycles are found quickly, and builds should be more efficient. - A high priority number means that a task is processed first. - - This method can be overridden to disable the priority system:: - - def prio_and_split(self, tasks): - return tasks, [] + #print loop + assert (self.count == 0 or self.stop) - :return: A pair of task lists - :rtype: tuple - """ - # to disable: - #return tasks, [] - for x in tasks: - x.visited = 0 - - reverse = self.revdeps - - groups_done = set() - for x in tasks: - for k in x.run_after: - if isinstance(k, Task.TaskGroup): - if k not in groups_done: - groups_done.add(k) - for j in k.prev: - reverse[j].add(k) - else: - reverse[k].add(x) - - # the priority number is not the tree depth - def visit(n): - if isinstance(n, Task.TaskGroup): - return sum(visit(k) for k in n.next) - - if n.visited == 0: - n.visited = 1 - - if n in reverse: - rev = reverse[n] - n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev) - else: - n.prio_order = n.tree_weight - - n.visited = 2 - elif n.visited == 1: - raise Errors.WafError('Dependency cycle found!') - return n.prio_order - - for x in tasks: - if x.visited != 0: - # must visit all to detect cycles - continue - try: - visit(x) - except Errors.WafError: - self.debug_cycles(tasks, reverse) - - ready = [] - waiting = [] - for x in tasks: - for k in x.run_after: - if not k.hasrun: - waiting.append(x) - break - else: - ready.append(x) - return (ready, waiting) - - def debug_cycles(self, tasks, reverse): - tmp = {} - for x in tasks: - tmp[x] = 0 - - def visit(n, acc): - if isinstance(n, Task.TaskGroup): - for k in n.next: - visit(k, acc) - return - if tmp[n] == 0: - tmp[n] = 1 - for k in reverse.get(n, []): - visit(k, [n] + acc) - tmp[n] = 2 - elif tmp[n] == 1: - lst = [] - for tsk in acc: - lst.append(repr(tsk)) - if tsk is n: - # exclude prior nodes, we want the minimum cycle - break - raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst)) - for x in tasks: - visit(x, []) + # free the task pool, if any + self.free_task_pool() |