-rw-r--r--   rts/Schedule.c | 161
1 file changed, 62 insertions(+), 99 deletions(-)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 908acf27fe..544b9c2115 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -741,12 +741,6 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     // - threads that have TSO_LOCKED cannot migrate
     // - a thread that is bound to the current Task cannot be migrated
     //
-    // So we walk through the run queue, migrating threads to
-    // free_caps[] round-robin, skipping over immovable threads. Each
-    // time through free_caps[] we keep one thread for ourselves,
-    // provided we haven't encountered one or more immovable threads
-    // in this pass.
-    //
     // This is about the simplest thing we could do; improvements we
     // might want to do include:
     //
@@ -758,112 +752,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     if (n_free_caps > 0) {
         StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
-        rtsBool pushed_to_all;
-#endif
 
         debugTrace(DEBUG_sched,
                    "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                    cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
-        i = 0;
-#ifdef SPARK_PUSHING
-        pushed_to_all = rtsFalse;
-#endif
-
-        // We want to share threads equally amongst free_caps[] and the
-        // current capability, but sometimes we encounter immovable
-        // threads. This counter tracks the number of threads we have kept
-        // for the current capability minus the number of passes over
-        // free_caps[]. If it is great than zero (due to immovable
-        // threads), we should try to bring it back to zero again by not
-        // keeping any threads for the current capability.
-        uint32_t imbalance = 0;
-
-        // n_free_caps may be larger than the number of spare threads we have,
-        // if there were sparks in the spark pool. To avoid giving away all our
-        // threads in this case, we limit the number of caps that we give
-        // threads to, to the number of spare threads (n_run_queue-1).
-        uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
-
-        if (thread_recipients > 0) {
-            prev = END_TSO_QUEUE;
-            t = cap->run_queue_hd;
-            for (; t != END_TSO_QUEUE; t = next) {
-                next = t->_link;
-                t->_link = END_TSO_QUEUE;
-                if (t->bound == task->incall // don't move my bound thread
-                    || tsoLocked(t)) { // don't move a locked thread
-                    if (prev == END_TSO_QUEUE) {
-                        cap->run_queue_hd = t;
-                    } else {
-                        setTSOLink(cap, prev, t);
-                    }
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
-                    imbalance++;
-                } else if (i == thread_recipients) {
-#ifdef SPARK_PUSHING
-                    pushed_to_all = rtsTrue;
-#endif
-                    // If we have not already kept any threads for this
-                    // capability during the current pass over free_caps[],
-                    // keep one now.
-                    if (imbalance == 0) {
-                        if (prev == END_TSO_QUEUE) {
-                            cap->run_queue_hd = t;
-                        } else {
-                            setTSOLink(cap, prev, t);
-                        }
-                        setTSOPrev(cap, t, prev);
-                        prev = t;
-                    } else {
-                        imbalance--;
-                    }
-                    i = 0;
+        // There are n_free_caps+1 caps in total. We will share the threads
+        // evenly between them, *except* that if the run queue does not divide
+        // evenly by n_free_caps+1 then we bias towards the current capability.
+        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+        uint32_t keep_threads =
+            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+        // This also ensures that we don't give away all our threads, since
+        // (x + y) / (y + 1) >= 1 when x >= 1.
+
+        // The number of threads we have left.
+        uint32_t n = cap->n_run_queue;
+
+        // prev = the previous thread on this cap's run queue
+        prev = END_TSO_QUEUE;
+
+        // We're going to walk through the run queue, migrating threads to other
+        // capabilities until we have only keep_threads left.  We might
+        // encounter a thread that cannot be migrated, in which case we add it
+        // to the current run queue and decrement keep_threads.
+        for (t = cap->run_queue_hd, i = 0;
+             t != END_TSO_QUEUE && n > keep_threads;
+             t = next)
+        {
+            next = t->_link;
+            t->_link = END_TSO_QUEUE;
+
+            // Should we keep this thread?
+            if (t->bound == task->incall // don't move my bound thread
+                || tsoLocked(t) // don't move a locked thread
+                ) {
+                if (prev == END_TSO_QUEUE) {
+                    cap->run_queue_hd = t;
                 } else {
-                    appendToRunQueue(free_caps[i],t);
-                    cap->n_run_queue--;
-
-                    traceEventMigrateThread (cap, t, free_caps[i]->no);
-
-                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
-                    t->cap = free_caps[i];
-                    i++;
+                    setTSOLink(cap, prev, t);
                 }
+                setTSOPrev(cap, t, prev);
+                prev = t;
+                if (keep_threads > 0) keep_threads--;
             }
-            cap->run_queue_tl = prev;
 
-            IF_DEBUG(sanity, checkRunQueue(cap));
-        }
+            // Or migrate it?
+            else {
+                appendToRunQueue(free_caps[i],t);
+                traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-#ifdef SPARK_PUSHING
-        /* JB I left this code in place, it would work but is not necessary */
-
-        // If there are some free capabilities that we didn't push any
-        // threads to, then try to push a spark to each one.
-        if (!pushed_to_all) {
-            StgClosure *spark;
-            // i is the next free capability to push to
-            for (; i < n_free_caps; i++) {
-                if (emptySparkPoolCap(free_caps[i])) {
-                    spark = tryStealSpark(cap->sparks);
-                    if (spark != NULL) {
-                        /* TODO: if anyone wants to re-enable this code then
-                         * they must consider the fizzledSpark(spark) case
-                         * and update the per-cap spark statistics.
-                         */
-                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
-                        traceEventStealSpark(free_caps[i], t, cap->no);
-
-                        newSpark(&(free_caps[i]->r), spark);
-                    }
-                }
+                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                t->cap = free_caps[i];
+                n--; // we have one fewer threads now
+                i++; // move on to the next free_cap
+                if (i == n_free_caps) i = 0;
             }
         }
-#endif /* SPARK_PUSHING */
+
+        // Join up the beginning of the queue (prev)
+        // with the rest of the queue (t)
+        if (t == END_TSO_QUEUE) {
+            cap->run_queue_tl = prev;
+        } else {
+            setTSOPrev(cap, t, prev);
+        }
+        if (prev == END_TSO_QUEUE) {
+            cap->run_queue_hd = t;
+        } else {
+            setTSOLink(cap, prev, t);
+        }
+        cap->n_run_queue = n;
+
+        IF_DEBUG(sanity, checkRunQueue(cap));
 
         // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
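
Note on the arithmetic: keep_threads = (n_run_queue + n_free_caps) / (n_free_caps + 1) is ceiling division by n_free_caps + 1, so the pushing capability keeps at least one thread whenever it has any, and under a round-robin hand-out no free capability can receive more than the pusher keeps. The following standalone check is not part of the patch; q and f are illustrative names for n_run_queue and n_free_caps.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    for (uint32_t q = 1; q <= 1000; q++) {       // q = n_run_queue
        for (uint32_t f = 1; f <= 64; f++) {     // f = n_free_caps
            uint32_t keep = (q + f) / (f + 1);   // the patch's keep_threads
            uint32_t give = q - keep;            // threads available to migrate
            // never give away everything
            assert(keep >= 1);
            // same value as ceil(q / (f + 1))
            assert(keep == q / (f + 1) + (q % (f + 1) != 0));
            // a round-robin hand-out gives each free cap at most ceil(give / f),
            // which never exceeds what the pushing capability keeps
            assert((give + f - 1) / f <= keep);
        }
    }
    // the example from the comment in the patch
    printf("n_run_queue=4, n_free_caps=2 -> keep %u\n", (4 + 2) / (2 + 1));
    return 0;
}

Compiled and run, this prints "n_run_queue=4, n_free_caps=2 -> keep 2", matching the worked example in the new comment.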
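The run-queue walk can be hard to follow because run_queue_hd and run_queue_tl are only patched up after the loop. Below is a minimal standalone model of that control flow, under stated assumptions: Thread, Queue, push_back and the pinned flag are illustrative stand-ins (not RTS types) for TSOs, capability run queues, appendToRunQueue and the bound/TSO_LOCKED checks, and the doubly-linked prev pointers are dropped. Only the shape of the loop mirrors the patch: migrate round-robin until keep_threads remain, let a kept immovable thread use up one keep_threads slot, then splice the kept prefix (prev) back onto the untouched suffix (t).

#include <stdbool.h>
#include <stdio.h>

typedef struct Thread {
    int id;
    bool pinned;            // stands in for "bound to this task / TSO_LOCKED"
    struct Thread *link;    // stands in for t->_link
} Thread;

typedef struct { Thread *hd, *tl; } Queue;

static void push_back(Queue *q, Thread *t) {   // stands in for appendToRunQueue
    t->link = NULL;
    if (q->tl) q->tl->link = t; else q->hd = t;
    q->tl = t;
}

int main(void) {
    enum { N_THREADS = 6, N_FREE = 2 };
    Thread pool[N_THREADS];
    Queue mine = {0};                  // the pushing capability's run queue
    Queue free_caps[N_FREE] = {{0}};   // run queues of the free capabilities

    for (int k = 0; k < N_THREADS; k++) {
        pool[k] = (Thread){ .id = k, .pinned = (k == 1) };  // thread 1 is immovable
        push_back(&mine, &pool[k]);
    }

    unsigned n = N_THREADS;                            // threads we have left
    unsigned keep_threads = (n + N_FREE) / (N_FREE + 1);  // the biased split

    Thread *prev = NULL, *next, *t = mine.hd;
    int i = 0;
    for (; t != NULL && n > keep_threads; t = next) {
        next = t->link;
        t->link = NULL;
        if (t->pinned) {
            // keep it: re-link it after 'prev' on our own queue,
            // and let it consume one keep_threads slot (as in the patch)
            if (prev) prev->link = t; else mine.hd = t;
            prev = t;
            if (keep_threads > 0) keep_threads--;
        } else {
            // or migrate it round-robin to the next free capability
            push_back(&free_caps[i], t);
            n--;
            i++;
            if (i == N_FREE) i = 0;
        }
    }

    // join the kept prefix (prev) with the untouched suffix (t)
    if (t == NULL) mine.tl = prev;
    if (prev) prev->link = t; else mine.hd = t;

    printf("kept %u:", n);
    for (Thread *p = mine.hd; p; p = p->link) printf(" %d", p->id);
    printf("\n");
    for (int c = 0; c < N_FREE; c++) {
        printf("free cap %d:", c);
        for (Thread *p = free_caps[c].hd; p; p = p->link) printf(" %d", p->id);
        printf("\n");
    }
    return 0;
}

Running the model with six threads and two free capabilities shows the round-robin distribution and the final splice; it is a sketch of the control flow only, not of the locking, tracing or bound-task bookkeeping the real code also does.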