diff options
Diffstat (limited to 'rts/Capability.c')
-rw-r--r-- | rts/Capability.c | 125 |
1 files changed, 53 insertions, 72 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index 516aaa573d..948922a3b2 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -54,15 +54,17 @@ globalWorkToDo (void) #endif #if defined(THREADED_RTS) -rtsBool stealWork( Capability *cap) { +rtsBool +stealWork (Capability *cap) +{ /* use the normal Sparks.h interface (internally modified to enable concurrent stealing) and immediately turn the spark into a thread when successful */ Capability *robbed; - SparkPool *pool; StgClosurePtr spark; rtsBool success = rtsFalse; + rtsBool retry; nat i = 0; debugTrace(DEBUG_sched, @@ -71,63 +73,40 @@ rtsBool stealWork( Capability *cap) { if (n_capabilities == 1) { return rtsFalse; } // makes no sense... - /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could - start at a random place instead of 0 as well. */ - for ( i=0 ; i < n_capabilities ; i++ ) { - robbed = &capabilities[i]; - if (cap == robbed) // ourselves... - continue; + do { + retry = rtsFalse; - if (emptySparkPoolCap(robbed)) // nothing to steal here - continue; - - spark = findSpark(robbed); + /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could + start at a random place instead of 0 as well. */ + for ( i=0 ; i < n_capabilities ; i++ ) { + robbed = &capabilities[i]; + if (cap == robbed) // ourselves... + continue; - if (spark == NULL && !emptySparkPoolCap(robbed)) { - spark = findSpark(robbed); // lost race in concurrent access, try again - } - if (spark != NULL) { - debugTrace(DEBUG_sched, + if (emptySparkPoolCap(robbed)) // nothing to steal here + continue; + + spark = tryStealSpark(robbed->sparks); + if (spark == NULL && !emptySparkPoolCap(robbed)) { + // we conflicted with another thread while trying to steal; + // try again later. + retry = rtsTrue; + } + + if (spark != NULL) { + debugTrace(DEBUG_sched, "cap %d: Stole a spark from capability %d", - cap->no, robbed->no); + cap->no, robbed->no); - createSparkThread(cap,spark); - success = rtsTrue; - break; // got one, leave the loop - } - // otherwise: no success, try next one - } - debugTrace(DEBUG_sched, - "Leaving work stealing routine (%s)", - success?"one spark stolen":"thefts did not succeed"); - return success; -} + createSparkThread(cap,spark); + return rtsTrue; + } + // otherwise: no success, try next one + } + } while (retry); -STATIC_INLINE rtsBool -anyWorkForMe( Capability *cap, Task *task ) -{ - if (task->tso != NULL) { - // A bound task only runs if its thread is on the run queue of - // the capability on which it was woken up. Otherwise, we - // can't be sure that we have the right capability: the thread - // might be woken up on some other capability, and task->cap - // could change under our feet. - return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task; - } else { - // A vanilla worker task runs if either there is a lightweight - // thread at the head of the run queue, or the run queue is - // empty and (there are sparks to execute, or there is some - // other global condition to check, such as threads blocked on - // blackholes). - if (emptyRunQueue(cap)) { - return !emptySparkPoolCap(cap) - || !emptyWakeupQueue(cap) - || globalWorkToDo() - || stealWork(cap); /* if all false: try to steal work */ - } else { - return cap->run_queue_hd->bound == NULL; - } - } + debugTrace(DEBUG_sched, "No sparks stolen"); + return rtsFalse; } #endif @@ -194,6 +173,9 @@ initCapability( Capability *cap, nat i ) cap->returning_tasks_tl = NULL; cap->wakeup_queue_hd = END_TSO_QUEUE; cap->wakeup_queue_tl = END_TSO_QUEUE; + cap->sparks_created = 0; + cap->sparks_converted = 0; + cap->sparks_pruned = 0; #endif cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; @@ -326,7 +308,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) #if defined(THREADED_RTS) void -releaseCapability_ (Capability* cap) +releaseCapability_ (Capability* cap, + rtsBool always_wakeup) { Task *task; @@ -384,8 +367,9 @@ releaseCapability_ (Capability* cap) // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. - if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) - || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (always_wakeup || + !emptyRunQueue(cap) || !emptyWakeupQueue(cap) || + !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); // The worker Task pops itself from the queue; @@ -401,7 +385,15 @@ void releaseCapability (Capability* cap USED_IF_THREADS) { ACQUIRE_LOCK(&cap->lock); - releaseCapability_(cap); + releaseCapability_(cap, rtsFalse); + RELEASE_LOCK(&cap->lock); +} + +void +releaseAndWakeupCapability (Capability* cap USED_IF_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap, rtsTrue); RELEASE_LOCK(&cap->lock); } @@ -427,7 +419,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) } // Bound tasks just float around attached to their TSOs. - releaseCapability_(cap); + releaseCapability_(cap,rtsFalse); RELEASE_LOCK(&cap->lock); } @@ -534,16 +526,6 @@ yieldCapability (Capability** pCap, Task *task) { Capability *cap = *pCap; - // The fast path has no locking, if we don't enter this while loop - - while ( waiting_for_gc - /* i.e. another capability triggered HeapOverflow, is busy - getting capabilities (stopping their owning tasks) */ - || cap->returning_tasks_hd != NULL - /* cap reserved for another task */ - || !anyWorkForMe(cap,task) - /* cap/task have no work */ - ) { debugTrace(DEBUG_sched, "giving up capability %d", cap->no); // We must now release the capability and wait to be woken up @@ -588,7 +570,6 @@ yieldCapability (Capability** pCap, Task *task) trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no); ASSERT(cap->running_task == task); - } *pCap = cap; @@ -630,7 +611,7 @@ wakeupThreadOnCapability (Capability *my_cap, appendToRunQueue(other_cap,tso); trace(TRACE_sched, "resuming capability %d", other_cap->no); - releaseCapability_(other_cap); + releaseCapability_(other_cap,rtsFalse); } else { appendToWakeupQueue(my_cap,other_cap,tso); other_cap->context_switch = 1; @@ -765,7 +746,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) if (!emptyRunQueue(cap) || cap->spare_workers) { debugTrace(DEBUG_sched, "runnable threads or workers still alive, yielding"); - releaseCapability_(cap); // this will wake up a worker + releaseCapability_(cap,rtsFalse); // this will wake up a worker RELEASE_LOCK(&cap->lock); yieldThread(); continue; |