author     Dimitrios Vytiniotis <dimitris@microsoft.com>    2011-12-15 16:56:53 +0000
committer  Dimitrios Vytiniotis <dimitris@microsoft.com>    2011-12-15 16:56:53 +0000
commit     e8c93ad1207fb3df1e009d86ad3ae837a0049d2e (patch)
tree       f2f203ccb1ff747e137c6be2a823e2dcd10863e2
parent     bd13338da1649600b6a63ae7cbf60a35575940dc (diff)
parent     3d7e772f71ea4f19d366b9e866f69b4339d22e22 (diff)
Merge branch 'master' of http://darcs.haskell.org//ghc
-rw-r--r--  rts/Capability.c                    |  19
-rw-r--r--  rts/Capability.h                    |   4
-rw-r--r--  rts/Schedule.c                      | 244
-rw-r--r--  utils/count_lines/count_lines.lprl  |   3
4 files changed, 197 insertions(+), 73 deletions(-)
diff --git a/rts/Capability.c b/rts/Capability.c
index 41efb176fd..d04d007006 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -34,6 +34,7 @@
Capability MainCapability;
nat n_capabilities = 0;
+nat enabled_capabilities = 0;
Capability *capabilities = NULL;
// Holds the Capability which last became free. This is used so that
@@ -323,6 +324,8 @@ initCapabilities( void )
#endif
+ enabled_capabilities = n_capabilities;
+
// There are no free capabilities to begin with. We will start
// a worker Task on each Capability, which will quickly put the
// Capability on the free list when it finds nothing to do.
@@ -493,7 +496,7 @@ releaseCapability_ (Capability* cap,
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
!emptyRunQueue(cap) || !emptyInbox(cap) ||
- !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
@@ -682,7 +685,8 @@ yieldCapability (Capability** pCap, Task *task)
gcWorkerThread(cap);
traceEventGcEnd(cap);
traceSparkCounters(cap);
- return;
+ // See Note [migrated bound threads 2]
+ if (task->cap == cap) return;
}
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
@@ -768,6 +772,17 @@ yieldCapability (Capability** pCap, Task *task)
// hold Capability C, and task->cap == C, then task cannot be
// migrated under our feet.
+// Note [migrated bound threads 2]
+//
+// Second tricky case:
+// - A bound Task becomes a GC thread
+// - scheduleDoGC() migrates the thread belonging to this Task,
+// because the Capability it is on is disabled
+// - after GC, gcWorkerThread() returns, but now we are
+// holding a Capability that is not the same as task->cap
+// - Hence we must check for this case and immediately give up the
+// cap we hold.
+
/* ----------------------------------------------------------------------------
* prodCapability
*
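The two Capability.c changes are small but subtle: a disabled capability must not wake a worker just to run sparks, and a bound Task that served as a GC thread can come back holding a capability other than task->cap. A minimal sketch of the resulting wake-up predicate, condensed from the hunk above (shouldWakeWorker is a hypothetical name; in the patch this logic sits inline in releaseCapability_):

    /* Sketch: on release, wake a worker only if there is work it could
     * actually run.  Spark work does not count on a disabled capability,
     * since scheduleActivateSpark() refuses to create spark threads there. */
    static rtsBool shouldWakeWorker (Capability *cap, rtsBool always_wakeup)
    {
        return always_wakeup
            || !emptyRunQueue(cap) || !emptyInbox(cap)
            || (!cap->disabled && !emptySparkPoolCap(cap))
            || globalWorkToDo();
    }
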
diff --git a/rts/Capability.h b/rts/Capability.h
index f60adf9de4..91b4567186 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -49,6 +49,8 @@ struct Capability_ {
// Has there been any activity on this Capability since the last GC?
nat idle;
+ rtsBool disabled;
+
// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
@@ -197,6 +199,8 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
// declared in includes/rts/Threads.h:
// extern nat n_capabilities;
+extern nat enabled_capabilities;
+
// Array of all the capabilities
//
extern Capability *capabilities;
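Together, n_capabilities and the new enabled_capabilities split the capabilities array into a live prefix and a disabled suffix. A sketch of the intended invariant and of the round-robin migration target used in Schedule.c (migrationTarget is a hypothetical helper for illustration, not part of this header):

    /* Invariant after this patch: capabilities [0, enabled_capabilities)
     * are live; [enabled_capabilities, n_capabilities) exist but are
     * disabled. */
    static Capability *migrationTarget (Capability *cap)
    {
        ASSERT(enabled_capabilities <= n_capabilities);
        ASSERT(cap->disabled && enabled_capabilities > 0);
        /* Mirrors the `cap->no % enabled_capabilities` expression in
         * schedule() and scheduleDoGC(). */
        return &capabilities[cap->no % enabled_capabilities];
    }
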
diff --git a/rts/Schedule.c b/rts/Schedule.c
index eedff32ed1..72b7217ebb 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -133,7 +133,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
-static void scheduleFindWork (Capability *cap);
+static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
@@ -145,8 +145,8 @@ static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleProcessInbox(Capability *cap);
-static void scheduleDetectDeadlock (Capability *cap, Task *task);
+static void scheduleProcessInbox(Capability **cap);
+static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
@@ -159,8 +159,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
-static Capability *scheduleDoGC(Capability *cap, Task *task,
- rtsBool force_major);
+static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
@@ -281,7 +280,7 @@ schedule (Capability *initialCapability, Task *task)
case SCHED_INTERRUPTING:
debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
/* scheduleDoGC() deletes all the threads */
- cap = scheduleDoGC(cap,task,rtsFalse);
+ scheduleDoGC(&cap,task,rtsFalse);
// after scheduleDoGC(), we must be shutting down. Either some
// other Capability did the final GC, or we did it above,
@@ -303,17 +302,13 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
- scheduleFindWork(cap);
+ scheduleFindWork(&cap);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
- scheduleDetectDeadlock(cap,task);
-
-#if defined(THREADED_RTS)
- cap = task->cap; // reload cap, it might have changed
-#endif
+ scheduleDetectDeadlock(&cap,task);
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
@@ -396,6 +391,26 @@ schedule (Capability *initialCapability, Task *task)
deleteThread(cap,t);
}
+ // If this capability is disabled, migrate the thread away rather
+ // than running it. NB. but not if the thread is bound: it is
+ // really hard for a bound thread to migrate itself. Believe me,
+ // I tried several ways and couldn't find a way to do it.
+ // Instead, when everything is stopped for GC, we migrate all the
+ // threads on the run queue then (see scheduleDoGC()).
+ //
+ // ToDo: what about TSO_LOCKED? Currently we're migrating those
+ // when the number of capabilities drops, but we never migrate
+ // them back if it rises again. Presumably we should, but after
+ // the thread has been migrated we no longer know what capability
+ // it was originally on.
+#ifdef THREADED_RTS
+ if (cap->disabled && !t->bound) {
+ Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
+ migrateThread(cap, t, dest_cap);
+ continue;
+ }
+#endif
+
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
@@ -558,7 +573,7 @@ run_thread:
}
if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
- cap = scheduleDoGC(cap,task,rtsFalse);
+ scheduleDoGC(&cap,task,rtsFalse);
}
} /* end of while() */
}
@@ -608,16 +623,16 @@ schedulePreLoop(void)
* -------------------------------------------------------------------------- */
static void
-scheduleFindWork (Capability *cap)
+scheduleFindWork (Capability **pcap)
{
- scheduleStartSignalHandlers(cap);
+ scheduleStartSignalHandlers(*pcap);
- scheduleProcessInbox(cap);
+ scheduleProcessInbox(pcap);
- scheduleCheckBlockedThreads(cap);
+ scheduleCheckBlockedThreads(*pcap);
#if defined(THREADED_RTS)
- if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+ if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}
@@ -707,10 +722,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// First grab as many free Capabilities as we can.
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
- if (cap != cap0 && tryGrabCapability(cap0,task)) {
+ if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
- || cap->returning_tasks_hd != NULL
- || cap->inbox != (Message*)END_TSO_QUEUE) {
+ || cap0->returning_tasks_hd != NULL
+ || cap0->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -869,9 +884,10 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
* ------------------------------------------------------------------------- */
static void
-scheduleDetectDeadlock (Capability *cap, Task *task)
+scheduleDetectDeadlock (Capability **pcap, Task *task)
{
- /*
+ Capability *cap = *pcap;
+ /*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* other tasks are waiting for work, we must have a deadlock of
@@ -896,7 +912,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+ scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
+ cap = *pcap;
// when force_major == rtsTrue, scheduleDoGC sets
// recent_activity to ACTIVITY_DONE_GC and turns off the timer
// signal.
@@ -976,16 +993,18 @@ scheduleSendPendingMessages(void)
* ------------------------------------------------------------------------- */
static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
int r;
+ Capability *cap = *pcap;
while (!emptyInbox(cap)) {
if (cap->r.rCurrentNursery->link == NULL ||
g0->n_new_large_words >= large_alloc_lim) {
- scheduleDoGC(cap, cap->running_task, rtsFalse);
+ scheduleDoGC(pcap, cap->running_task, rtsFalse);
+ cap = *pcap;
}
// don't use a blocking acquire; if the lock is held by
@@ -1023,7 +1042,7 @@ scheduleProcessInbox (Capability *cap USED_IF_THREADS)
static void
scheduleActivateSpark(Capability *cap)
{
- if (anySparks())
+ if (anySparks() && !cap->disabled)
{
createSparkThread(cap);
debugTrace(DEBUG_sched, "creating a spark thread");
@@ -1415,21 +1434,24 @@ static void releaseAllCapabilities(Capability *cap, Task *task)
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
-static Capability *
-scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
+static void
+scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
+ rtsBool force_major)
{
+ Capability *cap = *pcap;
rtsBool heap_census;
#ifdef THREADED_RTS
rtsBool idle_cap[n_capabilities];
rtsBool gc_type;
nat i, sync;
+ StgTSO *tso;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
// The final GC has already been done, and the system is
// shutting down. We'll probably deadlock if we try to GC
// now.
- return cap;
+ return;
}
#ifdef THREADED_RTS
@@ -1459,12 +1481,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
yieldCapability() and releaseCapability() in Capability.c */
do {
- sync = requestSync(&cap, task, gc_type);
+ sync = requestSync(pcap, task, gc_type);
+ cap = *pcap;
if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
- return cap;
+ return;
+ }
+ if (sched_state == SCHED_SHUTTING_DOWN) {
+ // The scheduler might now be shutting down. We tested
+ // this above, but it might have become true since then as
+ // we yielded the capability in requestSync().
+ return;
}
} while (sync);
@@ -1502,11 +1531,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
|| (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
for (i=0; i < n_capabilities; i++) {
- idle_cap[i] = rtsFalse;
+ if (capabilities[i].disabled) {
+ idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+ } else {
+ idle_cap[i] = rtsFalse;
+ }
}
} else {
for (i=0; i < n_capabilities; i++) {
- if (i == cap->no || capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
+ if (capabilities[i].disabled) {
+ idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+ } else if (i == cap->no ||
+ capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
idle_cap[i] = rtsFalse;
} else {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
@@ -1570,6 +1606,29 @@ delete_threads_and_gc:
sched_state = SCHED_SHUTTING_DOWN;
}
+ /*
+ * When there are disabled capabilities, we want to migrate any
+ * threads away from them. Normally this happens in the
+ * scheduler's loop, but only for unbound threads - it's really
+ * hard for a bound thread to migrate itself. So we have another
+ * go here.
+ */
+#if defined(THREADED_RTS)
+ for (i = enabled_capabilities; i < n_capabilities; i++) {
+ Capability *tmp_cap, *dest_cap;
+ tmp_cap = &capabilities[i];
+ ASSERT(tmp_cap->disabled);
+ if (i != cap->no) {
+ dest_cap = &capabilities[i % enabled_capabilities];
+ while (!emptyRunQueue(tmp_cap)) {
+ tso = popRunQueue(tmp_cap);
+ migrateThread(tmp_cap, tso, dest_cap);
+ if (tso->bound) { tso->bound->task->cap = dest_cap; }
+ }
+ }
+ }
+#endif
+
heap_census = scheduleNeedHeapProfile(rtsTrue);
traceEventGcStart(cap);
@@ -1663,7 +1722,7 @@ delete_threads_and_gc:
}
#endif
- return cap;
+ return;
}
/* ---------------------------------------------------------------------------
@@ -1848,7 +1907,7 @@ forkProcess(HsStablePtr *entry
}
/* ---------------------------------------------------------------------------
- * Increase the number of Capabilities
+ * Changing the number of Capabilities
*
* Changing the number of Capabilities is very tricky! We can only do
* it with the system fully stopped, so we do a full sync with
@@ -1873,17 +1932,13 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
Capability *cap;
nat sync;
StgTSO* t;
- nat g;
- Capability *old_capabilities;
-
- if (new_n_capabilities == n_capabilities) return;
+ nat g, n;
+ Capability *old_capabilities = NULL;
- if (new_n_capabilities < n_capabilities) {
- barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
- }
+ if (new_n_capabilities == enabled_capabilities) return;
debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
- n_capabilities, new_n_capabilities);
+ enabled_capabilities, new_n_capabilities);
cap = rts_lock();
task = cap->running_task;
@@ -1896,31 +1951,76 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
pending_sync = 0;
+ if (new_n_capabilities < enabled_capabilities)
+ {
+ // Reducing the number of capabilities: we do not actually
+ // remove the extra capabilities, we just mark them as
+ // "disabled". This has the following effects:
+ //
+ // - threads on a disabled capability are migrated away by the
+ // scheduler loop
+ //
+ // - disabled capabilities do not participate in GC
+ // (see scheduleDoGC())
+ //
+ // - No spark threads are created on this capability
+ // (see scheduleActivateSpark())
+ //
+ // - We do not attempt to migrate threads *to* a disabled
+ // capability (see schedulePushWork()).
+ //
+ // but in other respects, a disabled capability remains
+ // alive. Threads may be woken up on a disabled capability,
+ // but they will be immediately migrated away.
+ //
+ // This approach is much easier than trying to actually remove
+ // the capability; we don't have to worry about GC data
+ // structures, the nursery, etc.
+ //
+ for (n = new_n_capabilities; n < enabled_capabilities; n++) {
+ capabilities[n].disabled = rtsTrue;
+ }
+ enabled_capabilities = new_n_capabilities;
+ }
+ else
+ {
+ // Increasing the number of enabled capabilities.
+ //
+ // enable any disabled capabilities, up to the required number
+ for (n = enabled_capabilities;
+ n < new_n_capabilities && n < n_capabilities; n++) {
+ capabilities[n].disabled = rtsFalse;
+ }
+ enabled_capabilities = n;
+
+ if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
- // Allocate eventlog buffers for the new capabilities. Note this
- // must be done before calling moreCapabilities(), because that
- // will emit events to add the new capabilities to capsets.
- tracingAddCapapilities(n_capabilities, new_n_capabilities);
+ // Allocate eventlog buffers for the new capabilities. Note this
+ // must be done before calling moreCapabilities(), because that
+ // will emit events to add the new capabilities to capsets.
+ tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif
- // Resize the capabilities array
- // NB. after this, capabilities points somewhere new. Any pointers
- // of type (Capability *) are now invalid.
- old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
+ // Resize the capabilities array
+ // NB. after this, capabilities points somewhere new. Any pointers
+ // of type (Capability *) are now invalid.
+ old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
- // update our own cap pointer
- cap = &capabilities[cap->no];
+ // update our own cap pointer
+ cap = &capabilities[cap->no];
- // Resize and update storage manager data structures
- storageAddCapabilities(n_capabilities, new_n_capabilities);
+ // Resize and update storage manager data structures
+ storageAddCapabilities(n_capabilities, new_n_capabilities);
- // Update (Capability *) refs in the Task manager.
- updateCapabilityRefs();
+ // Update (Capability *) refs in the Task manager.
+ updateCapabilityRefs();
- // Update (Capability *) refs from TSOs
- for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
- for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
- t->cap = &capabilities[t->cap->no];
+ // Update (Capability *) refs from TSOs
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+ t->cap = &capabilities[t->cap->no];
+ }
+ }
}
}
@@ -1931,7 +2031,9 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
startWorkerTasks(n_capabilities, new_n_capabilities);
// finally, update n_capabilities
- n_capabilities = new_n_capabilities;
+ if (new_n_capabilities > n_capabilities) {
+ n_capabilities = enabled_capabilities = new_n_capabilities;
+ }
// We can't free the old array until now, because we access it
// while updating pointers in updateCapabilityRefs().
@@ -2177,7 +2279,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
- cpu %= n_capabilities;
+ cpu %= enabled_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
@@ -2332,10 +2434,11 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
- waitForReturnCapability(&task->cap,task);
- scheduleDoGC(task->cap,task,rtsFalse);
+ Capability *cap = task->cap;
+ waitForReturnCapability(&cap,task);
+ scheduleDoGC(&cap,task,rtsFalse);
ASSERT(task->incall->tso == NULL);
- releaseCapability(task->cap);
+ releaseCapability(cap);
}
sched_state = SCHED_SHUTTING_DOWN;
@@ -2394,15 +2497,16 @@ static void
performGC_(rtsBool force_major)
{
Task *task;
+ Capability *cap = NULL;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalls queue.
task = newBoundTask();
- waitForReturnCapability(&task->cap,task);
- scheduleDoGC(task->cap,task,force_major);
- releaseCapability(task->cap);
+ waitForReturnCapability(&cap,task);
+ scheduleDoGC(&cap,task,force_major);
+ releaseCapability(cap);
boundTaskExiting(task);
}
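The net effect on setNumCapabilities(): growing beyond n_capabilities still reallocates the capabilities array, while shrinking, which previously hit barf(), now only flips the disabled flag and lets the scheduler migrate threads away. A hypothetical call sequence (the starting count of 4 is assumed purely for illustration):

    /* Sketch of the state transitions, assuming the RTS started with
     * 4 capabilities. */
    static void resizeExample (void)
    {
        setNumCapabilities(8);  /* n_capabilities: 4 -> 8; array is
                                 * reallocated, enabled_capabilities = 8 */
        setNumCapabilities(2);  /* caps 2..7 marked disabled; threads are
                                 * migrated away by the scheduler.
                                 * n_capabilities stays 8,
                                 * enabled_capabilities drops to 2       */
        setNumCapabilities(6);  /* caps 2..5 re-enabled in place; no
                                 * reallocation needed                   */
    }
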
diff --git a/utils/count_lines/count_lines.lprl b/utils/count_lines/count_lines.lprl
index 50205ee796..bfaad51804 100644
--- a/utils/count_lines/count_lines.lprl
+++ b/utils/count_lines/count_lines.lprl
@@ -14,7 +14,7 @@ my $binPath = $FindBin::Bin;
foreach $f ( @ARGV ) {
if ( $f =~ /\.lhs$/ ) {
- open(INF, "$binPath/../../inplace/lib/unlit $f - |") || die "Couldn't unlit $f!\n";
+ open(INF, "$binPath/../../../inplace/lib/unlit $f - |") || die "Couldn't unlit $f!\n";
} else {
open(INF, "< $f") || die "Couldn't open $f!\n";
}
@@ -22,6 +22,7 @@ foreach $f ( @ARGV ) {
while (<INF>) {
s/--.*//;
s/{-.*-}//;
+ s/\/\/.*//;
next if /^\s*$/;
$cnt++;
}