summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ghc/rts/Capability.c53
-rw-r--r--ghc/rts/Capability.h4
-rw-r--r--ghc/rts/PrimOps.cmm20
-rw-r--r--ghc/rts/Schedule.c107
-rw-r--r--ghc/rts/Schedule.h12
-rw-r--r--ghc/rts/Select.c103
-rw-r--r--ghc/rts/Signals.c13
-rw-r--r--ghc/rts/Signals.h1
8 files changed, 90 insertions, 223 deletions
diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c
index 3ea96fe770..62f205d4d4 100644
--- a/ghc/rts/Capability.c
+++ b/ghc/rts/Capability.c
@@ -56,8 +56,8 @@ nat rts_n_waiting_workers = 0;
* exclusive access to the RTS and all its data structures (that are not
* locked by the Scheduler's mutex).
*
- * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
- *
+ * thread_ready_cond is signalled whenever
+ * !noCapabilities && !EMPTY_RUN_QUEUE().
*/
Condition thread_ready_cond = INIT_COND_VAR;
@@ -82,6 +82,12 @@ static rtsBool passingCapability = rtsFalse;
#define UNUSED_IF_NOT_SMP STG_UNUSED
#endif
+#if defined(RTS_USER_SIGNALS)
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending())
+#else
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted)
+#endif
+
/* ----------------------------------------------------------------------------
Initialisation
------------------------------------------------------------------------- */
@@ -211,7 +217,7 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
rts_n_free_capabilities = 1;
#endif
// Signal that a capability is available
- if (rts_n_waiting_tasks > 0) {
+ if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
signalCondition(&thread_ready_cond);
}
startSchedulerTaskIfNecessary();
@@ -263,7 +269,6 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
if ( noCapabilities() || passingCapability ) {
rts_n_waiting_workers++;
- wakeBlockedWorkerThread();
context_switch = 1; // make sure it's our turn soon
waitCondition(&returning_worker_cond, pMutex);
#if defined(SMP)
@@ -294,8 +299,16 @@ yieldCapability( Capability** pCap )
// Pre-condition: pMutex is assumed held, the current thread
// holds the capability pointed to by pCap.
- if ( rts_n_waiting_workers > 0 || passingCapability ) {
- IF_DEBUG(scheduler, sched_belch("worker: giving up capability"));
+ if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+ IF_DEBUG(scheduler,
+ if (rts_n_waiting_workers > 0) {
+ sched_belch("worker: giving up capability (returning wkr)");
+ } else if (passingCapability) {
+ sched_belch("worker: giving up capability (passing capability)");
+ } else {
+ sched_belch("worker: giving up capability (no threads to run)");
+ }
+ );
releaseCapability(*pCap);
*pCap = NULL;
}
@@ -324,13 +337,14 @@ yieldCapability( Capability** pCap )
* passed to this thread using passCapability.
* ------------------------------------------------------------------------- */
-void
+void
waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
{
// Pre-condition: pMutex is held.
- while ( noCapabilities() ||
- (passingCapability && passTarget != pThreadCond)) {
+ while ( noCapabilities() ||
+ (passingCapability && passTarget != pThreadCond) ||
+ !ANY_WORK_TO_DO()) {
IF_DEBUG(scheduler,
sched_belch("worker: wait for capability (cond: %p)",
pThreadCond));
@@ -384,6 +398,27 @@ passCapabilityToWorker( void )
#endif /* RTS_SUPPORTS_THREADS */
+/* ----------------------------------------------------------------------------
+ threadRunnable()
+
+ Signals that a thread has been placed on the run queue, so a worker
+ might need to be woken up to run it.
+
+ ToDo: should check whether the thread at the front of the queue is
+ bound, and if so wake up the appropriate worker.
+ -------------------------------------------------------------------------- */
+
+void
+threadRunnable ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+ if ( !noCapabilities && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
+ signalCondition(&thread_ready_cond);
+ }
+ startSchedulerTaskIfNecessary();
+#endif
+}
+
/* ------------------------------------------------------------------------- */
#if defined(SMP)
diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h
index 450bf74275..e615035187 100644
--- a/ghc/rts/Capability.h
+++ b/ghc/rts/Capability.h
@@ -31,6 +31,10 @@ extern void initCapabilities( void );
//
extern void releaseCapability( Capability* cap );
+// Signal that a thread has become runnable
+//
+extern void threadRunnable ( void );
+
#ifdef RTS_SUPPORTS_THREADS
// Gives up the current capability IFF there is a higher-priority
// thread waiting for it. This happens in one of two ways:
diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm
index c9556f4d33..91c1325872 100644
--- a/ghc/rts/PrimOps.cmm
+++ b/ghc/rts/PrimOps.cmm
@@ -1342,6 +1342,10 @@ mkApUpd0zh_fast
waitReadzh_fast
{
/* args: R1 */
+#ifdef THREADED_RTS
+ foreign "C" barf("waitRead# on threaded RTS");
+#endif
+
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
StgTSO_block_info(CurrentTSO) = R1;
@@ -1354,6 +1358,10 @@ waitReadzh_fast
waitWritezh_fast
{
/* args: R1 */
+#ifdef THREADED_RTS
+ foreign "C" barf("waitWrite# on threaded RTS");
+#endif
+
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
StgTSO_block_info(CurrentTSO) = R1;
@@ -1374,6 +1382,10 @@ delayzh_fast
W_ t, prev, target;
#endif
+#ifdef THREADED_RTS
+ foreign "C" barf("delay# on threaded RTS");
+#endif
+
/* args: R1 (microsecond delay amount) */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
@@ -1432,6 +1444,10 @@ asyncReadzh_fast
W_ ares;
CInt reqID;
+#ifdef THREADED_RTS
+ foreign "C" barf("asyncRead# on threaded RTS");
+#endif
+
/* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
@@ -1454,6 +1470,10 @@ asyncWritezh_fast
W_ ares;
CInt reqID;
+#ifdef THREADED_RTS
+ foreign "C" barf("asyncWrite# on threaded RTS");
+#endif
+
/* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 04e9f1f43d..04e70dae73 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -453,13 +453,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
// run queue is empty, and there are no other tasks running, we
// can wait indefinitely for something to happen.
//
- if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+ {
#if defined(RTS_SUPPORTS_THREADS)
- || EMPTY_RUN_QUEUE()
+ // We shouldn't be here...
+ barf("schedule: awaitEvent() in threaded RTS");
#endif
- )
- {
- awaitEvent( EMPTY_RUN_QUEUE() );
+ awaitEvent( EMPTY_RUN_QUEUE() );
}
// we can be interrupted while waiting for I/O...
if (interrupted) continue;
@@ -479,18 +479,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
if ( EMPTY_THREAD_QUEUES() )
{
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
+
// Garbage collection can release some new threads due to
// either (a) finalizers or (b) threads resurrected because
- // they are about to be send BlockedOnDeadMVar. Any threads
- // thus released will be immediately runnable.
+ // they are unreachable and will therefore be sent an
+ // exception. Any threads thus released will be immediately
+ // runnable.
GarbageCollect(GetRoots,rtsTrue);
-
- if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-
- IF_DEBUG(scheduler,
- sched_belch("still deadlocked, checking for black holes..."));
- detectBlackHoles();
-
if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
#if defined(RTS_USER_SIGNALS)
@@ -1457,12 +1452,6 @@ forkProcess(HsStablePtr *entry
stgFree(m);
}
-# ifdef RTS_SUPPORTS_THREADS
- resetTaskManagerAfterFork(); // tell startTask() and friends that
- startingWorkerThread = rtsFalse; // we have no worker threads any more
- resetWorkerWakeupPipeAfterFork();
-# endif
-
rc = rts_evalStableIO(entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",rc);
@@ -1568,8 +1557,6 @@ suspendThread( StgRegTable *reg )
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
- /* Other threads _might_ be available for execution; signal this */
- THREAD_RUNNABLE();
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
@@ -1933,11 +1920,10 @@ static void scheduleThread_ (StgTSO* tso);
void
scheduleThread_(StgTSO *tso)
{
- // Precondition: sched_mutex must be held.
// The thread goes at the *end* of the run-queue, to avoid possible
// starvation of any threads already on the queue.
APPEND_TO_RUN_QUEUE(tso);
- THREAD_RUNNABLE();
+ threadRunnable();
}
void
@@ -1997,7 +1983,7 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
APPEND_TO_RUN_QUEUE(tso);
- // NB. Don't call THREAD_RUNNABLE() here, because the thread is
+ // NB. Don't call threadRunnable() here, because the thread is
// bound and only runnable by *this* OS thread, so waking up other
// workers will just slow things down.
@@ -2428,7 +2414,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
next = bqe->link;
((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
- THREAD_RUNNABLE();
+ threadRunnable();
unblockCount(bqe, node);
/* reset blocking status after dumping event */
((StgTSO *)bqe)->why_blocked = NotBlocked;
@@ -2473,7 +2459,7 @@ unblockOneLocked(StgTSO *tso)
next = tso->link;
tso->link = END_TSO_QUEUE;
APPEND_TO_RUN_QUEUE(tso);
- THREAD_RUNNABLE();
+ threadRunnable();
IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
return next;
}
@@ -2644,9 +2630,6 @@ interruptStgRts(void)
{
interrupted = 1;
context_switch = 1;
-#ifdef RTS_SUPPORTS_THREADS
- wakeBlockedWorkerThread();
-#endif
}
/* -----------------------------------------------------------------------------
@@ -3277,70 +3260,6 @@ resurrectThreads( StgTSO *threads )
}
}
-/* -----------------------------------------------------------------------------
- * Blackhole detection: if we reach a deadlock, test whether any
- * threads are blocked on themselves. Any threads which are found to
- * be self-blocked get sent a NonTermination exception.
- *
- * This is only done in a deadlock situation in order to avoid
- * performance overhead in the normal case.
- *
- * Locks: sched_mutex is held upon entry and exit.
- * -------------------------------------------------------------------------- */
-
-#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
-static void
-detectBlackHoles( void )
-{
- StgTSO *tso = all_threads;
- StgPtr frame;
- StgClosure *blocked_on;
- StgRetInfoTable *info;
-
- for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-
- while (tso->what_next == ThreadRelocated) {
- tso = tso->link;
- ASSERT(get_itbl(tso)->type == TSO);
- }
-
- if (tso->why_blocked != BlockedOnBlackHole) {
- continue;
- }
- blocked_on = tso->block_info.closure;
-
- frame = tso->sp;
-
- while(1) {
- info = get_ret_itbl((StgClosure *)frame);
- switch (info->i.type) {
- case UPDATE_FRAME:
- if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
- /* We are blocking on one of our own computations, so
- * send this thread the NonTermination exception.
- */
- IF_DEBUG(scheduler,
- sched_belch("thread %d is blocked on itself", tso->id));
- raiseAsync(tso, (StgClosure *)NonTermination_closure);
- goto done;
- }
-
- frame = (StgPtr)((StgUpdateFrame *)frame + 1);
- continue;
-
- case STOP_FRAME:
- goto done;
-
- // normal stack frames; do nothing except advance the pointer
- default:
- frame += stack_frame_sizeW((StgClosure *)frame);
- }
- }
- done: ;
- }
-}
-#endif
-
/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* [Also provides useful information when debugging threaded programs
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index 8924a62dac..59d900f47e 100644
--- a/ghc/rts/Schedule.h
+++ b/ghc/rts/Schedule.h
@@ -156,7 +156,6 @@ extern nat RTS_VAR(rts_n_waiting_workers);
extern nat RTS_VAR(rts_n_waiting_tasks);
#endif
-StgBool rtsSupportsBoundThreads(void);
StgBool isThreadBound(StgTSO *tso);
extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
@@ -280,17 +279,6 @@ void labelThread(StgPtr tso, char *label);
} \
blocked_queue_tl = tso;
-/* Signal that a runnable thread has become available, in
- * case there are any waiting tasks to execute it.
- */
-#if defined(RTS_SUPPORTS_THREADS)
-#define THREAD_RUNNABLE() \
- wakeBlockedWorkerThread(); \
- context_switch = 1;
-#else
-#define THREAD_RUNNABLE() /* nothing */
-#endif
-
/* Check whether various thread queues are empty
*/
#define EMPTY_QUEUE(q) (q == END_TSO_QUEUE)
diff --git a/ghc/rts/Select.c b/ghc/rts/Select.c
index 26870641bc..418e48cb7a 100644
--- a/ghc/rts/Select.c
+++ b/ghc/rts/Select.c
@@ -37,13 +37,6 @@
/* last timestamp */
nat timestamp = 0;
-#ifdef RTS_SUPPORTS_THREADS
-static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
-static rtsBool workerWakeupPending = rtsFalse;
-static int workerWakeupPipe[2];
-static rtsBool workerWakeupInited = rtsFalse;
-#endif
-
/* There's a clever trick here to avoid problems when the time wraps
* around. Since our maximum delay is smaller than 31 bits of ticks
* (it's actually 31 bits of microseconds), we can safely check
@@ -163,34 +156,6 @@ awaitEvent(rtsBool wait)
}
}
-#ifdef RTS_SUPPORTS_THREADS
- if(!workerWakeupInited) {
- pipe(workerWakeupPipe);
- workerWakeupInited = rtsTrue;
- }
- FD_SET(workerWakeupPipe[0], &rfd);
- maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
-#endif
-
- /* Release the scheduler lock while we do the poll.
- * this means that someone might muck with the blocked_queue
- * while we do this, but it shouldn't matter:
- *
- * - another task might poll for I/O and remove one
- * or more threads from the blocked_queue.
- * - more I/O threads may be added to blocked_queue.
- * - more delayed threads may be added to blocked_queue. We'll
- * just subtract delta from their delays after the poll.
- *
- * I believe none of these cases lead to trouble --SDM.
- */
-
-#ifdef RTS_SUPPORTS_THREADS
- isWorkerBlockedInAwaitEvent = rtsTrue;
- workerWakeupPending = rtsFalse;
-#endif
- RELEASE_LOCK(&sched_mutex);
-
/* Check for any interesting events */
tv.tv_sec = min / 1000000;
@@ -223,10 +188,6 @@ awaitEvent(rtsBool wait)
barf("select failed");
}
}
- ACQUIRE_LOCK(&sched_mutex);
-#ifdef RTS_SUPPORTS_THREADS
- isWorkerBlockedInAwaitEvent = rtsFalse;
-#endif
/* We got a signal; could be one of ours. If so, we need
* to start up the signal handler straight away, otherwise
@@ -235,9 +196,7 @@ awaitEvent(rtsBool wait)
*/
#if defined(RTS_USER_SIGNALS)
if (signals_pending()) {
- RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
- ACQUIRE_LOCK(&sched_mutex);
return; /* still hold the lock */
}
#endif
@@ -258,24 +217,8 @@ awaitEvent(rtsBool wait)
if (run_queue_hd != END_TSO_QUEUE) {
return; /* still hold the lock */
}
-
-#ifdef RTS_SUPPORTS_THREADS
- /* If another worker thread wants to take over,
- * return to the scheduler
- */
- if (needToYieldToReturningWorker()) {
- return; /* still hold the lock */
- }
-#endif
-
-#ifdef RTS_SUPPORTS_THREADS
- isWorkerBlockedInAwaitEvent = rtsTrue;
-#endif
- RELEASE_LOCK(&sched_mutex);
}
- ACQUIRE_LOCK(&sched_mutex);
-
/* Step through the waiting queue, unblocking every thread that now has
* a file descriptor in a ready state.
*/
@@ -317,51 +260,5 @@ awaitEvent(rtsBool wait)
}
}
-#if defined(RTS_SUPPORTS_THREADS)
- // if we were woken up by wakeBlockedWorkerThread,
- // read the dummy byte from the pipe
- if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
- unsigned char dummy;
- wait = rtsFalse;
- read(workerWakeupPipe[0],&dummy,1);
- }
-#endif
} while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
}
-
-
-#ifdef RTS_SUPPORTS_THREADS
-/* wakeBlockedWorkerThread
- *
- * If a worker thread is currently blocked within awaitEvent,
- * wake it.
- * Must be called with sched_mutex held.
- */
-void
-wakeBlockedWorkerThread()
-{
- if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
- unsigned char dummy = 42; // Any value will do here
-
- // write something so that select() wakes up
- write(workerWakeupPipe[1],&dummy,1);
- workerWakeupPending = rtsTrue;
- }
-}
-
-/* resetWorkerWakeupPipeAfterFork
- *
- * To be called right after a fork().
- * After the fork(), the worker wakeup pipe will be shared
- * with the parent process, and that's something we don't want.
- */
-void
-resetWorkerWakeupPipeAfterFork()
-{
- if(workerWakeupInited) {
- close(workerWakeupPipe[0]);
- close(workerWakeupPipe[1]);
- }
- workerWakeupInited = rtsFalse;
-}
-#endif
diff --git a/ghc/rts/Signals.c b/ghc/rts/Signals.c
index d5a046e01b..ac6d26674d 100644
--- a/ghc/rts/Signals.c
+++ b/ghc/rts/Signals.c
@@ -54,22 +54,25 @@ static nat n_haskell_handlers = 0;
StgPtr pending_handler_buf[N_PENDING_HANDLERS];
StgPtr *next_pending_handler = pending_handler_buf;
+/* -----------------------------------------------------------------------------
+ * Signal handling
+ * -------------------------------------------------------------------------- */
+
#ifdef RTS_SUPPORTS_THREADS
pthread_t signalHandlingThread;
#endif
- // Handle all signals in the current thread.
- // Called from Capability.c whenever the main capability is granted to a thread
- // and in installDefaultHandlers
+// Handle all signals in the current thread.
+// Called from Capability.c whenever the main capability is granted to a thread
+// and in installDefaultHandlers
void
-handleSignalsInThisThread()
+handleSignalsInThisThread(void)
{
#ifdef RTS_SUPPORTS_THREADS
signalHandlingThread = pthread_self();
#endif
}
-
/* -----------------------------------------------------------------------------
* Allocate/resize the table of signal handlers.
* -------------------------------------------------------------------------- */
diff --git a/ghc/rts/Signals.h b/ghc/rts/Signals.h
index 4825fb7b9b..09ecec0814 100644
--- a/ghc/rts/Signals.h
+++ b/ghc/rts/Signals.h
@@ -28,6 +28,7 @@ extern void markSignalHandlers (evac_fn evac);
extern void initDefaultHandlers(void);
extern void handleSignalsInThisThread(void);
+extern void handleSignalsInPrevThread(void);
#elif defined(mingw32_TARGET_OS)
#define RTS_USER_SIGNALS 1