author      wolfgang <unknown>    2003-10-01 10:49:09 +0000
committer   wolfgang <unknown>    2003-10-01 10:49:09 +0000
commit      324e96d2ebfcb113cd97c43ef043d591ef87de71 (patch)
tree        b5b38d8ec16bc56010132ef73710577e85122952
parent      aefc6956f4828708e1343cf4858296fc3141a176 (diff)
download    haskell-324e96d2ebfcb113cd97c43ef043d591ef87de71.tar.gz
[project @ 2003-10-01 10:49:07 by wolfgang]
Threaded RTS: Don't start new worker threads earlier than necessary.

After this commit, a Haskell program that uses neither forkOS nor forkIO is really single-threaded (rather than using two OS threads internally).

Some details:

Worker threads are now only created when a capability is released, and only when
(there are no worker threads) && (there are runnable Haskell threads || there are Haskell threads blocked on IO or threadDelay).

awaitEvent can now be called from bound thread scheduling loops (so that we don't have to create a worker thread just to run awaitEvent).
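The policy described above is spread over two files in the diff below; condensed into one place it looks roughly like this (a sketch using the patch's own names, taken from the Capability.c and Schedule.c hunks, not a standalone compilable unit):

    /* Capability.c: releasing a capability is now the only place where a
     * new worker OS thread may be started. */
    void releaseCapability ( Capability* cap )
    {
        /* ... bookkeeping elided ... */
        signalCondition(&thread_ready_cond);
        startSchedulerTaskIfNecessary();   /* only if there is more work */
    }

    /* Schedule.c: start a worker only when it would have something to do:
     * runnable Haskell threads, or threads blocked on IO/threadDelay that
     * need somebody sitting in awaitEvent().  The startingWorkerThread
     * flag prevents starting a second worker while the previous one has
     * not yet reached its "waiting for capability" state. */
    void startSchedulerTaskIfNecessary ( void )
    {
        if (run_queue_hd != END_TSO_QUEUE
            || blocked_queue_hd != END_TSO_QUEUE
            || sleeping_queue != END_TSO_QUEUE)
        {
            if (!startingWorkerThread) {
                startingWorkerThread = rtsTrue;
                startTask(taskStart);
            }
        }
    }

The awaitEvent change is what makes this sufficient for the single-threaded case: a bound main thread can now sit in awaitEvent itself, so no worker has to be spawned just to wait for IO or timer events.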
-rw-r--r--  ghc/rts/Capability.c  37
-rw-r--r--  ghc/rts/Capability.h   1
-rw-r--r--  ghc/rts/RtsAPI.c       8
-rw-r--r--  ghc/rts/Schedule.c    53
-rw-r--r--  ghc/rts/Schedule.h    11
-rw-r--r--  ghc/rts/Select.c      18
-rw-r--r--  ghc/rts/Task.c        13
-rw-r--r--  ghc/rts/Task.h         1
8 files changed, 110 insertions, 32 deletions
diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c
index 74d50acea2..d748aee7e6 100644
--- a/ghc/rts/Capability.c
+++ b/ghc/rts/Capability.c
@@ -199,6 +199,8 @@ void releaseCapability(Capability* cap
#endif
/* Signal that a capability is available */
signalCondition(&thread_ready_cond);
+ startSchedulerTaskIfNecessary(); // if there is more work to be done,
+ // we'll need a new thread
}
#endif
#ifdef RTS_SUPPORTS_THREADS
@@ -324,6 +326,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
*/
static Condition *passTarget = NULL;
+static rtsBool passingCapability = rtsFalse;
void
waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
@@ -334,8 +337,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
osThreadId(),pThreadCond));
- while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
- || (!pThreadCond && passTarget)) {
+ while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) {
if(pThreadCond)
{
waitCondition(pThreadCond, pMutex);
@@ -353,7 +355,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
osThreadId()));
}
}
- passTarget = NULL;
+ passingCapability = rtsFalse;
grabCapability(pCap);
return;
}
@@ -378,11 +380,40 @@ passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
rts_n_free_capabilities = 1;
signalCondition(pTargetThreadCond);
passTarget = pTargetThreadCond;
+ passingCapability = rtsTrue;
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): passCapability\n",
osThreadId()));
}
+/*
+ * Function: passCapabilityToWorker(Mutex*, Capability*)
+ *
+ * Purpose: Let go of the capability and make sure that a
+ * "plain" worker thread (not a bound thread) gets it next.
+ *
+ * Pre-condition: pMutex is held and cap is held by the current thread
+ * Post-condition: pMutex is held; cap will be grabbed by the "target"
+ * thread when pMutex is released.
+ */
+
+void
+passCapabilityToWorker(Mutex* pMutex, Capability* cap)
+{
+#ifdef SMP
+ #error SMP version not implemented
+#endif
+ rts_n_free_capabilities = 1;
+ signalCondition(&thread_ready_cond);
+ startSchedulerTaskIfNecessary();
+ passTarget = NULL;
+ passingCapability = rtsTrue;
+ IF_DEBUG(scheduler,
+ fprintf(stderr,"worker thread (%p): passCapabilityToWorker\n",
+ osThreadId()));
+}
+
+
#endif /* RTS_SUPPORTS_THREADS */
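Taken together, passTarget and the new passingCapability flag encode who may take the capability next; the waiting side is the while-loop in waitForWorkCapability shown earlier in this file. A short annotated sketch of that handshake (same logic as in the hunks above; the non-bound branch is assumed to wait on thread_ready_cond, which this diff does not show explicitly):

    /* After the holder lets go of the capability:
     *
     *   passingCapability == rtsFalse
     *       free-for-all: any thread in waitForWorkCapability may grab it.
     *
     *   passingCapability == rtsTrue, passTarget == someCond
     *       directed pass (passCapability): only the bound thread waiting
     *       on someCond may grab it.
     *
     *   passingCapability == rtsTrue, passTarget == NULL
     *       worker pass (passCapabilityToWorker): only a plain worker,
     *       i.e. a thread that called waitForWorkCapability with
     *       pThreadCond == NULL, may grab it.
     */
    while ( noCapabilities()
            || (passingCapability && passTarget != pThreadCond) ) {
        /* keep waiting: either no capability is free, or it is being
         * passed to somebody else */
        waitCondition(pThreadCond ? pThreadCond : &thread_ready_cond, pMutex);
    }
    passingCapability = rtsFalse;    /* the pass is complete */
    grabCapability(pCap);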
diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h
index 70acc15d47..ede787b562 100644
--- a/ghc/rts/Capability.h
+++ b/ghc/rts/Capability.h
@@ -42,6 +42,7 @@ extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond);
+extern void passCapabilityToWorker(Mutex* pMutex, Capability* cap);
static inline rtsBool needToYieldToReturningWorker(void)
{
diff --git a/ghc/rts/RtsAPI.c b/ghc/rts/RtsAPI.c
index 835db722d7..8d1cfd9986 100644
--- a/ghc/rts/RtsAPI.c
+++ b/ghc/rts/RtsAPI.c
@@ -1,5 +1,5 @@
/* ----------------------------------------------------------------------------
- * $Id: RtsAPI.c,v 1.48 2003/10/01 10:36:49 wolfgang Exp $
+ * $Id: RtsAPI.c,v 1.49 2003/10/01 10:49:07 wolfgang Exp $
*
* (c) The GHC Team, 1998-2001
*
@@ -501,12 +501,6 @@ rts_lock()
// b) wake the current worker thread from awaitEvent()
// (so that a thread started by rts_eval* will start immediately)
grabReturnCapability(&sched_mutex,&rtsApiCapability);
-
- // In the RTS hasn't been entered yet,
- // start a RTS task.
- // If there is already a task available (waiting for the work capability),
- // this will do nothing.
- startSchedulerTask();
#endif
}
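With the explicit startSchedulerTask() call gone from rts_lock, an embedding C program (or a foreign-export stub) no longer causes a worker OS thread to be created just by entering the RTS; a worker appears only if the evaluation leaves work behind when the capability is released. A hypothetical client for illustration: rts_lock()/rts_unlock() appear in this diff, while rts_evalIO(), rts_apply(), rts_mkInt() and foo_closure are the usual RtsAPI/foreign-export machinery of this era and are assumptions here, not something this patch shows.

    #include "RtsAPI.h"

    extern HaskellObj foo_closure;   /* hypothetical foreign-exported closure */

    void call_into_haskell ( void )
    {
        HaskellObj ret;
        rts_lock();     /* grab a capability; after this patch no worker
                         * thread is started here any more */
        rts_evalIO(rts_apply(foo_closure, rts_mkInt(42)), &ret);
        rts_unlock();   /* the capability is released here, which (per
                         * Capability.c above) is now the point where a
                         * worker is started, and only if one is needed */
    }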
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 2754c4f96e..33db7e685f 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -1,5 +1,5 @@
/* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.175 2003/09/26 13:32:14 panne Exp $
+ * $Id: Schedule.c,v 1.176 2003/10/01 10:49:08 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
@@ -313,20 +313,38 @@ StgTSO * activateSpark (rtsSpark spark);
StgTSO *MainTSO;
*/
-#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS)
+static rtsBool startingWorkerThread = rtsFalse;
+
static void taskStart(void);
static void
taskStart(void)
{
- schedule(NULL,NULL);
+ Capability *cap;
+
+ ACQUIRE_LOCK(&sched_mutex);
+ startingWorkerThread = rtsFalse;
+ waitForWorkCapability(&sched_mutex, &cap, NULL);
+ RELEASE_LOCK(&sched_mutex);
+
+ schedule(NULL,cap);
}
-#endif
-#if defined(RTS_SUPPORTS_THREADS)
void
-startSchedulerTask(void)
+startSchedulerTaskIfNecessary(void)
{
- startTask(taskStart);
+ if(run_queue_hd != END_TSO_QUEUE
+ || blocked_queue_hd != END_TSO_QUEUE
+ || sleeping_queue != END_TSO_QUEUE)
+ {
+ if(!startingWorkerThread)
+ { // we don't want to start another worker thread
+ // just because the last one hasn't yet reached the
+ // "waiting for capability" state
+ startingWorkerThread = rtsTrue;
+ startTask(taskStart);
+ }
+ }
}
#endif
@@ -475,7 +493,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
// so just exit right away.
prog_belch("interrupted");
releaseCapability(cap);
- startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
RELEASE_LOCK(&sched_mutex);
shutdownHaskellAndExit(EXIT_SUCCESS);
#else
@@ -1151,7 +1168,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
- releaseCapability(cap);
+ passCapabilityToWorker(&sched_mutex, cap);
cap = NULL;
continue;
}
@@ -1830,9 +1847,6 @@ suspendThread( StgRegTable *reg,
waiting to take over.
*/
IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
- //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
- startTask(taskStart);
- //}
#endif
/* Other threads _might_ be available for execution; signal this */
@@ -2245,9 +2259,10 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCap
m->ret = ret;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
- initCondition(&m->wakeup);
#if defined(THREADED_RTS)
initCondition(&m->bound_thread_cond);
+#else
+ initCondition(&m->wakeup);
#endif
#endif
@@ -2459,9 +2474,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
m->ret = ret;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
- initCondition(&m->wakeup);
#if defined(THREADED_RTS)
initCondition(&m->bound_thread_cond);
+#else
+ initCondition(&m->wakeup);
#endif
#endif
@@ -2512,9 +2528,10 @@ waitThread_(StgMainThread* m, Capability *initialCapability)
stat = m->stat;
#if defined(RTS_SUPPORTS_THREADS)
- closeCondition(&m->wakeup);
#if defined(THREADED_RTS)
closeCondition(&m->bound_thread_cond);
+#else
+ closeCondition(&m->wakeup);
#endif
#endif
@@ -3498,7 +3515,11 @@ deleteThreadImmediately(StgTSO *tso)
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- unblockThread(tso);
+#if defined(RTS_SUPPORTS_THREADS)
+ if (tso->why_blocked != BlockedOnCCall
+ && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
+#endif
+ unblockThread(tso);
tso->what_next = ThreadKilled;
}
#endif
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index fccac3ce0c..ebbe7d15ff 100644
--- a/ghc/rts/Schedule.h
+++ b/ghc/rts/Schedule.h
@@ -1,5 +1,5 @@
/* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $
+ * $Id: Schedule.h,v 1.40 2003/10/01 10:49:09 wolfgang Exp $
*
* (c) The GHC Team 1998-1999
*
@@ -190,9 +190,10 @@ typedef struct StgMainThread_ {
SchedulerStatus stat;
StgClosure ** ret;
#if defined(RTS_SUPPORTS_THREADS)
- Condition wakeup;
#if defined(THREADED_RTS)
Condition bound_thread_cond;
+#else
+ Condition wakeup;
#endif
#endif
struct StgMainThread_ *link;
@@ -297,12 +298,12 @@ void labelThread(StgPtr tso, char *label);
#if defined(RTS_SUPPORTS_THREADS)
/* If no task is waiting for a capability,
+ * and if there is work to be done
+ * or if we need to wait for IO or delay requests,
* spawn a new worker thread.
- *
- * (Used by the RtsAPI)
*/
void
-startSchedulerTask(void);
+startSchedulerTaskIfNecessary(void);
#endif
#endif /* __SCHEDULE_H__ */
diff --git a/ghc/rts/Select.c b/ghc/rts/Select.c
index 677fdd2c81..d7e6ffcdaa 100644
--- a/ghc/rts/Select.c
+++ b/ghc/rts/Select.c
@@ -1,5 +1,5 @@
/* -----------------------------------------------------------------------------
- * $Id: Select.c,v 1.29 2003/06/26 12:22:59 stolz Exp $
+ * $Id: Select.c,v 1.30 2003/10/01 10:49:09 wolfgang Exp $
*
* (c) The GHC Team 1995-2002
*
@@ -351,4 +351,20 @@ wakeBlockedWorkerThread()
workerWakeupPending = rtsTrue;
}
}
+
+/* resetWorkerWakeupPipeAfterFork
+ *
+ * To be called right after a fork().
+ * After the fork(), the worker wakeup pipe will be shared
+ * with the parent process, and that's something we don't want.
+ */
+void
+resetWorkerWakeupPipeAfterFork()
+{
+ if(workerWakeupInited) {
+ close(workerWakeupPipe[0]);
+ close(workerWakeupPipe[1]);
+ }
+ workerWakeupInited = rtsFalse;
+}
#endif
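resetWorkerWakeupPipeAfterFork pairs with resetTaskManagerAfterFork, added in Task.c below: after a fork() the child inherits the parent's pipe file descriptors and task counters, but none of the parent's worker OS threads. A sketch of the intended post-fork cleanup; the actual call site is not part of this diff and is assumed to be the RTS's fork handling (e.g. forkProcess):

    pid_t pid = fork();
    if (pid == 0) {
        /* child: drop the wakeup pipe shared with the parent and forget
         * about worker threads that exist only in the parent */
        resetWorkerWakeupPipeAfterFork();
        resetTaskManagerAfterFork();
    }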
diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c
index 92b5c2594f..c720538995 100644
--- a/ghc/rts/Task.c
+++ b/ghc/rts/Task.c
@@ -134,6 +134,12 @@ stopTaskManager ()
return;
}
+void
+resetTaskManagerAfterFork ()
+{
+ barf("resetTaskManagerAfterFork not implemented for SMP");
+}
+
#else
/************ THREADS version *****************/
@@ -192,6 +198,13 @@ stopTaskManager ()
{
}
+
+void
+resetTaskManagerAfterFork ()
+{
+ rts_n_waiting_tasks = 0;
+ taskCount = 0;
+}
#endif
diff --git a/ghc/rts/Task.h b/ghc/rts/Task.h
index bf29d9194a..ee599876fd 100644
--- a/ghc/rts/Task.h
+++ b/ghc/rts/Task.h
@@ -28,6 +28,7 @@ extern TaskInfo *taskIds;
extern void startTaskManager ( nat maxTasks, void (*taskStart)(void) );
extern void stopTaskManager ( void );
+void resetTaskManagerAfterFork ();
extern void startTask ( void (*taskStart)(void) );