diff options
| -rw-r--r-- | ghc/rts/Capability.c | 53 | ||||
| -rw-r--r-- | ghc/rts/Capability.h | 4 | ||||
| -rw-r--r-- | ghc/rts/PrimOps.cmm | 20 | ||||
| -rw-r--r-- | ghc/rts/Schedule.c | 107 | ||||
| -rw-r--r-- | ghc/rts/Schedule.h | 12 | ||||
| -rw-r--r-- | ghc/rts/Select.c | 103 | ||||
| -rw-r--r-- | ghc/rts/Signals.c | 13 | ||||
| -rw-r--r-- | ghc/rts/Signals.h | 1 |
8 files changed, 90 insertions, 223 deletions
diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 3ea96fe770..62f205d4d4 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -56,8 +56,8 @@ nat rts_n_waiting_workers = 0; * exclusive access to the RTS and all its data structures (that are not * locked by the Scheduler's mutex). * - * thread_ready_cond is signalled whenever noCapabilities doesn't hold. - * + * thread_ready_cond is signalled whenever + * !noCapabilities && !EMPTY_RUN_QUEUE(). */ Condition thread_ready_cond = INIT_COND_VAR; @@ -82,6 +82,12 @@ static rtsBool passingCapability = rtsFalse; #define UNUSED_IF_NOT_SMP STG_UNUSED #endif +#if defined(RTS_USER_SIGNALS) +#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending()) +#else +#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted) +#endif + /* ---------------------------------------------------------------------------- Initialisation ------------------------------------------------------------------------- */ @@ -211,7 +217,7 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) rts_n_free_capabilities = 1; #endif // Signal that a capability is available - if (rts_n_waiting_tasks > 0) { + if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) { signalCondition(&thread_ready_cond); } startSchedulerTaskIfNecessary(); @@ -263,7 +269,6 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap ) if ( noCapabilities() || passingCapability ) { rts_n_waiting_workers++; - wakeBlockedWorkerThread(); context_switch = 1; // make sure it's our turn soon waitCondition(&returning_worker_cond, pMutex); #if defined(SMP) @@ -294,8 +299,16 @@ yieldCapability( Capability** pCap ) // Pre-condition: pMutex is assumed held, the current thread // holds the capability pointed to by pCap. - if ( rts_n_waiting_workers > 0 || passingCapability ) { - IF_DEBUG(scheduler, sched_belch("worker: giving up capability")); + if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) { + IF_DEBUG(scheduler, + if (rts_n_waiting_workers > 0) { + sched_belch("worker: giving up capability (returning wkr)"); + } else if (passingCapability) { + sched_belch("worker: giving up capability (passing capability)"); + } else { + sched_belch("worker: giving up capability (no threads to run)"); + } + ); releaseCapability(*pCap); *pCap = NULL; } @@ -324,13 +337,14 @@ yieldCapability( Capability** pCap ) * passed to this thread using passCapability. * ------------------------------------------------------------------------- */ -void +void waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) { // Pre-condition: pMutex is held. - while ( noCapabilities() || - (passingCapability && passTarget != pThreadCond)) { + while ( noCapabilities() || + (passingCapability && passTarget != pThreadCond) || + !ANY_WORK_TO_DO()) { IF_DEBUG(scheduler, sched_belch("worker: wait for capability (cond: %p)", pThreadCond)); @@ -384,6 +398,27 @@ passCapabilityToWorker( void ) #endif /* RTS_SUPPORTS_THREADS */ +/* ---------------------------------------------------------------------------- + threadRunnable() + + Signals that a thread has been placed on the run queue, so a worker + might need to be woken up to run it. + + ToDo: should check whether the thread at the front of the queue is + bound, and if so wake up the appropriate worker. + -------------------------------------------------------------------------- */ + +void +threadRunnable ( void ) +{ +#if defined(RTS_SUPPORTS_THREADS) + if ( !noCapabilities && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) { + signalCondition(&thread_ready_cond); + } + startSchedulerTaskIfNecessary(); +#endif +} + /* ------------------------------------------------------------------------- */ #if defined(SMP) diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index 450bf74275..e615035187 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -31,6 +31,10 @@ extern void initCapabilities( void ); // extern void releaseCapability( Capability* cap ); +// Signal that a thread has become runnable +// +extern void threadRunnable ( void ); + #ifdef RTS_SUPPORTS_THREADS // Gives up the current capability IFF there is a higher-priority // thread waiting for it. This happens in one of two ways: diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm index c9556f4d33..91c1325872 100644 --- a/ghc/rts/PrimOps.cmm +++ b/ghc/rts/PrimOps.cmm @@ -1342,6 +1342,10 @@ mkApUpd0zh_fast waitReadzh_fast { /* args: R1 */ +#ifdef THREADED_RTS + foreign "C" barf("waitRead# on threaded RTS"); +#endif + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; StgTSO_block_info(CurrentTSO) = R1; @@ -1354,6 +1358,10 @@ waitReadzh_fast waitWritezh_fast { /* args: R1 */ +#ifdef THREADED_RTS + foreign "C" barf("waitWrite# on threaded RTS"); +#endif + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; StgTSO_block_info(CurrentTSO) = R1; @@ -1374,6 +1382,10 @@ delayzh_fast W_ t, prev, target; #endif +#ifdef THREADED_RTS + foreign "C" barf("delay# on threaded RTS"); +#endif + /* args: R1 (microsecond delay amount) */ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16; @@ -1432,6 +1444,10 @@ asyncReadzh_fast W_ ares; CInt reqID; +#ifdef THREADED_RTS + foreign "C" barf("asyncRead# on threaded RTS"); +#endif + /* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; @@ -1454,6 +1470,10 @@ asyncWritezh_fast W_ ares; CInt reqID; +#ifdef THREADED_RTS + foreign "C" barf("asyncWrite# on threaded RTS"); +#endif + /* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 04e9f1f43d..04e70dae73 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -453,13 +453,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // run queue is empty, and there are no other tasks running, we // can wait indefinitely for something to happen. // - if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) + if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) + { #if defined(RTS_SUPPORTS_THREADS) - || EMPTY_RUN_QUEUE() + // We shouldn't be here... + barf("schedule: awaitEvent() in threaded RTS"); #endif - ) - { - awaitEvent( EMPTY_RUN_QUEUE() ); + awaitEvent( EMPTY_RUN_QUEUE() ); } // we can be interrupted while waiting for I/O... if (interrupted) continue; @@ -479,18 +479,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, if ( EMPTY_THREAD_QUEUES() ) { IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); + // Garbage collection can release some new threads due to // either (a) finalizers or (b) threads resurrected because - // they are about to be send BlockedOnDeadMVar. Any threads - // thus released will be immediately runnable. + // they are unreachable and will therefore be sent an + // exception. Any threads thus released will be immediately + // runnable. GarbageCollect(GetRoots,rtsTrue); - - if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } - - IF_DEBUG(scheduler, - sched_belch("still deadlocked, checking for black holes...")); - detectBlackHoles(); - if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } #if defined(RTS_USER_SIGNALS) @@ -1457,12 +1452,6 @@ forkProcess(HsStablePtr *entry stgFree(m); } -# ifdef RTS_SUPPORTS_THREADS - resetTaskManagerAfterFork(); // tell startTask() and friends that - startingWorkerThread = rtsFalse; // we have no worker threads any more - resetWorkerWakeupPipeAfterFork(); -# endif - rc = rts_evalStableIO(entry, NULL); // run the action rts_checkSchedStatus("forkProcess",rc); @@ -1568,8 +1557,6 @@ suspendThread( StgRegTable *reg ) IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); #endif - /* Other threads _might_ be available for execution; signal this */ - THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); errno = saved_errno; @@ -1933,11 +1920,10 @@ static void scheduleThread_ (StgTSO* tso); void scheduleThread_(StgTSO *tso) { - // Precondition: sched_mutex must be held. // The thread goes at the *end* of the run-queue, to avoid possible // starvation of any threads already on the queue. APPEND_TO_RUN_QUEUE(tso); - THREAD_RUNNABLE(); + threadRunnable(); } void @@ -1997,7 +1983,7 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id)); APPEND_TO_RUN_QUEUE(tso); - // NB. Don't call THREAD_RUNNABLE() here, because the thread is + // NB. Don't call threadRunnable() here, because the thread is // bound and only runnable by *this* OS thread, so waking up other // workers will just slow things down. @@ -2428,7 +2414,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) next = bqe->link; ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? APPEND_TO_RUN_QUEUE((StgTSO *)bqe); - THREAD_RUNNABLE(); + threadRunnable(); unblockCount(bqe, node); /* reset blocking status after dumping event */ ((StgTSO *)bqe)->why_blocked = NotBlocked; @@ -2473,7 +2459,7 @@ unblockOneLocked(StgTSO *tso) next = tso->link; tso->link = END_TSO_QUEUE; APPEND_TO_RUN_QUEUE(tso); - THREAD_RUNNABLE(); + threadRunnable(); IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id)); return next; } @@ -2644,9 +2630,6 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; -#ifdef RTS_SUPPORTS_THREADS - wakeBlockedWorkerThread(); -#endif } /* ----------------------------------------------------------------------------- @@ -3277,70 +3260,6 @@ resurrectThreads( StgTSO *threads ) } } -/* ----------------------------------------------------------------------------- - * Blackhole detection: if we reach a deadlock, test whether any - * threads are blocked on themselves. Any threads which are found to - * be self-blocked get sent a NonTermination exception. - * - * This is only done in a deadlock situation in order to avoid - * performance overhead in the normal case. - * - * Locks: sched_mutex is held upon entry and exit. - * -------------------------------------------------------------------------- */ - -#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS) -static void -detectBlackHoles( void ) -{ - StgTSO *tso = all_threads; - StgPtr frame; - StgClosure *blocked_on; - StgRetInfoTable *info; - - for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) { - - while (tso->what_next == ThreadRelocated) { - tso = tso->link; - ASSERT(get_itbl(tso)->type == TSO); - } - - if (tso->why_blocked != BlockedOnBlackHole) { - continue; - } - blocked_on = tso->block_info.closure; - - frame = tso->sp; - - while(1) { - info = get_ret_itbl((StgClosure *)frame); - switch (info->i.type) { - case UPDATE_FRAME: - if (((StgUpdateFrame *)frame)->updatee == blocked_on) { - /* We are blocking on one of our own computations, so - * send this thread the NonTermination exception. - */ - IF_DEBUG(scheduler, - sched_belch("thread %d is blocked on itself", tso->id)); - raiseAsync(tso, (StgClosure *)NonTermination_closure); - goto done; - } - - frame = (StgPtr)((StgUpdateFrame *)frame + 1); - continue; - - case STOP_FRAME: - goto done; - - // normal stack frames; do nothing except advance the pointer - default: - frame += stack_frame_sizeW((StgClosure *)frame); - } - } - done: ; - } -} -#endif - /* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * [Also provides useful information when debugging threaded programs diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index 8924a62dac..59d900f47e 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -156,7 +156,6 @@ extern nat RTS_VAR(rts_n_waiting_workers); extern nat RTS_VAR(rts_n_waiting_tasks); #endif -StgBool rtsSupportsBoundThreads(void); StgBool isThreadBound(StgTSO *tso); extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret); @@ -280,17 +279,6 @@ void labelThread(StgPtr tso, char *label); } \ blocked_queue_tl = tso; -/* Signal that a runnable thread has become available, in - * case there are any waiting tasks to execute it. - */ -#if defined(RTS_SUPPORTS_THREADS) -#define THREAD_RUNNABLE() \ - wakeBlockedWorkerThread(); \ - context_switch = 1; -#else -#define THREAD_RUNNABLE() /* nothing */ -#endif - /* Check whether various thread queues are empty */ #define EMPTY_QUEUE(q) (q == END_TSO_QUEUE) diff --git a/ghc/rts/Select.c b/ghc/rts/Select.c index 26870641bc..418e48cb7a 100644 --- a/ghc/rts/Select.c +++ b/ghc/rts/Select.c @@ -37,13 +37,6 @@ /* last timestamp */ nat timestamp = 0; -#ifdef RTS_SUPPORTS_THREADS -static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse; -static rtsBool workerWakeupPending = rtsFalse; -static int workerWakeupPipe[2]; -static rtsBool workerWakeupInited = rtsFalse; -#endif - /* There's a clever trick here to avoid problems when the time wraps * around. Since our maximum delay is smaller than 31 bits of ticks * (it's actually 31 bits of microseconds), we can safely check @@ -163,34 +156,6 @@ awaitEvent(rtsBool wait) } } -#ifdef RTS_SUPPORTS_THREADS - if(!workerWakeupInited) { - pipe(workerWakeupPipe); - workerWakeupInited = rtsTrue; - } - FD_SET(workerWakeupPipe[0], &rfd); - maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd; -#endif - - /* Release the scheduler lock while we do the poll. - * this means that someone might muck with the blocked_queue - * while we do this, but it shouldn't matter: - * - * - another task might poll for I/O and remove one - * or more threads from the blocked_queue. - * - more I/O threads may be added to blocked_queue. - * - more delayed threads may be added to blocked_queue. We'll - * just subtract delta from their delays after the poll. - * - * I believe none of these cases lead to trouble --SDM. - */ - -#ifdef RTS_SUPPORTS_THREADS - isWorkerBlockedInAwaitEvent = rtsTrue; - workerWakeupPending = rtsFalse; -#endif - RELEASE_LOCK(&sched_mutex); - /* Check for any interesting events */ tv.tv_sec = min / 1000000; @@ -223,10 +188,6 @@ awaitEvent(rtsBool wait) barf("select failed"); } } - ACQUIRE_LOCK(&sched_mutex); -#ifdef RTS_SUPPORTS_THREADS - isWorkerBlockedInAwaitEvent = rtsFalse; -#endif /* We got a signal; could be one of ours. If so, we need * to start up the signal handler straight away, otherwise @@ -235,9 +196,7 @@ awaitEvent(rtsBool wait) */ #if defined(RTS_USER_SIGNALS) if (signals_pending()) { - RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); - ACQUIRE_LOCK(&sched_mutex); return; /* still hold the lock */ } #endif @@ -258,24 +217,8 @@ awaitEvent(rtsBool wait) if (run_queue_hd != END_TSO_QUEUE) { return; /* still hold the lock */ } - -#ifdef RTS_SUPPORTS_THREADS - /* If another worker thread wants to take over, - * return to the scheduler - */ - if (needToYieldToReturningWorker()) { - return; /* still hold the lock */ - } -#endif - -#ifdef RTS_SUPPORTS_THREADS - isWorkerBlockedInAwaitEvent = rtsTrue; -#endif - RELEASE_LOCK(&sched_mutex); } - ACQUIRE_LOCK(&sched_mutex); - /* Step through the waiting queue, unblocking every thread that now has * a file descriptor in a ready state. */ @@ -317,51 +260,5 @@ awaitEvent(rtsBool wait) } } -#if defined(RTS_SUPPORTS_THREADS) - // if we were woken up by wakeBlockedWorkerThread, - // read the dummy byte from the pipe - if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) { - unsigned char dummy; - wait = rtsFalse; - read(workerWakeupPipe[0],&dummy,1); - } -#endif } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE); } - - -#ifdef RTS_SUPPORTS_THREADS -/* wakeBlockedWorkerThread - * - * If a worker thread is currently blocked within awaitEvent, - * wake it. - * Must be called with sched_mutex held. - */ -void -wakeBlockedWorkerThread() -{ - if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) { - unsigned char dummy = 42; // Any value will do here - - // write something so that select() wakes up - write(workerWakeupPipe[1],&dummy,1); - workerWakeupPending = rtsTrue; - } -} - -/* resetWorkerWakeupPipeAfterFork - * - * To be called right after a fork(). - * After the fork(), the worker wakeup pipe will be shared - * with the parent process, and that's something we don't want. - */ -void -resetWorkerWakeupPipeAfterFork() -{ - if(workerWakeupInited) { - close(workerWakeupPipe[0]); - close(workerWakeupPipe[1]); - } - workerWakeupInited = rtsFalse; -} -#endif diff --git a/ghc/rts/Signals.c b/ghc/rts/Signals.c index d5a046e01b..ac6d26674d 100644 --- a/ghc/rts/Signals.c +++ b/ghc/rts/Signals.c @@ -54,22 +54,25 @@ static nat n_haskell_handlers = 0; StgPtr pending_handler_buf[N_PENDING_HANDLERS]; StgPtr *next_pending_handler = pending_handler_buf; +/* ----------------------------------------------------------------------------- + * Signal handling + * -------------------------------------------------------------------------- */ + #ifdef RTS_SUPPORTS_THREADS pthread_t signalHandlingThread; #endif - // Handle all signals in the current thread. - // Called from Capability.c whenever the main capability is granted to a thread - // and in installDefaultHandlers +// Handle all signals in the current thread. +// Called from Capability.c whenever the main capability is granted to a thread +// and in installDefaultHandlers void -handleSignalsInThisThread() +handleSignalsInThisThread(void) { #ifdef RTS_SUPPORTS_THREADS signalHandlingThread = pthread_self(); #endif } - /* ----------------------------------------------------------------------------- * Allocate/resize the table of signal handlers. * -------------------------------------------------------------------------- */ diff --git a/ghc/rts/Signals.h b/ghc/rts/Signals.h index 4825fb7b9b..09ecec0814 100644 --- a/ghc/rts/Signals.h +++ b/ghc/rts/Signals.h @@ -28,6 +28,7 @@ extern void markSignalHandlers (evac_fn evac); extern void initDefaultHandlers(void); extern void handleSignalsInThisThread(void); +extern void handleSignalsInPrevThread(void); #elif defined(mingw32_TARGET_OS) #define RTS_USER_SIGNALS 1 |
