Diffstat (limited to 'waflib/Runner.py')
-rw-r--r--  waflib/Runner.py  576
1 files changed, 169 insertions, 407 deletions
diff --git a/waflib/Runner.py b/waflib/Runner.py
index 261084d2..b3087292 100644
--- a/waflib/Runner.py
+++ b/waflib/Runner.py
@@ -1,125 +1,98 @@
#!/usr/bin/env python
# encoding: utf-8
-# Thomas Nagy, 2005-2018 (ita)
+# Thomas Nagy, 2005-2010 (ita)
"""
Runner.py: Task scheduling and execution
+
"""
-import heapq, traceback
+import random, atexit
try:
- from queue import Queue, PriorityQueue
+ from queue import Queue
except ImportError:
from Queue import Queue
- try:
- from Queue import PriorityQueue
- except ImportError:
- class PriorityQueue(Queue):
- def _init(self, maxsize):
- self.maxsize = maxsize
- self.queue = []
- def _put(self, item):
- heapq.heappush(self.queue, item)
- def _get(self):
- return heapq.heappop(self.queue)
-
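The block being removed back-fills PriorityQueue on Python 2 interpreters whose Queue module predates it, by overriding the three storage hooks with a heapq-backed list. The same pattern in isolation (hypothetical class name; items are assumed comparable, as waf task objects are here):

import heapq
try:
	from queue import Queue          # Python 3
except ImportError:
	from Queue import Queue          # Python 2

class HeapQueue(Queue):
	# Queue provides the locking; only the storage hooks change.
	def _init(self, maxsize):
		self.queue = []
	def _put(self, item):
		heapq.heappush(self.queue, item)
	def _get(self):
		return heapq.heappop(self.queue)

q = HeapQueue()
for prio in (3, 1, 2):
	q.put(prio)
assert [q.get() for _ in range(3)] == [1, 2, 3]   # smallest item comes out first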
from waflib import Utils, Task, Errors, Logs
-GAP = 5
+GAP = 10
"""
-Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
+Wait for free tasks if there are at least ``GAP * njobs`` in queue
"""
-class PriorityTasks(object):
- def __init__(self):
- self.lst = []
- def __len__(self):
- return len(self.lst)
- def __iter__(self):
- return iter(self.lst)
- def clear(self):
- self.lst = []
- def append(self, task):
- heapq.heappush(self.lst, task)
- def appendleft(self, task):
- "Deprecated, do not use"
- heapq.heappush(self.lst, task)
- def pop(self):
- return heapq.heappop(self.lst)
- def extend(self, lst):
- if self.lst:
- for x in lst:
- self.append(x)
- else:
- if isinstance(lst, list):
- self.lst = lst
- heapq.heapify(lst)
- else:
- self.lst = lst.lst
-
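The removed PriorityTasks is a thin wrapper over heapq: append/pop keep the smallest element on top, and extend() heapifies a whole list in one O(n) pass when the container is empty instead of pushing item by item. The trade-off in isolation, with plain integers standing in for tasks:

import heapq

items = [7, 3, 9, 1]

# O(n) when starting from scratch: reuse the list and heapify it once ...
heap = items[:]
heapq.heapify(heap)

# ... versus one heappush per element, which is what extend() falls back to
# when the heap already holds entries.
heap2 = []
for x in items:
	heapq.heappush(heap2, x)

assert heapq.heappop(heap) == heapq.heappop(heap2) == 1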
-class Consumer(Utils.threading.Thread):
+class TaskConsumer(Utils.threading.Thread):
"""
- Daemon thread object that executes a task. It shares a semaphore with
- the coordinator :py:class:`waflib.Runner.Spawner`. There is one
- instance per task to consume.
+ Task consumers belong to a pool of workers
+
+ They wait for tasks in the queue and then use ``task.process(...)``
"""
- def __init__(self, spawner, task):
+ def __init__(self):
Utils.threading.Thread.__init__(self)
- self.task = task
- """Task to execute"""
- self.spawner = spawner
- """Coordinator object"""
- self.setDaemon(1)
- self.start()
- def run(self):
+ self.ready = Queue()
"""
- Processes a single task
+ Obtain :py:class:`waflib.Task.TaskBase` instances from this queue.
"""
- try:
- if not self.spawner.master.stop:
- self.spawner.master.process_task(self.task)
- finally:
- self.spawner.sem.release()
- self.spawner.master.out.put(self.task)
- self.task = None
- self.spawner = None
-
-class Spawner(Utils.threading.Thread):
- """
- Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
- spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
- :py:class:`waflib.Task.Task` instance.
- """
- def __init__(self, master):
- Utils.threading.Thread.__init__(self)
- self.master = master
- """:py:class:`waflib.Runner.Parallel` producer instance"""
- self.sem = Utils.threading.Semaphore(master.numjobs)
- """Bounded semaphore that prevents spawning more than *n* concurrent consumers"""
self.setDaemon(1)
self.start()
+
def run(self):
"""
- Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
+ Loop over the tasks to execute
"""
try:
self.loop()
except Exception:
- # Python 2 prints unnecessary messages when shutting down
- # we also want to stop the thread properly
pass
+
def loop(self):
"""
- Consumes task objects from the producer; ends when the producer has no more
- task to provide.
+ Obtain tasks from :py:attr:`waflib.Runner.TaskConsumer.ready` and call
+ :py:meth:`waflib.Task.TaskBase.process`. If the object is a function, execute it.
"""
- master = self.master
while 1:
- task = master.ready.get()
- self.sem.acquire()
- if not master.stop:
- task.log_display(task.generator.bld)
- Consumer(self, task)
+ tsk = self.ready.get()
+ if not isinstance(tsk, Task.TaskBase):
+ tsk(self)
+ else:
+ tsk.process()
+
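The Spawner/Consumer pair being dropped bounds concurrency with a semaphore: the coordinator acquires a slot before starting a short-lived thread per task, and the worker releases it in a finally clause after queuing the result. A compact sketch of that pattern under hypothetical names:

import threading
try:
	from queue import Queue
except ImportError:
	from Queue import Queue

njobs = 3
sem = threading.Semaphore(njobs)   # at most njobs workers at any time
out = Queue()

def run_one(task):
	try:
		task()                     # stands in for Task.process()
	finally:
		sem.release()              # always free the slot, even on error
		out.put(task)              # hand the result back to the producer

def spawn(task):
	sem.acquire()                  # blocks while njobs workers are busy
	t = threading.Thread(target=run_one, args=(task,))
	t.daemon = True
	t.start()

for i in range(10):
	spawn(lambda i=i: None)
for i in range(10):
	out.get()                      # drain results, as Parallel.get_out() does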
+pool = Queue()
+"""
+Pool of task consumer objects
+"""
+
+def get_pool():
+ """
+ Obtain a task consumer from :py:attr:`waflib.Runner.pool`.
+ Do not forget to put it back by using :py:func:`waflib.Runner.put_pool`
+ and reset properly (original waiting queue).
+
+ :rtype: :py:class:`waflib.Runner.TaskConsumer`
+ """
+ try:
+ return pool.get(False)
+ except Exception:
+ return TaskConsumer()
+
+def put_pool(x):
+ """
+ Return a task consumer to the thread pool :py:attr:`waflib.Runner.pool`
+
+ :param x: task consumer object
+ :type x: :py:class:`waflib.Runner.TaskConsumer`
+ """
+ pool.put(x)
+
+def _free_resources():
+ global pool
+ lst = []
+ while pool.qsize():
+ lst.append(pool.get())
+ for x in lst:
+ x.ready.put(None)
+ for x in lst:
+ x.join()
+ pool = None
+atexit.register(_free_resources)
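The module-level pool keeps idle TaskConsumer threads around between builds, and _free_resources drains it at interpreter exit, waking each worker and joining it so the process can terminate cleanly. Roughly the same lifecycle, reduced to a standalone sketch (the explicit None sentinel here is a simplification relative to the waf worker):

import atexit, threading
try:
	from queue import Queue
except ImportError:
	from Queue import Queue

class Worker(threading.Thread):
	def __init__(self):
		threading.Thread.__init__(self)
		self.ready = Queue()
		self.daemon = True
		self.start()
	def run(self):
		while 1:
			job = self.ready.get()
			if job is None:        # sentinel: time to shut down
				return
			job()

pool = Queue()

def get_worker():
	try:
		return pool.get(False)     # reuse an idle worker if one exists
	except Exception:
		return Worker()

def put_worker(w):
	pool.put(w)

def _shutdown():
	# wake every pooled worker so that join() cannot block forever
	workers = []
	while pool.qsize():
		workers.append(pool.get())
	for w in workers:
		w.ready.put(None)
	for w in workers:
		w.join()
atexit.register(_shutdown)

w = get_worker()
w.ready.put(lambda: None)          # hand it a job
put_worker(w)                      # return it to the pool for reuse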
class Parallel(object):
"""
@@ -133,7 +106,7 @@ class Parallel(object):
self.numjobs = j
"""
- Amount of parallel consumers to use
+ Number of consumers in the pool
"""
self.bld = bld
@@ -141,25 +114,19 @@ class Parallel(object):
Instance of :py:class:`waflib.Build.BuildContext`
"""
- self.outstanding = PriorityTasks()
- """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""
-
- self.postponed = PriorityTasks()
- """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
+ self.outstanding = []
+ """List of :py:class:`waflib.Task.TaskBase` that may be ready to be executed"""
- self.incomplete = set()
- """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""
-
- self.ready = PriorityQueue(0)
- """List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
+ self.frozen = []
+ """List of :py:class:`waflib.Task.TaskBase` that cannot be executed immediately"""
self.out = Queue(0)
- """List of :py:class:`waflib.Task.Task` returned by the task consumers"""
+ """List of :py:class:`waflib.Task.TaskBase` returned by the task consumers"""
self.count = 0
"""Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""
- self.processed = 0
+ self.processed = 1
"""Amount of tasks processed"""
self.stop = False
@@ -172,44 +139,33 @@ class Parallel(object):
"""Task iterator which must give groups of parallelizable tasks when calling ``next()``"""
self.dirty = False
- """
- Flag that indicates that the build cache must be saved when a task was executed
- (calls :py:meth:`waflib.Build.BuildContext.store`)"""
-
- self.revdeps = Utils.defaultdict(set)
- """
- The reverse dependency graph of dependencies obtained from Task.run_after
- """
-
- self.spawner = Spawner(self)
- """
- Coordinating daemon thread that spawns thread consumers
- """
+ """Flag to indicate that tasks have been executed, and that the build cache must be saved (call :py:meth:`waflib.Build.BuildContext.store`)"""
def get_next_task(self):
"""
- Obtains the next Task instance to run
+ Obtain the next task to execute.
- :rtype: :py:class:`waflib.Task.Task`
+ :rtype: :py:class:`waflib.Task.TaskBase`
"""
if not self.outstanding:
return None
- return self.outstanding.pop()
+ return self.outstanding.pop(0)
def postpone(self, tsk):
"""
- Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`.
- The order is scrambled so as to consume as many tasks in parallel as possible.
+ A task cannot be executed at this point, put it in the list :py:attr:`waflib.Runner.Parallel.frozen`.
- :param tsk: task instance
- :type tsk: :py:class:`waflib.Task.Task`
+ :param tsk: task
+ :type tsk: :py:class:`waflib.Task.TaskBase`
"""
- self.postponed.append(tsk)
+ if random.randint(0, 1):
+ self.frozen.insert(0, tsk)
+ else:
+ self.frozen.append(tsk)
def refill_task_list(self):
"""
- Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
- Ensures that all tasks in the current build group are complete before processing the next one.
+ Put the next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
"""
while self.count > self.numjobs * GAP:
self.get_out()
@@ -217,224 +173,132 @@ class Parallel(object):
while not self.outstanding:
if self.count:
self.get_out()
- if self.outstanding:
- break
- elif self.postponed:
+ elif self.frozen:
try:
cond = self.deadlock == self.processed
except AttributeError:
pass
else:
if cond:
- # The most common reason is conflicting build order declaration
- # for example: "X run_after Y" and "Y run_after X"
- # Another can be changing "run_after" dependencies while the build is running
- # for example: updating "tsk.run_after" in the "runnable_status" method
+ msg = 'check the build order for the tasks'
+ for tsk in self.frozen:
+ if not tsk.run_after:
+ msg = 'check the methods runnable_status'
+ break
lst = []
- for tsk in self.postponed:
- deps = [id(x) for x in tsk.run_after if not x.hasrun]
- lst.append('%s\t-> %r' % (repr(tsk), deps))
- if not deps:
- lst.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk))
- raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
+ for tsk in self.frozen:
+ lst.append('%s\t-> %r' % (repr(tsk), [id(x) for x in tsk.run_after]))
+ raise Errors.WafError('Deadlock detected: %s%s' % (msg, ''.join(lst)))
self.deadlock = self.processed
- if self.postponed:
- self.outstanding.extend(self.postponed)
- self.postponed.clear()
+ if self.frozen:
+ self.outstanding += self.frozen
+ self.frozen = []
elif not self.count:
- if self.incomplete:
- for x in self.incomplete:
- for k in x.run_after:
- if not k.hasrun:
- break
- else:
- # dependency added after the build started without updating revdeps
- self.incomplete.remove(x)
- self.outstanding.append(x)
- break
- else:
- raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
- else:
- tasks = next(self.biter)
- ready, waiting = self.prio_and_split(tasks)
- self.outstanding.extend(ready)
- self.incomplete.update(waiting)
- self.total = self.bld.total()
- break
+ self.outstanding.extend(next(self.biter))
+ self.total = self.bld.total()
+ break
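The deadlock check above is pure bookkeeping: self.deadlock records how many tasks had been processed the last time only frozen tasks remained; if that number has not moved by the next visit, nothing can ever unfreeze them and the build aborts with the run_after chains. Restated with hypothetical names:

class DeadlockError(Exception):
	pass

def check_progress(processed, frozen, last_processed):
	# Called each time the scheduler finds only frozen tasks left.
	# If no task completed since the previous call, the frozen tasks are
	# waiting on each other (for example "X run_after Y" and "Y run_after X").
	if frozen and processed == last_processed:
		raise DeadlockError('no task completed since the last check: %r' % frozen)
	return processed   # becomes last_processed for the next call

last = check_progress(processed=5, frozen=['tsk_a', 'tsk_b'], last_processed=0)
try:
	check_progress(processed=5, frozen=['tsk_a', 'tsk_b'], last_processed=last)
except DeadlockError:
	pass   # same tasks still frozen, nothing completed: report the cycle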
def add_more_tasks(self, tsk):
"""
- If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
- in that list are added to the current build and will be processed before the next build group.
+ Tasks may be added dynamically during the build by binding them to the task :py:attr:`waflib.Task.TaskBase.more_tasks`
- The priorities for dependent tasks are not re-calculated globally
-
- :param tsk: task instance
- :type tsk: :py:attr:`waflib.Task.Task`
+ :param tsk: task
+ :type tsk: :py:attr:`waflib.Task.TaskBase`
"""
if getattr(tsk, 'more_tasks', None):
- more = set(tsk.more_tasks)
- groups_done = set()
- def iteri(a, b):
- for x in a:
- yield x
- for x in b:
- yield x
-
- # Update the dependency tree
- # this assumes that task.run_after values were updated
- for x in iteri(self.outstanding, self.incomplete):
- for k in x.run_after:
- if isinstance(k, Task.TaskGroup):
- if k not in groups_done:
- groups_done.add(k)
- for j in k.prev & more:
- self.revdeps[j].add(k)
- elif k in more:
- self.revdeps[k].add(x)
-
- ready, waiting = self.prio_and_split(tsk.more_tasks)
- self.outstanding.extend(ready)
- self.incomplete.update(waiting)
+ self.outstanding += tsk.more_tasks
self.total += len(tsk.more_tasks)
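more_tasks is the hook for injecting work while the build runs: whatever a task leaves in that attribute is appended to the outstanding list and added to the total before the next build group starts. A reduced sketch of both sides (FakeTask is a stand-in, not a waflib class):

class FakeTask(object):
	# Stand-in for waflib.Task.TaskBase; only the attribute matters here.
	def run(self):
		# decide at run time that two follow-up tasks are needed
		self.more_tasks = [FakeTask(), FakeTask()]

outstanding, total = [], 1

def add_more_tasks(tsk):
	global total
	extra = getattr(tsk, 'more_tasks', None)
	if extra:
		outstanding.extend(extra)   # they run before the next build group
		total += len(extra)

tsk = FakeTask()
tsk.run()
add_more_tasks(tsk)
assert len(outstanding) == 2 and total == 3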
- def mark_finished(self, tsk):
- def try_unfreeze(x):
- # DAG ancestors are likely to be in the incomplete set
- # This assumes that the run_after contents have not changed
- # after the build starts, else a deadlock may occur
- if x in self.incomplete:
- # TODO remove dependencies to free some memory?
- # x.run_after.remove(tsk)
- for k in x.run_after:
- if not k.hasrun:
- break
- else:
- self.incomplete.remove(x)
- self.outstanding.append(x)
-
- if tsk in self.revdeps:
- for x in self.revdeps[tsk]:
- if isinstance(x, Task.TaskGroup):
- x.prev.remove(tsk)
- if not x.prev:
- for k in x.next:
- # TODO necessary optimization?
- k.run_after.remove(x)
- try_unfreeze(k)
- # TODO necessary optimization?
- x.next = []
- else:
- try_unfreeze(x)
- del self.revdeps[tsk]
-
- if hasattr(tsk, 'semaphore'):
- sem = tsk.semaphore
- sem.release(tsk)
- while sem.waiting and not sem.is_locked():
- # take a frozen task, make it ready to run
- x = sem.waiting.pop()
- self._add_task(x)
-
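mark_finished (removed here) walks a reverse-dependency map: when a task completes, each recorded dependent is re-checked, and those whose run_after set is now entirely done move from the incomplete set to the runnable list. The core bookkeeping, stripped of task groups and semaphores:

from collections import defaultdict

revdeps = defaultdict(set)     # finished task -> tasks that were waiting on it
incomplete = set()
outstanding = []

def mark_finished(tsk):
	tsk.hasrun = True
	for dep in revdeps.pop(tsk, ()):
		if dep in incomplete and all(k.hasrun for k in dep.run_after):
			incomplete.remove(dep)
			outstanding.append(dep)   # dep is now free to be scheduled

class T(object):
	def __init__(self, run_after=()):
		self.hasrun = False
		self.run_after = set(run_after)

a = T(); b = T(run_after=[a])
revdeps[a].add(b)
incomplete.add(b)
mark_finished(a)
assert b not in incomplete and outstanding == [b]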
def get_out(self):
"""
- Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
- Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
+ Obtain one task returned from the task consumers, and update the task count. Add more tasks if necessary through
+ :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
- :rtype: :py:attr:`waflib.Task.Task`
+ :rtype: :py:attr:`waflib.Task.TaskBase`
"""
tsk = self.out.get()
if not self.stop:
self.add_more_tasks(tsk)
- self.mark_finished(tsk)
-
self.count -= 1
self.dirty = True
return tsk
def add_task(self, tsk):
"""
- Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
+ Pass a task to a consumer.
- :param tsk: task instance
- :type tsk: :py:attr:`waflib.Task.Task`
+ :param tsk: task
+ :type tsk: :py:attr:`waflib.Task.TaskBase`
"""
- # TODO change in waf 2.1
+ try:
+ self.pool
+ except AttributeError:
+ self.init_task_pool()
self.ready.put(tsk)
- def _add_task(self, tsk):
- if hasattr(tsk, 'semaphore'):
- sem = tsk.semaphore
- try:
- sem.acquire(tsk)
- except IndexError:
- sem.waiting.add(tsk)
- return
-
- self.count += 1
- self.processed += 1
- if self.numjobs == 1:
- tsk.log_display(tsk.generator.bld)
- try:
- self.process_task(tsk)
- finally:
- self.out.put(tsk)
+ def init_task_pool(self):
+ # lazy creation, and set a common pool for all task consumers
+ pool = self.pool = [get_pool() for i in range(self.numjobs)]
+ self.ready = Queue(0)
+ def setq(consumer):
+ consumer.ready = self.ready
+ for x in pool:
+ x.ready.put(setq)
+ return pool
+
+ def free_task_pool(self):
+ # return the consumers, setting a different queue for each of them
+ def setq(consumer):
+ consumer.ready = Queue(0)
+ self.out.put(self)
+ try:
+ pool = self.pool
+ except AttributeError:
+ pass
else:
- self.add_task(tsk)
-
- def process_task(self, tsk):
- """
- Processes a task and attempts to stop the build in case of errors
- """
- tsk.process()
- if tsk.hasrun != Task.SUCCESS:
- self.error_handler(tsk)
+ for x in pool:
+ self.ready.put(setq)
+ for x in pool:
+ self.get_out()
+ for x in pool:
+ put_pool(x)
+ self.pool = []
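init_task_pool and free_task_pool steer the workers through the data queue itself: TaskConsumer.loop calls anything that is not a TaskBase, so sending the setq function rebinds (or detaches) a worker's ready queue without a separate control channel. The trick in miniature (callable() is the discriminator here, where waf checks isinstance against Task.TaskBase):

import threading
try:
	from queue import Queue
except ImportError:
	from Queue import Queue

done = Queue()

class Worker(threading.Thread):
	def __init__(self):
		threading.Thread.__init__(self)
		self.ready = Queue()
		self.daemon = True
		self.start()
	def run(self):
		while 1:
			item = self.ready.get()
			if item is None:
				return
			if callable(item):
				item(self)          # control message: reconfigure the worker
			else:
				done.put(item * 2)  # "real" work

shared = Queue()
def setq(worker):
	worker.ready = shared           # from now on the worker reads the shared queue

w = Worker()
w.ready.put(setq)                   # sent on the private queue the worker starts with
shared.put(21)
print(done.get())                   # 42
shared.put(None)                    # let the thread exit before the script does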
def skip(self, tsk):
- """
- Mark a task as skipped/up-to-date
- """
tsk.hasrun = Task.SKIPPED
- self.mark_finished(tsk)
-
- def cancel(self, tsk):
- """
- Mark a task as failed because of unsatisfiable dependencies
- """
- tsk.hasrun = Task.CANCELED
- self.mark_finished(tsk)
def error_handler(self, tsk):
"""
- Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
- unless the build is executed with::
+ Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set, unless
+ the build is executed with::
$ waf build -k
- :param tsk: task instance
- :type tsk: :py:attr:`waflib.Task.Task`
+ :param tsk: task
+ :type tsk: :py:attr:`waflib.Task.TaskBase`
"""
+ if hasattr(tsk, 'scan') and hasattr(tsk, 'uid'):
+ # TODO waf 1.9 - this breaks encapsulation
+ key = (tsk.uid(), 'imp')
+ try:
+ del self.bld.task_sigs[key]
+ except KeyError:
+ pass
if not self.bld.keep:
self.stop = True
self.error.append(tsk)
def task_status(self, tsk):
- """
- Obtains the task status to decide whether to run it immediately or not.
-
- :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
- :rtype: integer
- """
try:
return tsk.runnable_status()
except Exception:
self.processed += 1
- tsk.err_msg = traceback.format_exc()
+ tsk.err_msg = Utils.ex_stack()
if not self.stop and self.bld.keep:
self.skip(tsk)
if self.bld.keep == 1:
- # if -k stop on the first exception, if -kk try to go as far as possible
+ # if -k stop at the first exception, if -kk try to go as far as possible
if Logs.verbose > 1 or not self.error:
self.error.append(tsk)
self.stop = True
@@ -442,20 +306,17 @@ class Parallel(object):
if Logs.verbose > 1:
self.error.append(tsk)
return Task.EXCEPTION
-
tsk.hasrun = Task.EXCEPTION
- self.error_handler(tsk)
+ self.error_handler(tsk)
return Task.EXCEPTION
def start(self):
"""
- Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
- :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
- has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
- and marks the build as failed by setting the ``stop`` flag.
- If only one job is used, then executes the tasks one by one, without consumers.
+ Give tasks to :py:class:`waflib.Runner.TaskConsumer` instances until the build finishes or the ``stop`` flag is set.
+ If only one job is used, then execute the tasks one by one, without consumers.
"""
+
self.total = self.bld.total()
while not self.stop:
@@ -477,135 +338,36 @@ class Parallel(object):
self.processed += 1
continue
- if self.stop: # stop immediately after a failure is detected
+ if self.stop: # stop immediately after a failure was detected
break
+
st = self.task_status(tsk)
if st == Task.RUN_ME:
- self._add_task(tsk)
- elif st == Task.ASK_LATER:
+ tsk.position = (self.processed, self.total)
+ self.count += 1
+ tsk.master = self
+ self.processed += 1
+
+ if self.numjobs == 1:
+ tsk.process()
+ else:
+ self.add_task(tsk)
+ if st == Task.ASK_LATER:
self.postpone(tsk)
elif st == Task.SKIP_ME:
self.processed += 1
self.skip(tsk)
self.add_more_tasks(tsk)
- elif st == Task.CANCEL_ME:
- # A dependency problem has occurred, and the
- # build is most likely run with `waf -k`
- if Logs.verbose > 1:
- self.error.append(tsk)
- self.processed += 1
- self.cancel(tsk)
# self.count represents the tasks that have been made available to the consumer threads
# collect all the tasks after an error else the message may be incomplete
while self.error and self.count:
self.get_out()
- self.ready.put(None)
- if not self.stop:
- assert not self.count
- assert not self.postponed
- assert not self.incomplete
-
- def prio_and_split(self, tasks):
- """
- Label input tasks with priority values, and return a pair containing
- the tasks that are ready to run and the tasks that are necessarily
- waiting for other tasks to complete.
-
- The priority system is really meant as an optional layer for optimization:
- dependency cycles are found quickly, and builds should be more efficient.
- A high priority number means that a task is processed first.
-
- This method can be overridden to disable the priority system::
-
- def prio_and_split(self, tasks):
- return tasks, []
+ #print loop
+ assert (self.count == 0 or self.stop)
- :return: A pair of task lists
- :rtype: tuple
- """
- # to disable:
- #return tasks, []
- for x in tasks:
- x.visited = 0
-
- reverse = self.revdeps
-
- groups_done = set()
- for x in tasks:
- for k in x.run_after:
- if isinstance(k, Task.TaskGroup):
- if k not in groups_done:
- groups_done.add(k)
- for j in k.prev:
- reverse[j].add(k)
- else:
- reverse[k].add(x)
-
- # the priority number is not the tree depth
- def visit(n):
- if isinstance(n, Task.TaskGroup):
- return sum(visit(k) for k in n.next)
-
- if n.visited == 0:
- n.visited = 1
-
- if n in reverse:
- rev = reverse[n]
- n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
- else:
- n.prio_order = n.tree_weight
-
- n.visited = 2
- elif n.visited == 1:
- raise Errors.WafError('Dependency cycle found!')
- return n.prio_order
-
- for x in tasks:
- if x.visited != 0:
- # must visit all to detect cycles
- continue
- try:
- visit(x)
- except Errors.WafError:
- self.debug_cycles(tasks, reverse)
-
- ready = []
- waiting = []
- for x in tasks:
- for k in x.run_after:
- if not k.hasrun:
- waiting.append(x)
- break
- else:
- ready.append(x)
- return (ready, waiting)
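The visit() above is a three-colour depth-first search over the reverse graph: 0 means unseen, 1 means currently on the stack, 2 means fully scored, and reaching a node already in state 1 means the run_after constraints form a cycle. The same detector on a bare adjacency dict:

def find_cycle(graph):
	# graph: node -> iterable of nodes it points to (here: reverse dependencies)
	state = dict.fromkeys(graph, 0)    # 0 unvisited, 1 in progress, 2 done
	def visit(n):
		if state.get(n, 2) == 1:
			raise ValueError('dependency cycle through %r' % n)
		if state.get(n, 2) == 0:
			state[n] = 1
			for m in graph.get(n, ()):
				visit(m)
			state[n] = 2
	for n in graph:
		visit(n)

find_cycle({'a': ['b'], 'b': ['c'], 'c': []})    # acyclic: passes silently
try:
	find_cycle({'x': ['y'], 'y': ['x']})
except ValueError as e:
	print(e)                                     # dependency cycle through 'x'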
-
- def debug_cycles(self, tasks, reverse):
- tmp = {}
- for x in tasks:
- tmp[x] = 0
-
- def visit(n, acc):
- if isinstance(n, Task.TaskGroup):
- for k in n.next:
- visit(k, acc)
- return
- if tmp[n] == 0:
- tmp[n] = 1
- for k in reverse.get(n, []):
- visit(k, [n] + acc)
- tmp[n] = 2
- elif tmp[n] == 1:
- lst = []
- for tsk in acc:
- lst.append(repr(tsk))
- if tsk is n:
- # exclude prior nodes, we want the minimum cycle
- break
- raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
- for x in tasks:
- visit(x, [])
+ # free the task pool, if any
+ self.free_task_pool()
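For context on how this class is driven: waf's build step creates one Parallel per run, hands it the build iterator and calls start(). The sketch below follows waflib.Build.BuildContext.compile from memory and simplifies its error handling:

from waflib import Runner, Errors

def compile(bld, jobs):
	# Roughly what BuildContext.compile does: one producer per build run.
	producer = Runner.Parallel(bld, jobs)
	producer.biter = bld.get_build_iterator()   # groups of parallelizable tasks
	producer.start()                            # blocks until done or stopped
	if producer.error:
		raise Errors.BuildError(producer.error)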