author     Dimitrios Vytiniotis <dimitris@microsoft.com>   2011-12-15 16:56:53 +0000
committer  Dimitrios Vytiniotis <dimitris@microsoft.com>   2011-12-15 16:56:53 +0000
commit     e8c93ad1207fb3df1e009d86ad3ae837a0049d2e (patch)
tree       f2f203ccb1ff747e137c6be2a823e2dcd10863e2
parent     bd13338da1649600b6a63ae7cbf60a35575940dc (diff)
parent     3d7e772f71ea4f19d366b9e866f69b4339d22e22 (diff)
download   haskell-e8c93ad1207fb3df1e009d86ad3ae837a0049d2e.tar.gz
Merge branch 'master' of http://darcs.haskell.org//ghc
-rw-r--r--  rts/Capability.c                     19
-rw-r--r--  rts/Capability.h                      4
-rw-r--r--  rts/Schedule.c                      244
-rw-r--r--  utils/count_lines/count_lines.lprl    3
4 files changed, 197 insertions, 73 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 41efb176fd..d04d007006 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -34,6 +34,7 @@
 Capability MainCapability;
 
 nat n_capabilities = 0;
+nat enabled_capabilities = 0;
 Capability *capabilities = NULL;
 
 // Holds the Capability which last became free.  This is used so that
@@ -323,6 +324,8 @@ initCapabilities( void )
 #endif
 
+    enabled_capabilities = n_capabilities;
+
     // There are no free capabilities to begin with.  We will start
     // a worker Task to each Capability, which will quickly put the
     // Capability on the free list when it finds nothing to do.
 
@@ -493,7 +496,7 @@ releaseCapability_ (Capability* cap,
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup || !emptyRunQueue(cap) || !emptyInbox(cap) ||
-        !emptySparkPoolCap(cap) || globalWorkToDo()) {
+        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
@@ -682,7 +685,8 @@ yieldCapability (Capability** pCap, Task *task)
            gcWorkerThread(cap);
            traceEventGcEnd(cap);
            traceSparkCounters(cap);
-           return;
+           // See Note [migrated bound threads 2]
+           if (task->cap == cap) return;
        }
 
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
@@ -768,6 +772,17 @@ yieldCapability (Capability** pCap, Task *task)
 // hold Capabilty C, and task->cap == C, then task cannot be
 // migrated under our feet.
 
+// Note [migrated bound threads 2]
+//
+// Second tricky case;
+// - A bound Task becomes a GC thread
+// - scheduleDoGC() migrates the thread belonging to this Task,
+//   because the Capability it is on is disabled
+// - after GC, gcWorkerThread() returns, but now we are
+//   holding a Capability that is not the same as task->cap
+// - Hence we must check for this case and immediately give up the
+//   cap we hold.
+
 /* ----------------------------------------------------------------------------
  * prodCapability
  *
diff --git a/rts/Capability.h b/rts/Capability.h
index f60adf9de4..91b4567186 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -49,6 +49,8 @@ struct Capability_ {
     // Has there been any activity on this Capability since the last GC?
     nat idle;
 
+    rtsBool disabled;
+
     // The run queue.  The Task owning this Capability has exclusive
     // access to its run queue, so can wake up threads without
     // taking a lock, and the common path through the scheduler is
@@ -197,6 +199,8 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
 // declared in includes/rts/Threads.h:
 // extern nat n_capabilities;
 
+extern nat enabled_capabilities;
+
 // Array of all the capabilities
 //
 extern Capability *capabilities;
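The two declarations above are the heart of the patch: n_capabilities counts the Capabilities that exist, while the new enabled_capabilities counts those currently allowed to run work, so reducing the capability count just disables the tail of the array instead of freeing anything. A minimal sketch (not RTS code; the types are simplified stand-ins for the real ones) of the invariant these fields maintain:

/* A minimal sketch of the invariant: disabled capabilities are
 * exactly the tail of the capabilities array. */
#include <assert.h>

typedef unsigned int nat;
typedef int rtsBool;   /* stand-in: rtsFalse = 0, rtsTrue = 1 */

typedef struct { rtsBool disabled; } Capability;

void checkCapabilityInvariant(Capability *caps,
                              nat n_capabilities,
                              nat enabled_capabilities)
{
    nat i;
    assert(enabled_capabilities <= n_capabilities);
    for (i = 0; i < n_capabilities; i++) {
        /* caps[0 .. enabled-1] may run work; the rest are disabled */
        assert(caps[i].disabled == (i >= enabled_capabilities));
    }
}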
diff --git a/rts/Schedule.c b/rts/Schedule.c
index eedff32ed1..72b7217ebb 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -133,7 +133,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-static void scheduleFindWork (Capability *cap);
+static void scheduleFindWork (Capability **pcap);
 #if defined(THREADED_RTS)
 static void scheduleYield (Capability **pcap, Task *task);
 #endif
@@ -145,8 +145,8 @@ static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleProcessInbox(Capability *cap);
-static void scheduleDetectDeadlock (Capability *cap, Task *task);
+static void scheduleProcessInbox(Capability **cap);
+static void scheduleDetectDeadlock (Capability **pcap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
 #if defined(THREADED_RTS)
 static void scheduleActivateSpark(Capability *cap);
@@ -159,8 +159,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                              StgTSO *t );
 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
-static Capability *scheduleDoGC(Capability *cap, Task *task,
-                                rtsBool force_major);
+static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
 
 static void deleteThread (Capability *cap, StgTSO *tso);
 static void deleteAllThreads (Capability *cap);
@@ -281,7 +280,7 @@ schedule (Capability *initialCapability, Task *task)
     case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
-       cap = scheduleDoGC(cap,task,rtsFalse);
+       scheduleDoGC(&cap,task,rtsFalse);
 
        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
@@ -303,17 +302,13 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-    scheduleFindWork(cap);
+    scheduleFindWork(&cap);
 
     /* work pushing, currently relevant only for THREADED_RTS:
        (pushes threads, wakes up idle capabilities for stealing) */
     schedulePushWork(cap,task);
 
-    scheduleDetectDeadlock(cap,task);
-
-#if defined(THREADED_RTS)
-    cap = task->cap;    // reload cap, it might have changed
-#endif
+    scheduleDetectDeadlock(&cap,task);
 
     // Normally, the only way we can get here with no threads to
     // run is if a keyboard interrupt received during
@@ -396,6 +391,26 @@ schedule (Capability *initialCapability, Task *task)
        deleteThread(cap,t);
     }
 
+    // If this capability is disabled, migrate the thread away rather
+    // than running it.  NB. but not if the thread is bound: it is
+    // really hard for a bound thread to migrate itself.  Believe me,
+    // I tried several ways and couldn't find a way to do it.
+    // Instead, when everything is stopped for GC, we migrate all the
+    // threads on the run queue then (see scheduleDoGC()).
+    //
+    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
+    // when the number of capabilities drops, but we never migrate
+    // them back if it rises again.  Presumably we should, but after
+    // the thread has been migrated we no longer know what capability
+    // it was originally on.
+#ifdef THREADED_RTS
+    if (cap->disabled && !t->bound) {
+        Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
+        migrateThread(cap, t, dest_cap);
+        continue;
+    }
+#endif
+
     /* context switches are initiated by the timer signal, unless
      * the user specified "context switch as often as possible", with
      * +RTS -C0
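The destination chosen above, capabilities[cap->no % enabled_capabilities], deterministically folds every disabled capability onto one of the enabled ones. A standalone illustration (the capability counts below are made-up example values, not anything from the patch):

/* Illustration only: how "cap->no % enabled_capabilities" spreads
 * threads from disabled capabilities over the enabled ones. */
#include <stdio.h>

int main(void)
{
    unsigned int n_capabilities = 8;        /* hypothetical */
    unsigned int enabled_capabilities = 3;  /* hypothetical */
    unsigned int no;

    for (no = enabled_capabilities; no < n_capabilities; no++) {
        printf("disabled cap %u sends its threads to cap %u\n",
               no, no % enabled_capabilities);
    }
    return 0;
}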
@@ -558,7 +573,7 @@ run_thread:
     }
 
     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
-      cap = scheduleDoGC(cap,task,rtsFalse);
+      scheduleDoGC(&cap,task,rtsFalse);
     }
   } /* end of while() */
 }
@@ -608,16 +623,16 @@ schedulePreLoop(void)
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleFindWork (Capability *cap)
+scheduleFindWork (Capability **pcap)
 {
-    scheduleStartSignalHandlers(cap);
+    scheduleStartSignalHandlers(*pcap);
 
-    scheduleProcessInbox(cap);
+    scheduleProcessInbox(pcap);
 
-    scheduleCheckBlockedThreads(cap);
+    scheduleCheckBlockedThreads(*pcap);
 
 #if defined(THREADED_RTS)
-    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
 #endif
 }
@@ -707,10 +722,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     // First grab as many free Capabilities as we can.
     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
-       if (cap != cap0 && tryGrabCapability(cap0,task)) {
+       if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
-               || cap->returning_tasks_hd != NULL
-               || cap->inbox != (Message*)END_TSO_QUEUE) {
+               || cap0->returning_tasks_hd != NULL
+               || cap0->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
@@ -869,9 +884,10 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleDetectDeadlock (Capability *cap, Task *task)
+scheduleDetectDeadlock (Capability **pcap, Task *task)
 {
-    /*
+    Capability *cap = *pcap;
+    /*
      * Detect deadlock: when we have no threads to run, there are no
      * threads blocked, waiting for I/O, or sleeping, and all the
      * other tasks are waiting for work, we must have a deadlock of
@@ -896,7 +912,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+       scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
+       cap = *pcap;
        // when force_major == rtsTrue. scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.
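A recurring mechanical change in this file: functions that may return holding a different Capability than they were given (scheduleFindWork, scheduleDetectDeadlock, scheduleProcessInbox, scheduleDoGC) now take Capability ** and write back whatever they end up holding, replacing the old "cap = task->cap; // reload" dance. A compilable sketch of the convention (the names here are illustrative, not RTS API):

/* Illustrative only: the in/out parameter convention adopted by this
 * patch.  'Capability' is an opaque stand-in, not the RTS struct. */
typedef struct Capability_ Capability;

Capability *acquireSomeOtherCap(void);  /* assumed helper, not real RTS */

void mightSwitchCaps(Capability **pcap)
{
    /* ...may release *pcap, block, and come back holding another... */
    *pcap = acquireSomeOtherCap();
}

void caller(Capability *cap)
{
    mightSwitchCaps(&cap);
    /* 'cap' now names the Capability we actually hold, so no
       separate reload from task->cap is needed afterwards. */
    (void)cap;
}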
@@ -976,16 +993,18 @@ scheduleSendPendingMessages(void)
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     Message *m, *next;
     int r;
+    Capability *cap = *pcap;
 
     while (!emptyInbox(cap)) {
        if (cap->r.rCurrentNursery->link == NULL ||
            g0->n_new_large_words >= large_alloc_lim) {
-           scheduleDoGC(cap, cap->running_task, rtsFalse);
+           scheduleDoGC(pcap, cap->running_task, rtsFalse);
+           cap = *pcap;
        }
 
        // don't use a blocking acquire; if the lock is held by
@@ -1023,7 +1042,7 @@ scheduleProcessInbox (Capability *cap USED_IF_THREADS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
-    if (anySparks())
+    if (anySparks() && !cap->disabled)
     {
        createSparkThread(cap);
        debugTrace(DEBUG_sched, "creating a spark thread");
@@ -1415,21 +1434,24 @@ static void releaseAllCapabilities(Capability *cap, Task *task)
  * Perform a garbage collection if necessary
  * -------------------------------------------------------------------------- */
 
-static Capability *
-scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
+static void
+scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
+              rtsBool force_major)
 {
+    Capability *cap = *pcap;
     rtsBool heap_census;
 #ifdef THREADED_RTS
     rtsBool idle_cap[n_capabilities];
     rtsBool gc_type;
     nat i, sync;
+    StgTSO *tso;
 #endif
 
     if (sched_state == SCHED_SHUTTING_DOWN) {
        // The final GC has already been done, and the system is
        // shutting down.  We'll probably deadlock if we try to GC
        // now.
-       return cap;
+       return;
     }
 
 #ifdef THREADED_RTS
@@ -1459,12 +1481,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        yieldCapability() and releaseCapability() in Capability.c */
 
     do {
-       sync = requestSync(&cap, task, gc_type);
+       sync = requestSync(pcap, task, gc_type);
+       cap = *pcap;
        if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
            // someone else had a pending sync request for a GC, so
            // let's assume GC has been done and we don't need to GC
            // again.
-           return cap;
+           return;
+       }
+       if (sched_state == SCHED_SHUTTING_DOWN) {
+           // The scheduler might now be shutting down.  We tested
+           // this above, but it might have become true since then as
+           // we yielded the capability in requestSync().
+           return;
        }
     } while (sync);
 
@@ -1502,11 +1531,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
            N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
        for (i=0; i < n_capabilities; i++) {
-           idle_cap[i] = rtsFalse;
+           if (capabilities[i].disabled) {
+               idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+           } else {
+               idle_cap[i] = rtsFalse;
+           }
        }
     } else {
        for (i=0; i < n_capabilities; i++) {
-           if (i == cap->no || capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
+           if (capabilities[i].disabled) {
+               idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+           } else if (i == cap->no ||
+                      capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
                idle_cap[i] = rtsFalse;
            } else {
                idle_cap[i] = tryGrabCapability(&capabilities[i], task);
@@ -1570,6 +1606,29 @@ delete_threads_and_gc:
        sched_state = SCHED_SHUTTING_DOWN;
     }
 
+    /*
+     * When there are disabled capabilities, we want to migrate any
+     * threads away from them.  Normally this happens in the
+     * scheduler's loop, but only for unbound threads - it's really
+     * hard for a bound thread to migrate itself.  So we have another
+     * go here.
+     */
+#if defined(THREADED_RTS)
+    for (i = enabled_capabilities; i < n_capabilities; i++) {
+       Capability *tmp_cap, *dest_cap;
+       tmp_cap = &capabilities[i];
+       ASSERT(tmp_cap->disabled);
+       if (i != cap->no) {
+           dest_cap = &capabilities[i % enabled_capabilities];
+           while (!emptyRunQueue(tmp_cap)) {
+               tso = popRunQueue(tmp_cap);
+               migrateThread(tmp_cap, tso, dest_cap);
+               if (tso->bound) { tso->bound->task->cap = dest_cap; }
+           }
+       }
+    }
+#endif
+
     heap_census = scheduleNeedHeapProfile(rtsTrue);
 
     traceEventGcStart(cap);
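The loop just above is the second half of the bound-thread story from Note [migrated bound threads 2]: at GC time, with everything stopped, even bound threads can be drained off a disabled capability, provided the owning Task's cap field is redirected along with the thread. A simplified model of the drain (flattened data structures, and a crude prepend where the real RTS appends to a head/tail queue):

/* Simplified model, not the RTS data structures: drain every thread
 * off a disabled capability's run queue onto an enabled one,
 * redirecting bound Tasks as we go. */
#include <stddef.h>

typedef struct Task_ Task;
typedef struct Cap_ Cap;

typedef struct TSO_ {
    struct TSO_ *link;   /* run-queue link, as in the real StgTSO */
    Task *bound;         /* non-NULL if a Task is bound to this thread */
} TSO;

struct Task_ { Cap *cap; };
struct Cap_  { TSO *run_queue_hd; };

void drainRunQueue(Cap *src, Cap *dst)
{
    while (src->run_queue_hd != NULL) {
        TSO *t = src->run_queue_hd;   /* like popRunQueue(src) */
        src->run_queue_hd = t->link;
        t->link = dst->run_queue_hd;  /* like migrateThread(src,t,dst) */
        dst->run_queue_hd = t;
        if (t->bound) { t->bound->cap = dst; }  /* keep the Task in sync */
    }
}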
@@ -1663,7 +1722,7 @@ delete_threads_and_gc:
     }
 #endif
 
-    return cap;
+    return;
 }
 
 /* ---------------------------------------------------------------------------
@@ -1848,7 +1907,7 @@ forkProcess(HsStablePtr *entry
 }
 
 /* ---------------------------------------------------------------------------
- * Increase the number of Capabilities
+ * Changing the number of Capabilities
  *
 * Changing the number of Capabilities is very tricky!  We can only do
 * it with the system fully stopped, so we do a full sync with
@@ -1873,17 +1932,13 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
     Capability *cap;
     nat sync;
     StgTSO* t;
-    nat g;
-    Capability *old_capabilities;
-
-    if (new_n_capabilities == n_capabilities) return;
+    nat g, n;
+    Capability *old_capabilities = NULL;
 
-    if (new_n_capabilities < n_capabilities) {
-       barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
-    }
+    if (new_n_capabilities == enabled_capabilities) return;
 
     debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
-              n_capabilities, new_n_capabilities);
+              enabled_capabilities, new_n_capabilities);
 
     cap = rts_lock();
     task = cap->running_task;
@@ -1896,31 +1951,76 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
 
     pending_sync = 0;
 
+    if (new_n_capabilities < enabled_capabilities)
+    {
+       // Reducing the number of capabilities: we do not actually
+       // remove the extra capabilities, we just mark them as
+       // "disabled". This has the following effects:
+       //
+       //   - threads on a disabled capability are migrated away by the
+       //     scheduler loop
+       //
+       //   - disabled capabilities do not participate in GC
+       //     (see scheduleDoGC())
+       //
+       //   - No spark threads are created on this capability
+       //     (see scheduleActivateSpark())
+       //
+       //   - We do not attempt to migrate threads *to* a disabled
+       //     capability (see schedulePushWork()).
+       //
+       // but in other respects, a disabled capability remains
+       // alive.  Threads may be woken up on a disabled capability,
+       // but they will be immediately migrated away.
+       //
+       // This approach is much easier than trying to actually remove
+       // the capability; we don't have to worry about GC data
+       // structures, the nursery, etc.
+       //
+       for (n = new_n_capabilities; n < enabled_capabilities; n++) {
+           capabilities[n].disabled = rtsTrue;
+       }
+       enabled_capabilities = new_n_capabilities;
+    }
+    else
+    {
+       // Increasing the number of enabled capabilities.
+       //
+       // enable any disabled capabilities, up to the required number
+       for (n = enabled_capabilities;
+            n < new_n_capabilities && n < n_capabilities; n++) {
+           capabilities[n].disabled = rtsFalse;
+       }
+       enabled_capabilities = n;
+
+       if (new_n_capabilities > n_capabilities) {
 #if defined(TRACING)
-    // Allocate eventlog buffers for the new capabilities.  Note this
-    // must be done before calling moreCapabilities(), because that
-    // will emit events to add the new capabilities to capsets.
-    tracingAddCapapilities(n_capabilities, new_n_capabilities);
+           // Allocate eventlog buffers for the new capabilities.  Note this
+           // must be done before calling moreCapabilities(), because that
+           // will emit events to add the new capabilities to capsets.
+           tracingAddCapapilities(n_capabilities, new_n_capabilities);
 #endif
 
-    // Resize the capabilities array
-    // NB. after this, capabilities points somewhere new.  Any pointers
-    // of type (Capability *) are now invalid.
-    old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
+           // Resize the capabilities array
+           // NB. after this, capabilities points somewhere new.  Any pointers
+           // of type (Capability *) are now invalid.
+           old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
 
-    // update our own cap pointer
-    cap = &capabilities[cap->no];
+           // update our own cap pointer
+           cap = &capabilities[cap->no];
 
-    // Resize and update storage manager data structures
-    storageAddCapabilities(n_capabilities, new_n_capabilities);
+           // Resize and update storage manager data structures
+           storageAddCapabilities(n_capabilities, new_n_capabilities);
 
-    // Update (Capability *) refs in the Task manager.
-    updateCapabilityRefs();
+           // Update (Capability *) refs in the Task manager.
+           updateCapabilityRefs();
 
-    // Update (Capability *) refs from TSOs
-    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
-        for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
-            t->cap = &capabilities[t->cap->no];
+           // Update (Capability *) refs from TSOs
+           for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+               for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+                   t->cap = &capabilities[t->cap->no];
+               }
+           }
        }
     }
 
@@ -1931,7 +2031,9 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
     startWorkerTasks(n_capabilities, new_n_capabilities);
 
     // finally, update n_capabilities
-    n_capabilities = new_n_capabilities;
+    if (new_n_capabilities > n_capabilities) {
+       n_capabilities = enabled_capabilities = new_n_capabilities;
+    }
 
     // We can't free the old array until now, because we access it
     // while updating pointers in updateCapabilityRefs().
@@ -2177,7 +2279,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     tso->flags |= TSO_LOCKED;  // we requested explicit affinity; don't
                               // move this thread from now on.
 #if defined(THREADED_RTS)
-    cpu %= n_capabilities;
+    cpu %= enabled_capabilities;
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
@@ -2332,10 +2434,11 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
     // If we haven't killed all the threads yet, do it now.
     if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
-       waitForReturnCapability(&task->cap,task);
-       scheduleDoGC(task->cap,task,rtsFalse);
+       Capability *cap = task->cap;
+       waitForReturnCapability(&cap,task);
+       scheduleDoGC(&cap,task,rtsFalse);
        ASSERT(task->incall->tso == NULL);
-       releaseCapability(task->cap);
+       releaseCapability(cap);
     }
 
     sched_state = SCHED_SHUTTING_DOWN;
@@ -2394,15 +2497,16 @@ static void
 performGC_(rtsBool force_major)
 {
     Task *task;
+    Capability *cap = NULL;
 
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the
     // suspended_ccalls queue.
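Taken together, the setNumCapabilities() changes above mean the two counters evolve asymmetrically: shrinking only moves enabled_capabilities down and flags the tail as disabled, while growing first re-enables existing capabilities and only allocates when the request exceeds n_capabilities, which never decreases. A toy model (not the RTS function; the starting value of 4 is a made-up example) of that bookkeeping:

/* Toy model of the enable/disable bookkeeping in this patch. */
#include <stdio.h>

typedef unsigned int nat;

static nat n_capabilities = 4, enabled_capabilities = 4;

static void modelSetNumCapabilities(nat new_n)
{
    if (new_n == enabled_capabilities) return;
    if (new_n < enabled_capabilities) {
        /* shrink: just disable the tail of the array */
        enabled_capabilities = new_n;
    } else {
        /* grow: first re-enable existing capabilities... */
        enabled_capabilities =
            (new_n < n_capabilities) ? new_n : n_capabilities;
        /* ...and only allocate when the request exceeds what exists */
        if (new_n > n_capabilities) {
            n_capabilities = enabled_capabilities = new_n;
        }
    }
    printf("n_capabilities=%u enabled_capabilities=%u\n",
           n_capabilities, enabled_capabilities);
}

int main(void)
{
    modelSetNumCapabilities(2);  /* caps 2,3 become disabled */
    modelSetNumCapabilities(3);  /* re-enables cap 2, no allocation */
    modelSetNumCapabilities(6);  /* grows the array: n_capabilities = 6 */
    return 0;
}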
     task = newBoundTask();
 
-    waitForReturnCapability(&task->cap,task);
-    scheduleDoGC(task->cap,task,force_major);
-    releaseCapability(task->cap);
+    waitForReturnCapability(&cap,task);
+    scheduleDoGC(&cap,task,force_major);
+    releaseCapability(cap);
     boundTaskExiting(task);
 }
diff --git a/utils/count_lines/count_lines.lprl b/utils/count_lines/count_lines.lprl
index 50205ee796..bfaad51804 100644
--- a/utils/count_lines/count_lines.lprl
+++ b/utils/count_lines/count_lines.lprl
@@ -14,7 +14,7 @@ my $binPath = $FindBin::Bin;
 
 foreach $f ( @ARGV ) {
     if ( $f =~ /\.lhs$/ ) {
-       open(INF, "$binPath/../../inplace/lib/unlit $f - |") || die "Couldn't unlit $f!\n";
+       open(INF, "$binPath/../../../inplace/lib/unlit $f - |") || die "Couldn't unlit $f!\n";
     } else {
       open(INF, "< $f") || die "Couldn't open $f!\n";
     }
@@ -22,6 +22,7 @@ foreach $f ( @ARGV ) {
     while (<INF>) {
       s/--.*//;
       s/{-.*-}//;
+      s/\/\/.*//;
       next if /^\s*$/;
       $cnt++;
     }