-rw-r--r--  rts/Schedule.c | 161
1 file changed, 62 insertions(+), 99 deletions(-)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 908acf27fe..544b9c2115 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -741,12 +741,6 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// - threads that have TSO_LOCKED cannot migrate
// - a thread that is bound to the current Task cannot be migrated
//
- // So we walk through the run queue, migrating threads to
- // free_caps[] round-robin, skipping over immovable threads. Each
- // time through free_caps[] we keep one thread for ourselves,
- // provided we haven't encountered one or more immovable threads
- // in this pass.
- //
// This is about the simplest thing we could do; improvements we
// might want to do include:
//
@@ -758,112 +752,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
if (n_free_caps > 0) {
StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
- rtsBool pushed_to_all;
-#endif
debugTrace(DEBUG_sched,
"cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
n_free_caps);
- i = 0;
-#ifdef SPARK_PUSHING
- pushed_to_all = rtsFalse;
-#endif
-
- // We want to share threads equally amongst free_caps[] and the
- // current capability, but sometimes we encounter immovable
- // threads. This counter tracks the number of threads we have kept
- // for the current capability minus the number of passes over
- free_caps[]. If it is greater than zero (due to immovable
- // threads), we should try to bring it back to zero again by not
- // keeping any threads for the current capability.
- uint32_t imbalance = 0;
-
- // n_free_caps may be larger than the number of spare threads we have,
- // if there were sparks in the spark pool. To avoid giving away all our
- // threads in this case, we limit the number of caps that we give
- // threads to, to the number of spare threads (n_run_queue-1).
- uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
-
- if (thread_recipients > 0) {
- prev = END_TSO_QUEUE;
- t = cap->run_queue_hd;
- for (; t != END_TSO_QUEUE; t = next) {
- next = t->_link;
- t->_link = END_TSO_QUEUE;
- if (t->bound == task->incall // don't move my bound thread
- || tsoLocked(t)) { // don't move a locked thread
- if (prev == END_TSO_QUEUE) {
- cap->run_queue_hd = t;
- } else {
- setTSOLink(cap, prev, t);
- }
- setTSOPrev(cap, t, prev);
- prev = t;
- imbalance++;
- } else if (i == thread_recipients) {
-#ifdef SPARK_PUSHING
- pushed_to_all = rtsTrue;
-#endif
- // If we have not already kept any threads for this
- // capability during the current pass over free_caps[],
- // keep one now.
- if (imbalance == 0) {
- if (prev == END_TSO_QUEUE) {
- cap->run_queue_hd = t;
- } else {
- setTSOLink(cap, prev, t);
- }
- setTSOPrev(cap, t, prev);
- prev = t;
- } else {
- imbalance--;
- }
- i = 0;
+ // There are n_free_caps+1 caps in total. We will share the threads
+ // evenly between them, *except* that if the run queue does not divide
+ // evenly by n_free_caps+1 then we bias towards the current capability.
+ // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+ uint32_t keep_threads =
+ (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+ // This also ensures that we don't give away all our threads, since
+ // (x + y) / (y + 1) >= 1 when x >= 1.
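(Editorial note: with integer division, (n + f) / (f + 1) equals ceil(n / (f + 1)), so the current capability always keeps the rounded-up share. A quick standalone check of that rounding, as a hedged sketch; keep_for_self is a made-up helper that just mirrors the expression above and is not an RTS function:

    #include <stdint.h>
    #include <stdio.h>

    /* Hypothetical helper mirroring (n_run_queue + n_free_caps) / (n_free_caps + 1). */
    static uint32_t keep_for_self(uint32_t n_run_queue, uint32_t n_free_caps)
    {
        return (n_run_queue + n_free_caps) / (n_free_caps + 1);
    }

    int main(void)
    {
        printf("%u\n", (unsigned)keep_for_self(4, 2)); /* 6/3 = 2: keep 2, give one to each free cap      */
        printf("%u\n", (unsigned)keep_for_self(5, 2)); /* 7/3 = 2 = ceil(5/3): keep the rounded-up share   */
        printf("%u\n", (unsigned)keep_for_self(1, 7)); /* 8/8 = 1: we never give away our last thread      */
        return 0;
    }
)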
+
+ // The number of threads we have left.
+ uint32_t n = cap->n_run_queue;
+
+ // prev = the previous thread on this cap's run queue
+ prev = END_TSO_QUEUE;
+
+ // We're going to walk through the run queue, migrating threads to other
+ // capabilities until we have only keep_threads left. We might
+ // encounter a thread that cannot be migrated, in which case we add it
+ // to the current run queue and decrement keep_threads.
+ for (t = cap->run_queue_hd, i = 0;
+ t != END_TSO_QUEUE && n > keep_threads;
+ t = next)
+ {
+ next = t->_link;
+ t->_link = END_TSO_QUEUE;
+
+ // Should we keep this thread?
+ if (t->bound == task->incall // don't move my bound thread
+ || tsoLocked(t) // don't move a locked thread
+ ) {
+ if (prev == END_TSO_QUEUE) {
+ cap->run_queue_hd = t;
} else {
- appendToRunQueue(free_caps[i],t);
- cap->n_run_queue--;
-
- traceEventMigrateThread (cap, t, free_caps[i]->no);
-
- if (t->bound) { t->bound->task->cap = free_caps[i]; }
- t->cap = free_caps[i];
- i++;
+ setTSOLink(cap, prev, t);
}
+ setTSOPrev(cap, t, prev);
+ prev = t;
+ if (keep_threads > 0) keep_threads--;
}
- cap->run_queue_tl = prev;
- IF_DEBUG(sanity, checkRunQueue(cap));
- }
+ // Or migrate it?
+ else {
+ appendToRunQueue(free_caps[i],t);
+ traceEventMigrateThread (cap, t, free_caps[i]->no);
-#ifdef SPARK_PUSHING
- /* JB I left this code in place, it would work but is not necessary */
-
- // If there are some free capabilities that we didn't push any
- // threads to, then try to push a spark to each one.
- if (!pushed_to_all) {
- StgClosure *spark;
- // i is the next free capability to push to
- for (; i < n_free_caps; i++) {
- if (emptySparkPoolCap(free_caps[i])) {
- spark = tryStealSpark(cap->sparks);
- if (spark != NULL) {
- /* TODO: if anyone wants to re-enable this code then
- * they must consider the fizzledSpark(spark) case
- * and update the per-cap spark statistics.
- */
- debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
- traceEventStealSpark(free_caps[i], t, cap->no);
-
- newSpark(&(free_caps[i]->r), spark);
- }
- }
+ if (t->bound) { t->bound->task->cap = free_caps[i]; }
+ t->cap = free_caps[i];
+ n--; // we have one fewer thread now
+ i++; // move on to the next free_cap
+ if (i == n_free_caps) i = 0;
}
}
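(Editorial note: the shape of the loop above, in isolation, is: keep immovable threads on the local queue and count them against keep_threads, hand everything else out round-robin until only keep_threads remain. The sketch below is a hedged illustration with simplified stand-ins; Tso, give_away and push_work are invented for this example and are not RTS types or functions, and the real code additionally maintains the prev links and per-Capability bookkeeping:

    #include <stddef.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct Tso_ { struct Tso_ *link; int immovable; } Tso;

    /* Stand-in for appendToRunQueue(free_caps[i], t) and the bookkeeping
     * that hands a thread to another capability. */
    static void give_away(Tso *t, uint32_t which_cap) { (void)t; (void)which_cap; }

    /* Walk the queue at *head, keeping immovable threads (counted against
     * keep_threads) and handing the rest out round-robin to n_free_caps
     * recipients until only keep_threads remain.  Returns how many threads
     * are still on the local queue. */
    static uint32_t push_work(Tso **head, uint32_t n, uint32_t keep_threads,
                              uint32_t n_free_caps)
    {
        Tso *prev = NULL, *t = *head, *next;
        uint32_t i = 0;

        for (; t != NULL && n > keep_threads; t = next) {
            next = t->link;
            t->link = NULL;
            if (t->immovable) {
                /* keep it: re-thread onto the local queue */
                if (prev == NULL) { *head = t; } else { prev->link = t; }
                prev = t;
                if (keep_threads > 0) keep_threads--;
            } else {
                give_away(t, i);
                n--;                              /* one fewer thread kept locally  */
                i++;                              /* round-robin over the recipients */
                if (i == n_free_caps) i = 0;
            }
        }
        /* join the kept prefix with the part of the queue we never reached */
        if (prev == NULL) { *head = t; } else { prev->link = t; }
        return n;
    }

    int main(void)
    {
        /* mirrors the example in the comment: n_run_queue=4, n_free_caps=2 */
        Tso ts[4] = { {&ts[1], 0}, {&ts[2], 0}, {&ts[3], 0}, {NULL, 0} };
        Tso *hd = &ts[0];
        uint32_t left = push_work(&hd, 4, 2, 2);
        printf("kept %u threads locally\n", (unsigned)left);  /* prints 2 */
        return 0;
    }
)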
-#endif /* SPARK_PUSHING */
+
+ // Join up the beginning of the queue (prev)
+ // with the rest of the queue (t)
+ if (t == END_TSO_QUEUE) {
+ cap->run_queue_tl = prev;
+ } else {
+ setTSOPrev(cap, t, prev);
+ }
+ if (prev == END_TSO_QUEUE) {
+ cap->run_queue_hd = t;
+ } else {
+ setTSOLink(cap, prev, t);
+ }
+ cap->n_run_queue = n;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
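(Editorial note: because the run queue is doubly linked, the splice above has to patch both directions and cope with an empty kept prefix or an empty unvisited suffix. A minimal hedged sketch of those four cases, using a plain struct rather than StgTSO; splice_queue is invented for this illustration, and in the RTS setTSOLink/setTSOPrev also handle the GC write barrier, which this sketch ignores:

    #include <stddef.h>

    typedef struct Node_ { struct Node_ *link, *prev; } Node;

    /* Join the kept prefix (ending at 'prev') to the unvisited suffix
     * (starting at 't'); either side may be empty (NULL). */
    static void splice_queue(Node **hd, Node **tl, Node *prev, Node *t)
    {
        if (t == NULL) {
            *tl = prev;        /* nothing left unvisited: prev is the new tail */
        } else {
            t->prev = prev;    /* suffix points back at the kept prefix        */
        }
        if (prev == NULL) {
            *hd = t;           /* everything before t was migrated away        */
        } else {
            prev->link = t;    /* kept prefix points forward at the suffix     */
        }
    }
)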
// release the capabilities
for (i = 0; i < n_free_caps; i++) {